Sprint Boot集成Kafka

2023-12-02 09:08
文章标签 boot 集成 kafka sprint

本文主要是介绍Sprint Boot集成Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

Sprint Boot集成Kafka

 

 

 

pom.xml

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency>

 

 

 

application.properties

......#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960......

 

 

KafkaProducerConfig

package com.youfan.kafka;import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

 

 

@Autowired    
private KafkaTemplate kafkaTemplate;

 

/*** 埋点收集日志,频道数量少,该模块的频道id固定,id为1,代表生鲜*/long pindaoid = 1;//频道id//productytpeid//产品类别id//producid//产品id//用户idlong userid = 0;//游客//ip地址String ipaddress = IpUtil.getIpAddress(request);System.out.println(ipaddress);HttpSession sesion = request.getSession();Object userobject = sesion.getAttribute("user");if(userobject!=null){User user = (User)userobject;userid =  user.getId();}//获取浏览器信息以及操作系统信息String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);System.out.println(osandbrowser);String[] temps = osandbrowser.split("---");String os = temps[0].trim();String browser = temps[1].trim();System.out.println(os);System.out.println(browser);Productscanlog productscanlog = new Productscanlog();//根据ip获取地区和运营商try {AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setProvice(areaAndnetwork.getProvice());productscanlog.setCity(areaAndnetwork.getCity());productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setNetwork(areaAndnetwork.getNetwork());}catch (Exception e){e.printStackTrace();}productscanlog.setPindaoid(pindaoid);productscanlog.setProductytpeid(productytpeid);productscanlog.setProducid(Long.valueOf(producid+""));productscanlog.setUserid(userid);productscanlog.setIp(ipaddress);productscanlog.setBrowser(browser);productscanlog.setOs(os);productscanlog.setTimestamp(new Date().getTime());String productscanlogstring = JSONObject.toJSONString(productscanlog);System.out.println(productscanlogstring);kafkaTemplate.send("productscanlog", "key", productscanlogstring);String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();kafkaTemplate.send("productscanlogflume","key",productflume);

 

 


==============================
QQ群:143522604
群里有相关资源
欢迎和大家一起学习、交流、提升!
==============================

 

 

这篇关于Sprint Boot集成Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/444728

相关文章

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

Spring Boot中JSON数值溢出问题从报错到优雅解决办法

《SpringBoot中JSON数值溢出问题从报错到优雅解决办法》:本文主要介绍SpringBoot中JSON数值溢出问题从报错到优雅的解决办法,通过修改字段类型为Long、添加全局异常处理和... 目录一、问题背景:为什么我的接口突然报错了?二、为什么会发生这个错误?1. Java 数据类型的“容量”限制

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

Spring Boot读取配置文件的五种方式小结

《SpringBoot读取配置文件的五种方式小结》SpringBoot提供了灵活多样的方式来读取配置文件,这篇文章为大家介绍了5种常见的读取方式,文中的示例代码简洁易懂,大家可以根据自己的需要进... 目录1. 配置文件位置与加载顺序2. 读取配置文件的方式汇总方式一:使用 @Value 注解读取配置方式二

Spring Boot 集成 Quartz并使用Cron 表达式实现定时任务

《SpringBoot集成Quartz并使用Cron表达式实现定时任务》本篇文章介绍了如何在SpringBoot中集成Quartz进行定时任务调度,并通过Cron表达式控制任务... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启动 Sprin

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三

在Spring Boot中浅尝内存泄漏的实战记录

《在SpringBoot中浅尝内存泄漏的实战记录》本文给大家分享在SpringBoot中浅尝内存泄漏的实战记录,结合实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录使用静态集合持有对象引用,阻止GC回收关键点:可执行代码:验证:1,运行程序(启动时添加JVM参数限制堆大小):2,访问 htt

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka