实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

2024-05-05 06:58

本文主要是介绍实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了。

在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的。

spark streaming

1. 关于数据源

数据源是文本格式的日志,由Nginx产生,存放于日志服务器上。在日志服务器上部署Flume Agent,使用TAILDIR Source和Kafka Sink,将日志采集到Kafka进行临时存储。日志格式如下:

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=DEIBAH&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

日志是由测试程序模拟产生的,字段之间由|~|分隔。

2. 实时计算需求

分天、分小时PV;

分天、分小时、分网站(siteid)PV;

分天 UV;

3. Spark Streaming消费Kafka数据

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

在Spark Streaming中消费Kafka数据,保证Exactly-once的核心有三点:

使用Direct方式连接Kafka;自己保存和维护Offset;更新Offset和计算在同一事务中完成;

后面的Spark Streaming程序(文章结尾),主要有以下步骤:

  1. 启动后,先从Redis中获取上次保存的Offset,Redis中的key为”topic_partition”,即每个分区维护一个Offset;
  2. 使用获取到的Offset,创建DirectStream;
  3. 在处理每批次的消息时,利用Redis的事务机制,确保在Redis中指标的计算和Offset的更新维护,在同一事务中完成。只有这两者同步,才能真正保证消息的Exactly-once。
 
  1. ./spark-submit \
  2. --class com.lxw1234.spark.TestSparkStreaming \
  3. --master local[2] \
  4. --conf spark.streaming.kafka.maxRatePerPartition=20000 \
  5. --jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\
  6. /data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\
  7. /data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\
  8. /data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
  9. /data1/home/dmp/lxw/realtime/testsparkstreaming.jar \
  10. --executor-memory 4G \
  11. --num-executors 1

在启动Spark Streaming程序时候,有个参数最好指定:

spark.streaming.kafka.maxRatePerPartition=20000(每秒钟从topic的每个partition最多消费的消息条数)

如果程序第一次运行,或者因为某种原因暂停了很久重新启动时候,会积累很多消息,如果这些消息同时被消费,很有可能会因为内存不够而挂掉,因此,需要根据实际的数据量大小,以及批次的间隔时间来设置该参数,以限定批次的消息量。

如果该参数设置20000,而批次间隔时间未10秒,那么每个批次最多从Kafka中消费20万消息。

4. Redis中的数据模型

  • 分小时、分网站PV

普通K-V结构,计算时候使用incr命令递增,

Key为 “site_pv_网站ID_小时”,

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

该数据模型用于计算分网站的按小时及按天PV。

  • 分小时PV

普通K-V结构,计算时候使用incr命令递增,

Key为“pv_小时”,如:pv_2018-02-21-14、pv_2018-02-22-03

该数据模型用于计算按小时及按天总PV。

  • 分天UV

Set结构,计算时候使用sadd命令添加,

Key为”uv_天”,如:uv_2018-02-21、uv_2018-02-20

该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)

 

注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。

5. 故障恢复

如果Spark Streaming程序因为停电、网络等意外情况终止而需要恢复,则直接重启即可;

如果因为其他原因需要重新计算某一时间段的消息,可以先删除Redis中对应时间段内的Key,然后从原始日志中截取该时间段内的消息,当做新消息添加至Kafka,由Spark Streaming程序重新消费并进行计算;

6. 附程序

依赖jar包:

commons-pool2-2.3.jar

jedis-2.9.0.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka-0-10_2.11-2.2.1.jar

InternalRedisClient (Redis链接池)

 
  1. package com.lxw1234.spark
  2.  
  3. import redis.clients.jedis.JedisPool
  4. import org.apache.commons.pool2.impl.GenericObjectPoolConfig
  5.  
  6. /**
  7. * @author lxw1234
  8. */
  9. /**
  10. * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
  11. */
  12. object InternalRedisClient extends Serializable {
  13.  
  14. @transient private var pool: JedisPool = null
  15.  
  16. def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
  17. maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
  18. makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
  19. }
  20.  
  21. def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
  22. maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
  23. testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
  24. if(pool == null) {
  25. val poolConfig = new GenericObjectPoolConfig()
  26. poolConfig.setMaxTotal(maxTotal)
  27. poolConfig.setMaxIdle(maxIdle)
  28. poolConfig.setMinIdle(minIdle)
  29. poolConfig.setTestOnBorrow(testOnBorrow)
  30. poolConfig.setTestOnReturn(testOnReturn)
  31. poolConfig.setMaxWaitMillis(maxWaitMillis)
  32. pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)
  33.  
  34. val hook = new Thread{
  35. override def run = pool.destroy()
  36. }
  37. sys.addShutdownHook(hook.run)
  38. }
  39. }
  40.  
  41. def getPool: JedisPool = {
  42. assert(pool != null)
  43. pool
  44. }
  45. }

