kafka Consumer high-level api 之白名单

2024-04-05 07:38

本文主要是介绍kafka Consumer high-level api 之白名单,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么下面来介绍下第一种API:

使用白名单可以适配多个topic的情况。

示例代码:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;public class KafkaConsumer {private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);private ConsumerIterator<byte[], byte[]> iterator = null;private static ConsumerConfig consumerConfig;private ConsumerConnector connector = null;private List<KafkaStream<byte[], byte[]>> partitions = null;private Whitelist whitelist = null;private int threads = 0;private String[] topics;private String type;private String topic = null;private String message = null;private MessageAndMetadata<byte[], byte[]> next = null;public KafkaConsumer(Properties props) {String topicStr = props.getProperty("topics");if(topicStr==null||topicStr.trim().length()<=0){throw new NullPointerException("请正确填写TOPIC.");}		threads = Integer.parseInt(props.getProperty("threads", "1").trim());consumerConfig = createConsumerConfig(props);// topic的过滤器whitelist = new Whitelist("(" + topicStr + ")");init();}/*** 初始化参数* * @param props* @return*/private static ConsumerConfig createConsumerConfig(Properties props) {logger.info("---init kafka config...");props.put("zookeeper.session.timeout.ms", "30000");props.put("zookeeper.sync.time.ms", "6000");props.put("auto.commit.enable", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");return new ConsumerConfig(props);}private void init() {connector = Consumer.createJavaConsumerConnector(consumerConfig);partitions = connector.createMessageStreamsByFilter(whitelist,threads);if (CollectionUtils.isEmpty(partitions)) {logger.info("empty!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}logger.info("---connect kafka success!");try{for (KafkaStream<byte[], byte[]> partition : partitions) {iterator = partition.iterator();while (iterator.hasNext()) {next = iterator.next();try {message = new String(next.message(), Constants.UTF8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);}}}catch (Exception e) {logger.error("run time error:{}",e);close();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}init();}}/*** 销毁资源 未使用* */private void close() {logger.info("close resource...");if (partitions != null)partitions.clear();partitions = null;if (iterator != null)iterator.clearCurrentChunk();iterator = null;if (connector != null)connector.shutdown();connector = null;}/*** 主方法入口* * @param args*/public static void main(String[] args) {FileInputStream fis = null;Properties props = new Properties();Properties kafkaProps = null;Properties syslogProps = null;try {String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();logger.info("encode:{}", encode);String path = System.getProperty(Constants.CONFIG_PATH);logger.info("path:{}", path);if(path==null||path.trim().length()<=0){throw new NullPointerException("请正确填写配置文件路径.");}fis = new FileInputStream(path);props.load(new InputStreamReader(fis, encode));kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);logger.info("kafkaProps:{}", kafkaProps);new KafkaConsumer(kafkaProps);} catch (Exception e) {logger.error("----Runtime error:", e);} finally {if (fis != null) {try {fis.close();} catch (IOException e) {e.printStackTrace();}}if (props != null)props.clear();if (kafkaProps != null)kafkaProps.clear();}}
}


使用到的配置:

zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2



这篇关于kafka Consumer high-level api 之白名单的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/878007

相关文章

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

使用Python实现调用API获取图片存储到本地的方法

《使用Python实现调用API获取图片存储到本地的方法》开发一个自动化工具,用于从JSON数据源中提取图像ID,通过调用指定API获取未经压缩的原始图像文件,并确保下载结果与Postman等工具直接... 目录使用python实现调用API获取图片存储到本地1、项目概述2、核心功能3、环境准备4、代码实现

无法启动此程序因为计算机丢失api-ms-win-core-path-l1-1-0.dll修复方案

《无法启动此程序因为计算机丢失api-ms-win-core-path-l1-1-0.dll修复方案》:本文主要介绍了无法启动此程序,详细内容请阅读本文,希望能对你有所帮助... 在计算机使用过程中,我们经常会遇到一些错误提示,其中之一就是"api-ms-win-core-path-l1-1-0.dll丢失

python通过curl实现访问deepseek的API

《python通过curl实现访问deepseek的API》这篇文章主要为大家详细介绍了python如何通过curl实现访问deepseek的API,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编... API申请和充值下面是deepeek的API网站https://platform.deepsee

Java对接Dify API接口的完整流程

《Java对接DifyAPI接口的完整流程》Dify是一款AI应用开发平台,提供多种自然语言处理能力,通过调用Dify开放API,开发者可以快速集成智能对话、文本生成等功能到自己的Java应用中,本... 目录Java对接Dify API接口完整指南一、Dify API简介二、准备工作三、基础对接实现1.

一文详解如何在Vue3中封装API请求

《一文详解如何在Vue3中封装API请求》在现代前端开发中,API请求是不可避免的一部分,尤其是与后端交互时,下面我们来看看如何在Vue3项目中封装API请求,让你在实现功能时更加高效吧... 目录为什么要封装API请求1. vue 3项目结构2. 安装axIOS3. 创建API封装模块4. 封装API请求

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con