打通实时流处理log4j-flume-kafka-structured-streaming

2024-09-06 20:58

本文主要是介绍打通实时流处理log4j-flume-kafka-structured-streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

模拟产生log4j日志

jar包依赖 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.8.0</version>
</dependency>

java代码 LoggerGenerator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true) {Thread.sleep(1000);logger.info("value : " + index++);}// $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181}
}

log4j.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
kafka broker启动

提前创建好topic【不是必须的】
flume-ng启动后,启动一个kafka console consulmer观察数据

1
2
3
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
flume-ng配置和启动

前面文章用过的avro-memory-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# avro-memory-kafka.conf# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = momory-channel# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSin
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic# Use a channel which buffers events in memory
avro-memory-kafka.channels.momory-channel.type = memory
avro-memory-kafka.channels.momory-channel.capacity = 1000
avro-memory-kafka.channels.momory-channel.transactionCapacity = 100# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = momory-channel
avro-memory-kafka.sinks.kafka-sink.channel = momory-channel

启动flume-ng

1
2
3
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
spark structured streaming实时流处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
topic = 'kafka_streaming_topic'
brokers = "127.0.0.1:9092"spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).option("startingOffsets", """{"%s":{"0": 7}}""" % topic).load().selectExpr("CAST(value AS STRING)")# 自定义处理传输的数据-比如JSON串
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)
word_counts = words.groupBy('word').count()query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于打通实时流处理log4j-flume-kafka-structured-streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143102

相关文章

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Java Response返回值的最佳处理方案

《JavaResponse返回值的最佳处理方案》在开发Web应用程序时,我们经常需要通过HTTP请求从服务器获取响应数据,这些数据可以是JSON、XML、甚至是文件,本篇文章将详细解析Java中处理... 目录摘要概述核心问题:关键技术点:源码解析示例 1:使用HttpURLConnection获取Resp

Java中Switch Case多个条件处理方法举例

《Java中SwitchCase多个条件处理方法举例》Java中switch语句用于根据变量值执行不同代码块,适用于多个条件的处理,:本文主要介绍Java中SwitchCase多个条件处理的相... 目录前言基本语法处理多个条件示例1:合并相同代码的多个case示例2:通过字符串合并多个case进阶用法使用

Java实现优雅日期处理的方案详解

《Java实现优雅日期处理的方案详解》在我们的日常工作中,需要经常处理各种格式,各种类似的的日期或者时间,下面我们就来看看如何使用java处理这样的日期问题吧,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言一、日期的坑1.1 日期格式化陷阱1.2 时区转换二、优雅方案的进阶之路2.1 线程安全重构2

Python处理函数调用超时的四种方法

《Python处理函数调用超时的四种方法》在实际开发过程中,我们可能会遇到一些场景,需要对函数的执行时间进行限制,例如,当一个函数执行时间过长时,可能会导致程序卡顿、资源占用过高,因此,在某些情况下,... 目录前言func-timeout1. 安装 func-timeout2. 基本用法自定义进程subp

Java字符串处理全解析(String、StringBuilder与StringBuffer)

《Java字符串处理全解析(String、StringBuilder与StringBuffer)》:本文主要介绍Java字符串处理全解析(String、StringBuilder与StringBu... 目录Java字符串处理全解析:String、StringBuilder与StringBuffer一、St

浅析Java中如何优雅地处理null值

《浅析Java中如何优雅地处理null值》这篇文章主要为大家详细介绍了如何结合Lambda表达式和Optional,让Java更优雅地处理null值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录场景 1:不为 null 则执行场景 2:不为 null 则返回,为 null 则返回特定值或抛出异常场景

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka