1.8.8 大数据-SparkStreaming-Kafka集成

2024-03-16 13:08

本文主要是介绍1.8.8 大数据-SparkStreaming-Kafka集成,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • IDEA客户端MAVEN POM中引入
  • Linux JAR包放入 jars目录
  • 或者执行jar包时 引入jar包
  • 启动kafka传输消息
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

DirectKafka8WordCount

  <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hadoop.version>2.5.0</hadoop.version><scala.binary.version>2.11</scala.binary.version><spark.version>2.2.0</spark.version></properties><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency>
package com.spark.kfkimport kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._object DirectKafka8WordCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("straming").getOrCreate()val sc = spark.sparkContext;sc.setLogLevel("WARN");val ssc = new StreamingContext(sc, Seconds(5))// Create direct kafka stream with brokers and topicsval topicsSet = Set("weblogs")val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")val KafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicsSet)val lines = KafkaStream.map(x=>x._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x,1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}
}

DirectKafka10WordCount

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version>
</dependency>
package com.spark.kfkimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject DirectKafka10WordCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("straming").getOrCreate()val sc = spark.sparkContext;sc.setLogLevel("WARN");val ssc = new StreamingContext(sc, Seconds(5))// Create direct kafka stream with brokers and topics//val topicsSet = Set("weblogs")val kafkaParams = Map[String, Object]("bootstrap.servers" -> "bigdata-pro01.kfk.com:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val topicsArray = Array("weblogs")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topicsArray, kafkaParams))val lines = stream.map(x => x.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x,1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}
}

这篇关于1.8.8 大数据-SparkStreaming-Kafka集成的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/815593

相关文章

OpenCV在Java中的完整集成指南分享

《OpenCV在Java中的完整集成指南分享》本文详解了在Java中集成OpenCV的方法,涵盖jar包导入、dll配置、JNI路径设置及跨平台兼容性处理,提供了图像处理、特征检测、实时视频分析等应用... 目录1. OpenCV简介与应用领域1.1 OpenCV的诞生与发展1.2 OpenCV的应用领域2

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

SpringBoot集成MyBatis实现SQL拦截器的实战指南

《SpringBoot集成MyBatis实现SQL拦截器的实战指南》这篇文章主要为大家详细介绍了SpringBoot集成MyBatis实现SQL拦截器的相关知识,文中的示例代码讲解详细,有需要的小伙伴... 目录一、为什么需要SQL拦截器?二、MyBATis拦截器基础2.1 核心接口:Interceptor

SpringBoot集成EasyPoi实现Excel模板导出成PDF文件

《SpringBoot集成EasyPoi实现Excel模板导出成PDF文件》在日常工作中,我们经常需要将数据导出成Excel表格或PDF文件,本文将介绍如何在SpringBoot项目中集成EasyPo... 目录前言摘要简介源代码解析应用场景案例优缺点分析类代码方法介绍测试用例小结前言在日常工作中,我们经

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I