spark streaming中的广播变量应用

2024-06-16 19:58

本文主要是介绍spark streaming中的广播变量应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 广播变量

我们知道spark 的广播变量允许缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。常见于spark在一些全局统计的场景中应用。通过广播变量,能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。 
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

2. Spark Streaming 广播变量的更新

广播变量的声明很简单,调用broadcast就能搞定,并且scala中一切可序列化的对象都是可以进行广播的,这就给了我们很大的想象空间,可以利用广播变量将一些经常访问的大变量进行广播,而不是每个任务保存一份,这样可以减少资源上的浪费。

但是,现在项目中遇到一种这样的需求,用spark streaming 通过一些离线全局更新好的数据对用户进行实时推荐(当然这里基于一些spark streaming的内部机制,不能实现真正的时效性):(1)日志流通过kafka获取 (2) 解析日志流数据,融合离线的全局数据,对每个Dtream进行计算(3)计算结果最后发送到redis中。

其中就会涉及这样的问题:(1)离线全局的数据是需要全局获取的,不能局部进行计算 (2)这部分数据是离线定期更新的,而spark streaming一旦开始,就长时间运行。如果离线数据更新了,如何在开始的流计算中,获取到这部分更新后的数据。

针对上述问题,我们可以直接想的一种方法是,在driver端开启一个附属线程,周期性去获取离线的全局数据,然后通过diver分发到各个task中。但是考虑到这种方式:spark streaming整体的性能开销会很大,并且重新开启的后台线程的不易管理。结合spark中的广播变量,我们采用另一种方式来解决以上问题: 
1> spark中的广播变量是只读的,通过unpersist函数,可以内存中的相关序列化对象 
2> 通过Dstream的foreachRDD方法,做到定时更新 (官网上有说明,该方法是在driver端执行的)