TestSparkStreaming

 
  1. package com.lxw1234.spark
  2.  
  3. import org.apache.kafka.clients.consumer.ConsumerRecord
  4. import org.apache.kafka.common.TopicPartition
  5. import org.apache.kafka.common.serialization.StringDeserializer
  6. import org.apache.spark.SparkConf
  7. import org.apache.spark.rdd.RDD
  8. import org.apache.spark.streaming.Seconds
  9. import org.apache.spark.streaming.StreamingContext
  10. import org.apache.spark.streaming.kafka010.ConsumerStrategies
  11. import org.apache.spark.streaming.kafka010.HasOffsetRanges
  12. import org.apache.spark.streaming.kafka010.KafkaUtils
  13. import org.apache.spark.streaming.kafka010.LocationStrategies
  14.  
  15. import redis.clients.jedis.Pipeline
  16.  
  17.  
  18. /**
  19. * @author lxw1234
  20. * 获取topic最小的offset
  21. * ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list datadev1:9092 --topic lxw1234 --time -2
  22. */
  23. object TestSparkStreaming {
  24.  
  25. def main(args : Array[String]) : Unit = {
  26. val brokers = "datadev1:9092"
  27. val topic = "lxw1234"
  28. val partition : Int = 0 //测试topic只有一个分区
  29. val start_offset : Long = 0l
  30.  
  31. //Kafka参数
  32. val kafkaParams = Map[String, Object](
  33. "bootstrap.servers" -> brokers,
  34. "key.deserializer" -> classOf[StringDeserializer],
  35. "value.deserializer" -> classOf[StringDeserializer],
  36. "group.id" -> "exactly-once",
  37. "enable.auto.commit" -> (false: java.lang.Boolean),
  38. "auto.offset.reset" -> "none"
  39. )
  40.  
  41. // Redis configurations
  42. val maxTotal = 10
  43. val maxIdle = 10
  44. val minIdle = 1
  45. val redisHost = "172.16.213.79"
  46. val redisPort = 6379
  47. val redisTimeout = 30000
  48. //默认db,用户存放Offset和pv数据
  49. val dbDefaultIndex = 8
  50. InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  51.  
  52.  
  53. val conf = new SparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master", "local[2]")
  54. val ssc = new StreamingContext(conf, Seconds(10))
  55.  
  56. //从Redis获取上一次存的Offset
  57. val jedis = InternalRedisClient.getPool.getResource
  58. jedis.select(dbDefaultIndex)
  59. val topic_partition_key = topic + "_" + partition
  60. var lastOffset = 0l
  61. val lastSavedOffset = jedis.get(topic_partition_key)
  62.  
  63. if(null != lastSavedOffset) {
  64. try {
  65. lastOffset = lastSavedOffset.toLong
  66. } catch {
  67. case ex : Exception => println(ex.getMessage)
  68. println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "] ")
  69. System.exit(1)
  70. }
  71. }
  72. InternalRedisClient.getPool.returnResource(jedis)
  73.  
  74. println("lastOffset from redis -> " + lastOffset)
  75.  
  76. //设置每个分区起始的Offset
  77. val fromOffsets = Map{new TopicPartition(topic, partition) -> lastOffset}
  78.  
  79. //使用Direct API 创建Stream
  80. val stream = KafkaUtils.createDirectStream[String, String](
  81. ssc,
  82. LocationStrategies.PreferConsistent,
  83. ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  84. )
  85.  
  86. //开始处理批次消息
  87. stream.foreachRDD {
  88. rdd =>
  89. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  90.  
  91. val result = processLogs(rdd)
  92. println("=============== Total " + result.length + " events in this batch ..")
  93.  
  94. val jedis = InternalRedisClient.getPool.getResource
  95. val p1 : Pipeline = jedis.pipelined();
  96. p1.select(dbDefaultIndex)
  97. p1.multi() //开启事务
  98.  
  99.  
  100. //逐条处理消息
  101. result.foreach {
  102. record =>
  103. //增加小时总pv
  104. val pv_by_hour_key = "pv_" + record.hour
  105. p1.incr(pv_by_hour_key)
  106.  
  107. //增加网站小时pv
  108. val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
  109. p1.incr(site_pv_by_hour_key)
  110.  
  111. //使用set保存当天的uv
  112. val uv_by_day_key = "uv_" + record.hour.substring(0, 10)
  113. p1.sadd(uv_by_day_key, record.user_id)
  114. }
  115.  
  116. //更新Offset
  117. offsetRanges.foreach { offsetRange =>
  118. println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
  119. val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
  120. p1.set(topic_partition_key, offsetRange.untilOffset + "")
  121. }
  122.  
  123. p1.exec();//提交事务
  124. p1.sync();//关闭pipeline
  125.  
  126. InternalRedisClient.getPool.returnResource(jedis)
  127.  
  128. }
  129.  
  130. case class MyRecord(hour: String, user_id: String, site_id: String)
  131.  
  132. def processLogs(messages: RDD[ConsumerRecord[String, String]]) : Array[MyRecord] = {
  133. messages.map(_.value()).flatMap(parseLog).collect()
  134. }
  135.  
  136. //解析每条日志,生成MyRecord
  137. def parseLog(line: String): Option[MyRecord] = {
  138. val ary : Array[String] = line.split("\\|~\\|", -1);
  139. try {
  140. val hour = ary(0).substring(0, 13).replace("T", "-")
  141. val uri = ary(2).split("[=|&]",-1)
  142. val user_id = uri(1)
  143. val site_id = uri(3)
  144. return Some(MyRecord(hour,user_id,site_id))
  145.  
  146. } catch {
  147. case ex : Exception => println(ex.getMessage)
  148. }
  149.  
  150. return None
  151.  
  152. }
  153.  
  154.  
  155.  
  156.  
  157. ssc.start()
  158. ssc.awaitTermination()
  159. }
  160.  
  161.  
  162. }

 

