打通实时流处理log4j-flume-kafka-structured-streaming

2024-09-06 20:58

本文主要是介绍打通实时流处理log4j-flume-kafka-structured-streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

模拟产生log4j日志

jar包依赖 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.8.0</version>
</dependency>

java代码 LoggerGenerator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true) {Thread.sleep(1000);logger.info("value : " + index++);}// $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181}
}

log4j.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
kafka broker启动

提前创建好topic【不是必须的】
flume-ng启动后,启动一个kafka console consulmer观察数据

1
2
3
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
flume-ng配置和启动

前面文章用过的avro-memory-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# avro-memory-kafka.conf# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSin
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic# Use a channel which buffers events in memory
avro-memory-kafka.channels.momory-channel.type = memory
avro-memory-kafka.channels.momory-channel.capacity = 1000
avro-memory-kafka.channels.momory-channel.transactionCapacity = 100# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = momory-channel

启动flume-ng

1
2
3
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
spark structured streaming实时流处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
topic = 'kafka_streaming_topic'
brokers = "127.0.0.1:9092"spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).option("startingOffsets", """{"%s":{"0": 7}}""" % topic).load().selectExpr("CAST(value AS STRING)")# 自定义处理传输的数据-比如JSON串
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)
word_counts = words.groupBy('word').count()query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于打通实时流处理log4j-flume-kafka-structured-streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143102

相关文章

解决docker目录内存不足扩容处理方案

《解决docker目录内存不足扩容处理方案》文章介绍了Docker存储目录迁移方法:因系统盘空间不足,需将Docker数据迁移到更大磁盘(如/home/docker),通过修改daemon.json配... 目录1、查看服务器所有磁盘的使用情况2、查看docker镜像和容器存储目录的空间大小3、停止dock

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄

Python异常处理之避免try-except滥用的3个核心原则

《Python异常处理之避免try-except滥用的3个核心原则》在Python开发中,异常处理是保证程序健壮性的关键机制,本文结合真实案例与Python核心机制,提炼出避免异常滥用的三大原则,有需... 目录一、精准打击:只捕获可预见的异常类型1.1 通用异常捕获的陷阱1.2 精准捕获的实践方案1.3

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

Python动态处理文件编码的完整指南

《Python动态处理文件编码的完整指南》在Python文件处理的高级应用中,我们经常会遇到需要动态处理文件编码的场景,本文将深入探讨Python中动态处理文件编码的技术,有需要的小伙伴可以了解下... 目录引言一、理解python的文件编码体系1.1 Python的IO层次结构1.2 编码问题的常见场景二

Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧

《Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧》本文将通过实际代码示例,深入讲解Python函数的基本用法、返回值特性、全局变量修改以及异常处理技巧,感兴趣的朋友跟随小编一起看看... 目录一、python函数定义与调用1.1 基本函数定义1.2 函数调用二、函数返回值详解2.1 有返

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建