import java.io.{ObjectInputStream, ObjectOutputStream}
import com.bf.dt.wireless.config.WirelessConfig
import com.bf.dt.wireless.formator.WirelessFormator
import com.bf.dt.wireless.storage.MysqlConnectionPool
import com.bf.dt.wireless.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s._
import org.slf4j.LoggerFactory
import scala.collection.mutableobject WirelessLogAnalysis {object BroadcastWrapper {@volatile private var instance: Broadcast[Map[String, List[String]]] = nullprivate val map = mutable.LinkedHashMap[String, List[String]]()def getMysql(): Map[String, List[String]] = {//1.获取mysql连接池的一个连接val conn = MysqlConnectionPool.getConnection.get//2.查询新的数据val sql = "select aid_type,aids from cf_similarity"val ps = conn.prepareStatement(sql)val rs = ps.executeQuery()while (rs.next()) {val aid = rs.getString("aid_type")val aids = rs.getString("aids").split(",").toListmap += (aid -> aids)}//3.连接池回收连接MysqlConnectionPool.closeConnection(conn)map.toMap}def update(sc: SparkContext, blocking: Boolean = false): Unit = {if (instance != null)instance.unpersist(blocking)instance = sc.broadcast(getMysql())}def getInstance(sc: SparkContext): Broadcast[Map[String, List[String]]] = {if (instance == null) {synchronized {if (instance == null) {instance = sc.broadcast(getMysql)}}}instance}private def writeObject(out: ObjectOutputStream): Unit = {out.writeObject(instance)}private def readObject(in: ObjectInputStream): Unit = {instance = in.readObject().asInstanceOf[Broadcast[Map[String, List[String]]]]}}def main(args: Array[String]): Unit = {val logger = LoggerFactory.getLogger(this.getClass)val conf = new SparkConf().setAppName("wirelessLogAnalysis")val ssc = new StreamingContext(conf, Seconds(10))val kafkaConfig: Map[String, String] = Map("metadata.broker.list" -> WirelessConfig.getConf.get.getString("wireless.metadata.broker.list"),"group.id" -> WirelessConfig.getConf.get.getString("wireless.group.id"),"zookeeper.connect" -> WirelessConfig.getConf.get.getString("wireless.zookeeper.connect"),"auto.offset.reset" -> WirelessConfig.getConf.get.getString("wireless.auto.offset.reset"))val androidvvTopic = WirelessConfig.getConf.get.getString("wireless.topic1")val iphonevvToplic = WirelessConfig.getConf.get.getString("wireless.topic2")val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaConfig,Set(androidvvTopic, iphonevvToplic))//原始日志流打印kafkaDStream.print()val jsonDstream = kafkaDStream.map(x =>//解析日志流WirelessFormator.format(x._2))//解密的日志流打印jsonDstream.print()jsonDstream.foreachRDD {rdd => {// driver端运行,涉及操作:广播变量的初始化和更新// 可以自定义更新时间if ((DateUtils.getNowTime().split(" ")(1) >= "08:00:00") && (DateUtils.getNowTime().split(" ")(1) <= "10:10:00")) {BroadcastWrapper.update(rdd.sparkContext, true)println("广播变量更新成功: " + DateUtils.getNowTime())}//worker端运行,涉及操作:Dstream数据的处理和Redis更新rdd.foreachPartition {partitionRecords =>//1.获取redis连接,保证每个partition建立一次连接,避免每个记录建立/关闭连接的性能消耗partitionRecords.foreach(record => {//2.处理日志流val uid = record._1val aid_type = record._2 + "_" + record._3if (cf.value.keySet.contains(aid_type)) {(uid, cf.value.get(aid_type))println((uid, cf.value.get(aid_type)))}else(uid, "-1")}//3.redis更新数据)//4.关闭redis连接}}}ssc.start()ssc.awaitTermination()}
}

说明:以上是无线推荐项目中部分代码,其中离线全局数据存储在mysql中,MysqlConnectionPool是mysql连接池定义类,WirelessFormator是日志解密的定义类

这篇关于spark streaming中的广播变量应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067403

相关文章

一文全面详解Python变量作用域

《一文全面详解Python变量作用域》变量作用域是Python中非常重要的概念,它决定了在哪里可以访问变量,下面我将用通俗易懂的方式,结合代码示例和图表,带你全面了解Python变量作用域,需要的朋友... 目录一、什么是变量作用域?二、python的四种作用域作用域查找顺序图示三、各作用域详解1. 局部作

Python使用Tkinter打造一个完整的桌面应用

《Python使用Tkinter打造一个完整的桌面应用》在Python生态中,Tkinter就像一把瑞士军刀,它没有花哨的特效,却能快速搭建出实用的图形界面,作为Python自带的标准库,无需安装即可... 目录一、界面搭建:像搭积木一样组合控件二、菜单系统:给应用装上“控制中枢”三、事件驱动:让界面“活”

如何确定哪些软件是Mac系统自带的? Mac系统内置应用查看技巧

《如何确定哪些软件是Mac系统自带的?Mac系统内置应用查看技巧》如何确定哪些软件是Mac系统自带的?mac系统中有很多自带的应用,想要看看哪些是系统自带,该怎么查看呢?下面我们就来看看Mac系统内... 在MAC电脑上,可以使用以下方法来确定哪些软件是系统自带的:1.应用程序文件夹打开应用程序文件夹

Python Flask 库及应用场景

《PythonFlask库及应用场景》Flask是Python生态中​轻量级且高度灵活的Web开发框架,基于WerkzeugWSGI工具库和Jinja2模板引擎构建,下面给大家介绍PythonFl... 目录一、Flask 库简介二、核心组件与架构三、常用函数与核心操作 ​1. 基础应用搭建​2. 路由与参

Spring Boot中的YML配置列表及应用小结

《SpringBoot中的YML配置列表及应用小结》在SpringBoot中使用YAML进行列表的配置不仅简洁明了,还能提高代码的可读性和可维护性,:本文主要介绍SpringBoot中的YML配... 目录YAML列表的基础语法在Spring Boot中的应用从YAML读取列表列表中的复杂对象其他注意事项总

电脑系统Hosts文件原理和应用分享

《电脑系统Hosts文件原理和应用分享》Hosts是一个没有扩展名的系统文件,当用户在浏览器中输入一个需要登录的网址时,系统会首先自动从Hosts文件中寻找对应的IP地址,一旦找到,系统会立即打开对应... Hosts是一个没有扩展名的系统文件,可以用记事本等工具打开,其作用就是将一些常用的网址域名与其对应

CSS 样式表的四种应用方式及css注释的应用小结

《CSS样式表的四种应用方式及css注释的应用小结》:本文主要介绍了CSS样式表的四种应用方式及css注释的应用小结,本文通过实例代码给大家介绍的非常详细,详细内容请阅读本文,希望能对你有所帮助... 一、外部 css(推荐方式)定义:将 CSS 代码保存为独立的 .css 文件,通过 <link> 标签

Python使用Reflex构建现代Web应用的完全指南

《Python使用Reflex构建现代Web应用的完全指南》这篇文章为大家深入介绍了Reflex框架的设计理念,技术特性,项目结构,核心API,实际开发流程以及与其他框架的对比和部署建议,感兴趣的小伙... 目录什么是 ReFlex?为什么选择 Reflex?安装与环境配置构建你的第一个应用核心概念解析组件

C#通过进程调用外部应用的实现示例

《C#通过进程调用外部应用的实现示例》本文主要介绍了C#通过进程调用外部应用的实现示例,以WINFORM应用程序为例,在C#应用程序中调用PYTHON程序,具有一定的参考价值,感兴趣的可以了解一下... 目录窗口程序类进程信息类 系统设置类 以WINFORM应用程序为例,在C#应用程序中调用python程序

Java应用如何防止恶意文件上传

《Java应用如何防止恶意文件上传》恶意文件上传可能导致服务器被入侵,数据泄露甚至服务瘫痪,因此我们必须采取全面且有效的防范措施来保护Java应用的安全,下面我们就来看看具体的实现方法吧... 目录恶意文件上传的潜在风险常见的恶意文件上传手段防范恶意文件上传的关键策略严格验证文件类型检查文件内容控制文件存储