spark streaming

 

spark streaming

如果觉得本博客对您有帮助,请 赞助作者 。

 

转载地址:http://lxw1234.com/archives/2018/02/901.htm

这篇关于实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/961062

相关文章

windows和Linux使用命令行计算文件的MD5值

《windows和Linux使用命令行计算文件的MD5值》在Windows和Linux系统中,您可以使用命令行(终端或命令提示符)来计算文件的MD5值,文章介绍了在Windows和Linux/macO... 目录在Windows上:在linux或MACOS上:总结在Windows上:可以使用certuti

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Redis 热 key 和大 key 问题小结

《Redis热key和大key问题小结》:本文主要介绍Redis热key和大key问题小结,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、什么是 Redis 热 key?热 key(Hot Key)定义: 热 key 常见表现:热 key 的风险:二、

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

Redis Pipeline(管道) 详解

《RedisPipeline(管道)详解》Pipeline管道是Redis提供的一种批量执行命令的机制,通过将多个命令一次性发送到服务器并统一接收响应,减少网络往返次数(RTT),显著提升执行效率... 目录Redis Pipeline 详解1. Pipeline 的核心概念2. 工作原理与性能提升3. 核

redis过期key的删除策略介绍

《redis过期key的删除策略介绍》:本文主要介绍redis过期key的删除策略,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录第一种策略:被动删除第二种策略:定期删除第三种策略:强制删除关于big key的清理UNLINK命令FLUSHALL/FLUSHDB命

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

SpringBoot中配置Redis连接池的完整指南

《SpringBoot中配置Redis连接池的完整指南》这篇文章主要为大家详细介绍了SpringBoot中配置Redis连接池的完整指南,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以... 目录一、添加依赖二、配置 Redis 连接池三、测试 Redis 操作四、完整示例代码(一)pom.

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Redis在windows环境下如何启动

《Redis在windows环境下如何启动》:本文主要介绍Redis在windows环境下如何启动的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis在Windows环境下启动1.在redis的安装目录下2.输入·redis-server.exe