1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql)

2024-03-16 13:08

本文主要是介绍1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

演练环境搭建

安装nc 作为输出流

[kfk@bigdata-pro03 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm Preparing...                                                            (100%########################################### [100%]1:nc                                                                 ( 19%########################################### [100%]
[kfk@bigdata-pro03 softwares]$ which nc
/usr/bin/nc
$ nc -lk 9999

官网演示DEMO

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

spark-shell演示

[kfk@bigdata-pro03 spark-2.2.0-bin]$ bin/spark-shell
20/06/24 00:39:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/06/24 00:39:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://192.168.0.153:4041
Spark context available as 'sc' (master = local[*], app id = local-1592973563887).
Spark session available as 'spark'.
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 2.2.0/_/Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_11)
Type in expressions to have them evaluated.
Type :help for more information.scala> import org.apache.spark._
import org.apache.spark._scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1002b06dscala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@514f2020scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@4f5df012scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@39f3285dscala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@3299e315scala> wordCounts.print()scala> ssc.start()  
//一直运行,除非人为干预再停止 ssc.awaitTermination()

在nc下输入单词 在shell客户端就可以读到了

在IDEA中代码

包含读到MySQL库 、注释部分读到HDFS

package com.spark.streamingimport java.sql.DriverManagerimport org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}object TestStreaming {def main(args: Array[String]): Unit = {val spark = SparkSession.builder.master("local[2]").appName("straming").getOrCreate()val sc = spark.sparkContext;sc.setLogLevel("WARN");val ssc = new StreamingContext(sc, Seconds(5))val lines = ssc.socketTextStream("bigdata-pro03.kfk.com", 9999)val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)//words.foreachRDD(wd =>wd.saveAsTextFile("hdfs://bigdata-pro01.kfk.com/user/kfk/stream"));words.foreachRDD(rdd=>rdd.foreachPartition(line =>{Class.forName("com.mysql.jdbc.Driver")val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro01.kfk.com/test","root","123456")try{for (row <- line){val sql = "insert into webCount(titleName,count) values ('"+row._1+"',"+row._2+" )";conn.prepareStatement(sql).executeUpdate()}}finally {conn.close()}}))words.print()ssc.start()ssc.awaitTermination()}
}

这篇关于1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/815592

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本