Spark Streaming整合log4j、Flume与Kafka的案例

2024-09-06 20:38

本文主要是介绍Spark Streaming整合log4j、Flume与Kafka的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

来源:作者TAI_SPARK,http://suo.im/5w7LF8

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

1.框架

2.log4j完成模拟日志输出

设置模拟日志格式,log4j.properties:

log4j.rootLogger = INFO,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

模拟日志输出,LoggerGenerator.java:

import org.apache.log4j.Logger;/*** 模拟日志产生*/
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true){Thread.sleep(1000);logger.info("value:" + index++);}}
}

运行结果:

2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
2020-03-07 18:21:39,639 [main] [LoggerGenerator] [INFO] - current value is:2
2020-03-07 18:21:40,640 [main] [LoggerGenerator] [INFO] - current value is:3
2020-03-07 18:21:41,640 [main] [LoggerGenerator] [INFO] - current value is:4
2020-03-07 18:21:42,641 [main] [LoggerGenerator] [INFO] - current value is:5
2020-03-07 18:21:43,641 [main] [LoggerGenerator] [INFO] - current value is:6
2020-03-07 18:21:44,642 [main] [LoggerGenerator] [INFO] - current value is:7
2020-03-07 18:21:45,642 [main] [LoggerGenerator] [INFO] - current value is:8
2020-03-07 18:21:46,642 [main] [LoggerGenerator] [INFO] - current value is:9
2020-03-07 18:21:47,643 [main] [LoggerGenerator] [INFO] - current value is:10

3.Flume收集log4j日志

$FLUME_HOME/conf/streaming.conf:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.log-sink.type=loggeragent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

启动Flume(注意输出到控制台上为INFO,console,不是点【.】):

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

pom.xml加上一个jar包:

    <dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.6.0</version></dependency>

修改log4j.properties,使其与Flume链接:

log4j.rootLogger = INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

启动log4j:

Flume采集成功

4.KafkaSink链接Kafka与Flume

使用Kafka第一件事是把Zookeeper启动起来~

./zkServer.sh start

启动Kafka

./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties 

看下Kafka列表(用./kafka-topics.sh会报错,用“./”加文件名.sh执行时,必须给.sh文件加x执行权限):

kafka-topics.sh --list --zookeeper hadoop000:2181

创建一个topic:

kafka-topics.sh --create \
--zookeeper hadoop000:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tp_streamingtopic

对接Flume与Kafka,设置Flume的conf,取名为streaming2.conf:

Kafka sink需要的参数有(每个版本不一样,具体可以查阅官网):

  • sink类型填KafkaSink

  • 需要链接的Kafka topic

  • Kafka中间件broker的地址与端口号

  • 是否使用握手机制

  • 每次发送的数据大小

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

启动Flume:

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

Kafka需要启动一个消费者消费Flume中Kafka sink来的数据:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic

启动log4j:

成功传输~

5.Spark Streaming消费Kafka数据

package com.taipark.sparkimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Spark Streaming 对接 Kafka*/
object KafkaStreamingApp {def main(args: Array[String]): Unit = {if(args.length != 2){System.err.println("Userage:KafkaStreamingApp<brokers><topics>");System.exit(1);}val Array(brokers,topics) = argsval sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(5))val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)val topicSet = topics.split(",").toSetval messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)//第二位是字符串的值messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}

入参是Kafka的broker地址与topic名称:

本地Run一下:

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Spark Streaming整合log4j、Flume与Kafka的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143065

相关文章

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边

Python Counter 函数使用案例

《PythonCounter函数使用案例》Counter是collections模块中的一个类,专门用于对可迭代对象中的元素进行计数,接下来通过本文给大家介绍PythonCounter函数使用案例... 目录一、Counter函数概述二、基本使用案例(一)列表元素计数(二)字符串字符计数(三)元组计数三、C

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对

Java整合Protocol Buffers实现高效数据序列化实践

《Java整合ProtocolBuffers实现高效数据序列化实践》ProtocolBuffers是Google开发的一种语言中立、平台中立、可扩展的结构化数据序列化机制,类似于XML但更小、更快... 目录一、Protocol Buffers简介1.1 什么是Protocol Buffers1.2 Pro

springboot整合mqtt的步骤示例详解

《springboot整合mqtt的步骤示例详解》MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信,本文介绍Sprin... 目录1、引入依赖包2、yml配置3、创建配置4、自定义注解6、使用示例使用场景:mqtt可用于消息发

MySQL 临时表与复制表操作全流程案例

《MySQL临时表与复制表操作全流程案例》本文介绍MySQL临时表与复制表的区别与使用,涵盖生命周期、存储机制、操作限制、创建方法及常见问题,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小... 目录一、mysql 临时表(一)核心特性拓展(二)操作全流程案例1. 复杂查询中的临时表应用2. 临时

MySQL 数据库表与查询操作实战案例

《MySQL数据库表与查询操作实战案例》本文将通过实际案例,详细介绍MySQL中数据库表的设计、数据插入以及常用的查询操作,帮助初学者快速上手,感兴趣的朋友跟随小编一起看看吧... 目录mysql 数据库表操作与查询实战案例项目一:产品相关数据库设计与创建一、数据库及表结构设计二、数据库与表的创建项目二:员