Spark Streaming整合log4j、Flume与Kafka的案例

2024-09-06 20:38

本文主要是介绍Spark Streaming整合log4j、Flume与Kafka的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

来源:作者TAI_SPARK,http://suo.im/5w7LF8

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

1.框架

2.log4j完成模拟日志输出

设置模拟日志格式,log4j.properties:

log4j.rootLogger = INFO,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

模拟日志输出,LoggerGenerator.java:

import org.apache.log4j.Logger;/*** 模拟日志产生*/
public class LoggerGenerator {private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());public static void main(String[] args) throws Exception{int index = 0;while(true){Thread.sleep(1000);logger.info("value:" + index++);}}
}

运行结果:

2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
2020-03-07 18:21:39,639 [main] [LoggerGenerator] [INFO] - current value is:2
2020-03-07 18:21:40,640 [main] [LoggerGenerator] [INFO] - current value is:3
2020-03-07 18:21:41,640 [main] [LoggerGenerator] [INFO] - current value is:4
2020-03-07 18:21:42,641 [main] [LoggerGenerator] [INFO] - current value is:5
2020-03-07 18:21:43,641 [main] [LoggerGenerator] [INFO] - current value is:6
2020-03-07 18:21:44,642 [main] [LoggerGenerator] [INFO] - current value is:7
2020-03-07 18:21:45,642 [main] [LoggerGenerator] [INFO] - current value is:8
2020-03-07 18:21:46,642 [main] [LoggerGenerator] [INFO] - current value is:9
2020-03-07 18:21:47,643 [main] [LoggerGenerator] [INFO] - current value is:10

3.Flume收集log4j日志

$FLUME_HOME/conf/streaming.conf:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.log-sink.type=loggeragent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

启动Flume(注意输出到控制台上为INFO,console,不是点【.】):

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

pom.xml加上一个jar包:

    <dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.6.0</version></dependency>

修改log4j.properties,使其与Flume链接:

log4j.rootLogger = INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

启动log4j:

Flume采集成功

4.KafkaSink链接Kafka与Flume

使用Kafka第一件事是把Zookeeper启动起来~

./zkServer.sh start

启动Kafka

./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties 

看下Kafka列表(用./kafka-topics.sh会报错,用“./”加文件名.sh执行时,必须给.sh文件加x执行权限):

kafka-topics.sh --list --zookeeper hadoop000:2181

创建一个topic:

kafka-topics.sh --create \
--zookeeper hadoop000:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tp_streamingtopic

对接Flume与Kafka,设置Flume的conf,取名为streaming2.conf:

Kafka sink需要的参数有(每个版本不一样,具体可以查阅官网):

  • sink类型填KafkaSink

  • 需要链接的Kafka topic

  • Kafka中间件broker的地址与端口号

  • 是否使用握手机制

  • 每次发送的数据大小

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414#define channel
agent1.channels.logger-channel.type=memory#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

启动Flume:

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

Kafka需要启动一个消费者消费Flume中Kafka sink来的数据:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic

启动log4j:

成功传输~

5.Spark Streaming消费Kafka数据

package com.taipark.sparkimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Spark Streaming 对接 Kafka*/
object KafkaStreamingApp {def main(args: Array[String]): Unit = {if(args.length != 2){System.err.println("Userage:KafkaStreamingApp<brokers><topics>");System.exit(1);}val Array(brokers,topics) = argsval sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(5))val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)val topicSet = topics.split(",").toSetval messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)//第二位是字符串的值messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}

入参是Kafka的broker地址与topic名称:

本地Run一下:

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Spark Streaming整合log4j、Flume与Kafka的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143065

相关文章

Java Stream流使用案例深入详解

《JavaStream流使用案例深入详解》:本文主要介绍JavaStream流使用案例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录前言1. Lambda1.1 语法1.2 没参数只有一条语句或者多条语句1.3 一个参数只有一条语句或者多

SpringBoot整合OpenFeign的完整指南

《SpringBoot整合OpenFeign的完整指南》OpenFeign是由Netflix开发的一个声明式Web服务客户端,它使得编写HTTP客户端变得更加简单,本文为大家介绍了SpringBoot... 目录什么是OpenFeign环境准备创建 Spring Boot 项目添加依赖启用 OpenFeig

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

MySQL 中的 JSON 查询案例详解

《MySQL中的JSON查询案例详解》:本文主要介绍MySQL的JSON查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 的 jsON 路径格式基本结构路径组件详解特殊语法元素实际示例简单路径复杂路径简写操作符注意MySQL 的 J

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法

《springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法》:本文主要介绍springboot整合阿里云百炼DeepSeek实现sse流式打印,本文给大家介绍的非常详细,对大... 目录1.开通阿里云百炼,获取到key2.新建SpringBoot项目3.工具类4.启动类5.测试类6.测

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI