node读取卡夫卡

2024-05-06 18:18
文章标签 读取 node 卡夫卡

本文主要是介绍node读取卡夫卡,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://www.npmjs.com/package/no-kafka
测试可用
node 代码
//下面代码测试可用

let Kafka = require('no-kafka');
let settings=require('../settings');
let DBProvider = require("../models/db.js").DBProvider;
let DeviceItem = require("../models/deviceitem.js");
let async=require('async');
let db = new DBProvider(settings);
DeviceItem.setdb(db);
let consumer = new Kafka.SimpleConsumer({connectionString: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'}); //测试环境
//let consumer = new Kafka.SimpleConsumer({connectionString: '10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092'}); //线上// data handler function can return a Promise
let kafka_data=[];
let collection_name="kafka_zaful";
let referRegex=/"refer":"(.*?)","lkid/;let dataHandler = function (messageSet, topic, partition) {messageSet.forEach(function (message) {console.log(topic, partition, message.offset, message.message.value.toString('utf8'));let str=message.message.value.toString('utf8');//console.log('kafka_data.length=',kafka_data.length);let item;try {item=JSON.parse(str);} catch (e) {let obj=referRegex.exec(str);let repl_str='"refer":"'+obj[1].replace(/"/g,"'").replace(/\\/g,'')+'","lkid';try {item=JSON.parse(str.replace(referRegex,repl_str));} catch (e2) {console.log('!!!!!');console.log(str);}}if (item) {kafka_data.push(item);saveKafkaData();} else {console.log('!!! item null');}});
};return consumer.init().then(function () {// Subscribe partitons 0 and 1 in a topic:return consumer.subscribe('cdn-etl-zaful-com', [0,1,2,3,4,5], {offset: 0, groupId: 'cdn-zaful-local20'}, dataHandler);
});

//另一个模块 (测试不可用)

// let ServiceRouter = require('../service-router/serviceRouter.js');
function toKafka() {let kafka = require('kafka-node');let Consumer = kafka.Consumer;//let client = new kafka.Client('10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092');//线上let client = new kafka.Client('169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'); //测试let Offset = kafka.Offset;let offset = new Offset(client);console.log('连接kafka中');
//     let topics = [{
//         topic: 'cdn-etl-zaful-com', partition: 0, offset: 0
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 1, offset: 310
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 2, offset: 20103
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 3, offset: 42055
//     }];
//     let options = {
// // Auto commit config
//         autoCommit: true,
//         autoCommitMsgCount: 100,
//         autoCommitIntervalMs: 1000,
// // Fetch message config
//         fetchMaxWaitMs: 100,
//         fetchMinBytes: 1,
//         fetchMaxBytes: 1024 * 10,
//         fromOffset: true,
//         fromBeginning: true
//     };let argv = {topic: "cdn-etl-zaful-com"};let topic = argv.topic || 'cdn-etl-zaful-com';let topics = [{topic: topic,partition:0}],options = {groupId: 'cdn-test',autoCommit: true,autoCommitIntervalMs:1000,sessionTimeout:30000,// fetchMaxWaitMs: 1000,// fetchMaxBytes: 1024 * 1024,// fromOffset: true,// fromBeginning: true};console.log('create ');// console.log('topics=',topics);// console.log('options=',options);let consumer = new Consumer(client,topics,options);console.log('listen ');consumer.connect();// consumer.resume()consumer.on('connect', function () {console.log('connect');client.loadMetadataForTopics([], function (error, results) {if (error) {return console.error(error);}console.log('%j', _.get(results, '1.metadata'));});});consumer.on('message', function (message) {console.log(message);let key = message.key.toString();console.log(key);if (key !== -1) {console.log(message);try {let msg = JSON.parse(message.value);// ServiceRouter.dispatch(key, msg);} catch (e) {console.log(e)}} else {console.log(message)}});consumer.on('offsetOutOfRange', function (topic) {console.log('topic =',topic);console.log("------------- offsetOutOfRange ------------");topic.maxNum = 2;offset.fetch([topic], function (err, offsets) {console.log(offsets);let min = Math.min.apply(null, offsets[topic.topic][topic.partition]);consumer.setOffset(topic.topic, topic.partition, min);});});consumer.on('error', function (message) {console.log(message);console.log('kafka错误');});
}
module.exports = toKafka;

Java
需要构建一个maven项目

import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.alibaba.fastjson.JSON;public class KafkaConsumerTest
{private static final String BOOTSTRAPSERVERS = "169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092";//private static final String BOOTSTRAPSERVERS = "10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092";private static final String TOPIC = "cdn-etl-zaful-com";private static Properties getPropsOfConsumer(){Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAPSERVERS);props.put("group.id", "cdn-local-1");props.put("auto.offset.reset", "earliest");//earliestprops.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}public static void main(String[] args){System.out.println("开始读");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getPropsOfConsumer());consumer.subscribe(Arrays.asList(TOPIC));while (true){//System.out.println("reading...");ConsumerRecords<String, String> records = consumer.poll(100);//System.out.println("records=");
/*          if (records!=null) {System.out.println(records);}*/for (ConsumerRecord<String, String> record : records){System.out.println(record.value());}}}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>KafkaConsumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.6</version></dependency><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.39</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.6</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency></dependencies>
</project>

这篇关于node读取卡夫卡的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965009

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

Java中读取YAML文件配置信息常见问题及解决方法

《Java中读取YAML文件配置信息常见问题及解决方法》:本文主要介绍Java中读取YAML文件配置信息常见问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录1 使用Spring Boot的@ConfigurationProperties2. 使用@Valu

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

Python中文件读取操作漏洞深度解析与防护指南

《Python中文件读取操作漏洞深度解析与防护指南》在Web应用开发中,文件操作是最基础也最危险的功能之一,这篇文章将全面剖析Python环境中常见的文件读取漏洞类型,成因及防护方案,感兴趣的小伙伴可... 目录引言一、静态资源处理中的路径穿越漏洞1.1 典型漏洞场景1.2 os.path.join()的陷

VSCode中配置node.js的实现示例

《VSCode中配置node.js的实现示例》本文主要介绍了VSCode中配置node.js的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一.node.js下载安装教程二.配置npm三.配置环境变量四.VSCode配置五.心得一.no

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring Boot读取配置文件的五种方式小结

《SpringBoot读取配置文件的五种方式小结》SpringBoot提供了灵活多样的方式来读取配置文件,这篇文章为大家介绍了5种常见的读取方式,文中的示例代码简洁易懂,大家可以根据自己的需要进... 目录1. 配置文件位置与加载顺序2. 读取配置文件的方式汇总方式一:使用 @Value 注解读取配置方式二

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil