kafka Consumer high-level api 之白名单

2024-04-05 07:38

本文主要是介绍kafka Consumer high-level api 之白名单,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么下面来介绍下第一种API:

使用白名单可以适配多个topic的情况。

示例代码:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;public class KafkaConsumer {private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);private ConsumerIterator<byte[], byte[]> iterator = null;private static ConsumerConfig consumerConfig;private ConsumerConnector connector = null;private List<KafkaStream<byte[], byte[]>> partitions = null;private Whitelist whitelist = null;private int threads = 0;private String[] topics;private String type;private String topic = null;private String message = null;private MessageAndMetadata<byte[], byte[]> next = null;public KafkaConsumer(Properties props) {String topicStr = props.getProperty("topics");if(topicStr==null||topicStr.trim().length()<=0){throw new NullPointerException("请正确填写TOPIC.");}		threads = Integer.parseInt(props.getProperty("threads", "1").trim());consumerConfig = createConsumerConfig(props);// topic的过滤器whitelist = new Whitelist("(" + topicStr + ")");init();}/*** 初始化参数* * @param props* @return*/private static ConsumerConfig createConsumerConfig(Properties props) {logger.info("---init kafka config...");props.put("zookeeper.session.timeout.ms", "30000");props.put("zookeeper.sync.time.ms", "6000");props.put("auto.commit.enable", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");return new ConsumerConfig(props);}private void init() {connector = Consumer.createJavaConsumerConnector(consumerConfig);partitions = connector.createMessageStreamsByFilter(whitelist,threads);if (CollectionUtils.isEmpty(partitions)) {logger.info("empty!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}logger.info("---connect kafka success!");try{for (KafkaStream<byte[], byte[]> partition : partitions) {iterator = partition.iterator();while (iterator.hasNext()) {next = iterator.next();try {message = new String(next.message(), Constants.UTF8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);}}}catch (Exception e) {logger.error("run time error:{}",e);close();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}init();}}/*** 销毁资源 未使用* */private void close() {logger.info("close resource...");if (partitions != null)partitions.clear();partitions = null;if (iterator != null)iterator.clearCurrentChunk();iterator = null;if (connector != null)connector.shutdown();connector = null;}/*** 主方法入口* * @param args*/public static void main(String[] args) {FileInputStream fis = null;Properties props = new Properties();Properties kafkaProps = null;Properties syslogProps = null;try {String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();logger.info("encode:{}", encode);String path = System.getProperty(Constants.CONFIG_PATH);logger.info("path:{}", path);if(path==null||path.trim().length()<=0){throw new NullPointerException("请正确填写配置文件路径.");}fis = new FileInputStream(path);props.load(new InputStreamReader(fis, encode));kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);logger.info("kafkaProps:{}", kafkaProps);new KafkaConsumer(kafkaProps);} catch (Exception e) {logger.error("----Runtime error:", e);} finally {if (fis != null) {try {fis.close();} catch (IOException e) {e.printStackTrace();}}if (props != null)props.clear();if (kafkaProps != null)kafkaProps.clear();}}
}


使用到的配置:

zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2



这篇关于kafka Consumer high-level api 之白名单的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/878007

相关文章

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Go语言使用net/http构建一个RESTful API的示例代码

《Go语言使用net/http构建一个RESTfulAPI的示例代码》Go的标准库net/http提供了构建Web服务所需的强大功能,虽然众多第三方框架(如Gin、Echo)已经封装了很多功能,但... 目录引言一、什么是 RESTful API?二、实战目标:用户信息管理 API三、代码实现1. 用户数据

Python用Flask封装API及调用详解

《Python用Flask封装API及调用详解》本文介绍Flask的优势(轻量、灵活、易扩展),对比GET/POST表单/JSON请求方式,涵盖错误处理、开发建议及生产环境部署注意事项... 目录一、Flask的优势一、基础设置二、GET请求方式服务端代码客户端调用三、POST表单方式服务端代码客户端调用四

SpringBoot结合Knife4j进行API分组授权管理配置详解

《SpringBoot结合Knife4j进行API分组授权管理配置详解》在现代的微服务架构中,API文档和授权管理是不可或缺的一部分,本文将介绍如何在SpringBoot应用中集成Knife4j,并进... 目录环境准备配置 Swagger配置 Swagger OpenAPI自定义 Swagger UI 底

使用Python的requests库调用API接口的详细步骤

《使用Python的requests库调用API接口的详细步骤》使用Python的requests库调用API接口是开发中最常用的方式之一,它简化了HTTP请求的处理流程,以下是详细步骤和实战示例,涵... 目录一、准备工作:安装 requests 库二、基本调用流程(以 RESTful API 为例)1.

SpringBoot监控API请求耗时的6中解决解决方案

《SpringBoot监控API请求耗时的6中解决解决方案》本文介绍SpringBoot中记录API请求耗时的6种方案,包括手动埋点、AOP切面、拦截器、Filter、事件监听、Micrometer+... 目录1. 简介2.实战案例2.1 手动记录2.2 自定义AOP记录2.3 拦截器技术2.4 使用Fi

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

HTML5 getUserMedia API网页录音实现指南示例小结

《HTML5getUserMediaAPI网页录音实现指南示例小结》本教程将指导你如何利用这一API,结合WebAudioAPI,实现网页录音功能,从获取音频流到处理和保存录音,整个过程将逐步... 目录1. html5 getUserMedia API简介1.1 API概念与历史1.2 功能与优势1.3