kafka+spark streaming例子入门

2024-06-15 22:38

本文主要是介绍kafka+spark streaming例子入门,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 启动Kafka Server:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server/properties
  1. 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  1. 作为producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

KafkaWordCount

object KafkaWordCount{def main(args:Array[String]){if(args.length<4){System.err.println("Usage:KafkaWordCount <zkQuorum>                         <group><topics><numThreads>")System.exit(1)} StreamingExamples.setStreamingLogLevels()val Array(zkQuorum,group,topics,numThreads)=argsval sparkconf=new SparkConf().setAppName("KafkaWordCount")val ssc=new StreamingContext(SparkConf,Seconds(2))ssc.checkpoint("checkpoint")val topicMap=topics.split(",").map((_,numThreads,toInt)).toMapval lines=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap).map(_._2)val words=lines.flagMap(_.split(" "))val wordCounts=words.map(x=>(x,1L)).reduceByKeyAndWindow(_+_,_-_,Minutes(20),Seconds(2),2)wordCounts.print()ssc.start()ssc.awaitTermination()}}

运行KafkaWordCountProducer

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
//localhost:9092是producer的地址及其端口  test是Topic,3表示每秒发多少信息,5表示每条消息中有多少个单词

运行kafkawordcount

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
//localhost:2181表示zookeeper的监听地址;// test-consumer-group表示consumer-group的名称,必须//和$KAFKA_HOME/config/consumer.properties中的group.id配置内
//test表示topic,1表示线程数

这篇关于kafka+spark streaming例子入门的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1064779

相关文章

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

POI从入门到实战轻松完成EasyExcel使用及Excel导入导出功能

《POI从入门到实战轻松完成EasyExcel使用及Excel导入导出功能》ApachePOI是一个流行的Java库,用于处理MicrosoftOffice格式文件,提供丰富API来创建、读取和修改O... 目录前言:Apache POIEasyPoiEasyExcel一、EasyExcel1.1、核心特性

Python中模块graphviz使用入门

《Python中模块graphviz使用入门》graphviz是一个用于创建和操作图形的Python库,本文主要介绍了Python中模块graphviz使用入门,具有一定的参考价值,感兴趣的可以了解一... 目录1.安装2. 基本用法2.1 输出图像格式2.2 图像style设置2.3 属性2.4 子图和聚

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

Python FastAPI入门安装使用

《PythonFastAPI入门安装使用》FastAPI是一个现代、快速的PythonWeb框架,用于构建API,它基于Python3.6+的类型提示特性,使得代码更加简洁且易于绶护,这篇文章主要介... 目录第一节:FastAPI入门一、FastAPI框架介绍什么是ASGI服务(WSGI)二、FastAP

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf