Sprint Boot集成Kafka

2023-12-02 09:08
文章标签 boot 集成 kafka sprint

本文主要是介绍Sprint Boot集成Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

Sprint Boot集成Kafka

 

 

 

pom.xml

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency>

 

 

 

application.properties

......#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960......

 

 

KafkaProducerConfig

package com.youfan.kafka;import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

 

 

@Autowired    
private KafkaTemplate kafkaTemplate;

 

/*** 埋点收集日志,频道数量少,该模块的频道id固定,id为1,代表生鲜*/long pindaoid = 1;//频道id//productytpeid//产品类别id//producid//产品id//用户idlong userid = 0;//游客//ip地址String ipaddress = IpUtil.getIpAddress(request);System.out.println(ipaddress);HttpSession sesion = request.getSession();Object userobject = sesion.getAttribute("user");if(userobject!=null){User user = (User)userobject;userid =  user.getId();}//获取浏览器信息以及操作系统信息String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);System.out.println(osandbrowser);String[] temps = osandbrowser.split("---");String os = temps[0].trim();String browser = temps[1].trim();System.out.println(os);System.out.println(browser);Productscanlog productscanlog = new Productscanlog();//根据ip获取地区和运营商try {AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setProvice(areaAndnetwork.getProvice());productscanlog.setCity(areaAndnetwork.getCity());productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setNetwork(areaAndnetwork.getNetwork());}catch (Exception e){e.printStackTrace();}productscanlog.setPindaoid(pindaoid);productscanlog.setProductytpeid(productytpeid);productscanlog.setProducid(Long.valueOf(producid+""));productscanlog.setUserid(userid);productscanlog.setIp(ipaddress);productscanlog.setBrowser(browser);productscanlog.setOs(os);productscanlog.setTimestamp(new Date().getTime());String productscanlogstring = JSONObject.toJSONString(productscanlog);System.out.println(productscanlogstring);kafkaTemplate.send("productscanlog", "key", productscanlogstring);String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();kafkaTemplate.send("productscanlogflume","key",productflume);

 

 


==============================
QQ群:143522604
群里有相关资源
欢迎和大家一起学习、交流、提升!
==============================

 

 

这篇关于Sprint Boot集成Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/444728

相关文章

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

使用vscode搭建pywebview集成vue项目实践

《使用vscode搭建pywebview集成vue项目实践》:本文主要介绍使用vscode搭建pywebview集成vue项目实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录环境准备项目源码下载项目说明调试与生成可执行文件核心代码说明总结本节我们使用pythonpywebv

在Spring Boot中实现HTTPS加密通信及常见问题排查

《在SpringBoot中实现HTTPS加密通信及常见问题排查》HTTPS是HTTP的安全版本,通过SSL/TLS协议为通讯提供加密、身份验证和数据完整性保护,下面通过本文给大家介绍在SpringB... 目录一、HTTPS核心原理1.加密流程概述2.加密技术组合二、证书体系详解1、证书类型对比2. 证书获

Maven项目中集成数据库文档生成工具的操作步骤

《Maven项目中集成数据库文档生成工具的操作步骤》在Maven项目中,可以通过集成数据库文档生成工具来自动生成数据库文档,本文为大家整理了使用screw-maven-plugin(推荐)的完... 目录1. 添加插件配置到 pom.XML2. 配置数据库信息3. 执行生成命令4. 高级配置选项5. 注意事

Java集成Onlyoffice的示例代码及场景分析

《Java集成Onlyoffice的示例代码及场景分析》:本文主要介绍Java集成Onlyoffice的示例代码及场景分析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 需求场景:实现文档的在线编辑,团队协作总结:两个接口 + 前端页面 + 配置项接口1:一个接口,将o

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

Spring Boot中的YML配置列表及应用小结

《SpringBoot中的YML配置列表及应用小结》在SpringBoot中使用YAML进行列表的配置不仅简洁明了,还能提高代码的可读性和可维护性,:本文主要介绍SpringBoot中的YML配... 目录YAML列表的基础语法在Spring Boot中的应用从YAML读取列表列表中的复杂对象其他注意事项总

Swagger2与Springdoc集成与使用详解

《Swagger2与Springdoc集成与使用详解》:本文主要介绍Swagger2与Springdoc集成与使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录1. 依赖配置2. 基础配置2.1 启用 Springdoc2.2 自定义 OpenAPI 信息3.

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp