kafka 实现worldcount

2024-08-22 22:32
文章标签 实现 kafka worldcount

本文主要是介绍kafka 实现worldcount,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/259309
官网地址:http://kafka.apache.org/10/documentation/streams/quickstart
Kafka Streams是一个用来构建流处理程序的库,特别是其输入是一个Kafka topic,输出是另一个Kafka topic的程序(或者是调用外部服务,或者是更新数据库,或者其它)。它使得你以一种分布式以及容错的方式来做这件事情。

例子如下:

 public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("application.id", "streams-wordcount");props.put("bootstrap.servers", "localhost:9092");props.put("cache.max.bytes.buffering", 0);props.put("default.key.serde", Serdes.String().getClass().getName());props.put("default.value.serde", Serdes.String().getClass().getName());props.put("auto.offset.reset", "earliest");StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {public Iterable<String> apply(String value) {return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));}}).groupBy(new KeyValueMapper<String, String, String>() {public String apply(String key, String value) {return value;}}).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable var8) {System.exit(1);}System.exit(0);}

使用它只需要起一个streams-plaintext-input topic 的 生产者
启动一个 streams-wordcount-output的消费者
那到底有什么好处呢
简化点1: 不依赖框架的流处理
Kafka Streams使得构建流处理服务更简单的第一点就是:它不依赖于集群和框架,它只是一个库(而且是挺小的一个库)。你只需要Kafka和你自己的代码。Kafka会协调你的程序代码,使得它们可以处理故障,在不同程序实例间分发负载,在新的程序实例加入时重新对负载进行平衡。

我下面会讲一下为什么我认为这是很重要的,以及我们之前的一点经历,来帮助理解这个模型的重要性。

治愈MapReduce的宿醉
我前边讲到我们构造Apache Samza的经历,以及人们实际想要的(简单的流服务)和我们构建的东西(实时的MapReduce)之间的距离。我认为这种概念的错位是普遍的,毕竟流处理做的很多事情是从批处理世界中接管一些能力,用于低延迟的领域。同样的MapReduce遗产影响了其它主流的流处理平台(Storm, Spark等),就像它们对Samza的影响一样。

在LinkedIn在很多生产数据的处理服务是属于低延迟领域的:email, 用户生成的内容,新消息反馈等。其它的很多公司也应该有类似的异步服务,比如零售业需要给商品排序、重新定价,然后卖出,对于金融公司,实时数据更是核心。大部分这些业务,都是异步的,对于渲染页面或者更新移动app的屏幕就不会有这样的问题(这些是同步的)。

那么为什么在Storm, Samza, Spark Streaming这样的流处理框架之上构建这样的核心应用这么繁琐呢?

一个批处理框架,像是MapReduce或者Spark需要解决一些困难的问题:

它必须在一个机器池之上管理很多短期任务,并且在集群中有效地调度资源分配
为了做到这点,它必须动态地把你的代码、配置、依赖的库以及其它所有需要的东西,打包并且物理地布署到将要执行它的机器上。
它必须管理进程,并且实现共享集群的不同任务之间的隔离。
不幸的是,为了解决这些问题,框架就得变得很有侵入性。为了做到容错和扩展,框架得控制你的程序如何布署、配置、监控和打包。

那么,Kafka Streams有什么不同呢?

Kafka Streams对它想要解决的问题要更关注得多。它做了以下的事情:

当你的程序的新的实例加入,或已经有实例退出时,它会重新平衡要处理的负载
维护表的本地状态
从错误中恢复
它使用了Kafka为普通的consumer所提供的同样的组管理协议(group manager protocol)来实现。Kafka Streams可以有一些本地的状态,存储在磁盘上,但是它只是一个缓存。如果这个缓存丢失了,或者这个程序实例被转移到了别的地方,这个本地状态是可以被重建的。你可以把Kafka Streams这个库用在你的程序里,然后启动任意数量的你想要程序实例,Kafka将会把它们进行分区,并且在这些实例间进行负载的平衡。

这对于实现像滚动重启(rolling restart)或者无宕机时间的扩展(no-downtime expansion)这样简单的事情是很重要的。在现代的软件工程中,我们把这样的功能看做是应该的,但是很多流处理框架却做不到这点。

Dockers, Mesos, 以及Kurbernetes, 我的天哪!
从流处理框架中分离出打包和布署的原因是,打包和布署这个领域本身就正在进行自身的复兴。Kafka Streams可以使用经典的老实巴交维工具,像是Puppet, Chef, Salt来布署,把可以从命令行来启动。如果你年轻,时髦,你也可以把你的程序做成Dock镜像;或者你不是这样的人,那么你可以用WAR文件。

但是,对于寻找更加有灵活的管理方式的人,有很多框架的目标就是让程序更加灵活。这里列了一部分:

Apache Mesos with a framework like Marathon
Kubernetes
YARN with something like Slider
Swarm from Docker
Various hosted container services such as ECS from Amazon
Cloud Foundry
这个生态系统就和流处理生态一样专注。

的确,Mesos和Kubernets想要解决的问题是把进程分布到很多机器上,这也是当你布署一个Storm任务到Storm集群时,Storm尝试解决的问题。关键在于,这个问题最终被发现是挺难的,而这些通用的框架,至少是其中优秀的那些,会比其它的做得好得多-它们具有执行像在保持并行度的情况下重启、对主机的粘性(sticky host affinity)、真正的基于cgroup的隔离、用docker打包、花哨的UI等等功能。

你可以在这些框架里的任何一种里使用Kafka Streams,就像你会对其它程序做的一样,这是用来实现动态和有弹性的进程管理的一种简单的方式。比如,如果你有Mesos和Marathon,你可以使用Marathon UI直接启动你的Kafka Streams程序,然后动态地扩展它,而不会有服务中断, Meos会管理好进程,Kafka会管理和负载匀衡以及维护你的任务进程的状态。

使用一种这些的框架的开销是和使用Storm这样的框架的集群管理部分是一样的,但是优点是所有这些框架都是可选的(当然,Kafka Streams没有了它们也可以很好的工作)。

简化点2:Streams Meet Tables
Kafka Strems用于简化处理程序的另一个关键方式是把“表”和”流“这两个概念紧密地结合在一起。我们在之前的”turning the database inside out”中简化这个想法。那句话抓住了作为结果的系统是如何重铸程序和它的数据之彰的关系以及它是怎么应于数据变化,这样的要点。为了理想这些,我会回顾一下,解释我对于”table”和”stream”的定义,以及把二者结合在一起如何能够简化常见的异步程序。

传统的数据库都是关于在表格中存储状态的。当需要对事件流进行反应时,传统数据库做得并不好。什么是事件呢?事件只是一些已经发生了的事-可以是一个点击、一次出售、源自某个传感器的一个动态,或者抽象成任何这个世界上发生的事情。

像Storm一样的流处理程序,是从这个等式的另一端出发的。它们被设计用于处理事件流,但是基于流来产生状态却是后面才加进来的。

我认为异步程序的基本问题是把代表当前世界状态的tables与代表正在发生事件的event streams结合在一起。框架需要处理好如何表示它们,以及如何在它们之间进行转化。

为什么说这些概念是相关的呢?我们举一个零售商的简单例子。对于零售商而言,核心的事件流是卖出商品、订购新商品以及接收订购的商品。“库存表”是一个基于当前的存货量,通过售出和接收流进行加减的“表”。对于零售商而言两个关键的流处理动作是当库存开始降低时订购商品,以及根据供需关系调整商品价格。

这篇关于kafka 实现worldcount的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097577

相关文章

C#实现查找并删除PDF中的空白页面

《C#实现查找并删除PDF中的空白页面》PDF文件中的空白页并不少见,因为它们有可能是作者有意留下的,也有可能是在处理文档时不小心添加的,下面我们来看看如何使用Spire.PDFfor.NET通过C#... 目录安装 Spire.PDF for .NETC# 查找并删除 PDF 文档中的空白页C# 添加与删

Java实现MinIO文件上传的加解密操作

《Java实现MinIO文件上传的加解密操作》在云存储场景中,数据安全是核心需求之一,MinIO作为高性能对象存储服务,支持通过客户端加密(CSE)在数据上传前完成加密,下面我们来看看如何通过Java... 目录一、背景与需求二、技术选型与原理1. 加密方案对比2. 核心算法选择三、完整代码实现1. 加密上

Java使用WebView实现桌面程序的技术指南

《Java使用WebView实现桌面程序的技术指南》在现代软件开发中,许多应用需要在桌面程序中嵌入Web页面,例如,你可能需要在Java桌面应用中嵌入一部分Web前端,或者加载一个HTML5界面以增强... 目录1、简述2、WebView 特点3、搭建 WebView 示例3.1 添加 JavaFX 依赖3

使用Python和SQLAlchemy实现高效的邮件发送系统

《使用Python和SQLAlchemy实现高效的邮件发送系统》在现代Web应用中,邮件通知是不可或缺的功能之一,无论是订单确认、文件处理结果通知,还是系统告警,邮件都是最常用的通信方式之一,本文将详... 目录引言1. 需求分析2. 数据库设计2.1 User 表(存储用户信息)2.2 CustomerO

C#实现高性能Excel百万数据导出优化实战指南

《C#实现高性能Excel百万数据导出优化实战指南》在日常工作中,Excel数据导出是一个常见的需求,然而,当数据量较大时,性能和内存问题往往会成为限制导出效率的瓶颈,下面我们看看C#如何结合EPPl... 目录一、技术方案核心对比二、各方案选型建议三、性能对比数据四、核心代码实现1. MiniExcel

在React聊天应用中实现图片上传功能

《在React聊天应用中实现图片上传功能》在现代聊天应用中,除了文字和表情,图片分享也是一个重要的功能,本文将详细介绍如何在基于React的聊天应用中实现图片上传和预览功能,感兴趣的小伙伴跟着小编一起... 目录技术栈实现步骤1. 消息组件改造2. 图片预览组件3. 聊天输入组件改造功能特点使用说明注意事项

VSCode中配置node.js的实现示例

《VSCode中配置node.js的实现示例》本文主要介绍了VSCode中配置node.js的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一.node.js下载安装教程二.配置npm三.配置环境变量四.VSCode配置五.心得一.no

debian12安装docker的实现步骤

《debian12安装docker的实现步骤》本文主要介绍了debian12安装docker的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录步骤 1:更新你的系统步骤 2:安装依赖项步骤 3:添加 docker 的官方 GPG 密钥步骤

基于Redis实现附近商铺查询功能

《基于Redis实现附近商铺查询功能》:本文主要介绍基于Redis实现-附近商铺查询功能,这个功能将使用到Redis中的GEO这种数据结构来实现,需要的朋友可以参考下... 目录基于Redis实现-附近查询1.GEO相关命令2.使用GEO来实现以下功能3.使用Java实现简China编程单的附近商铺查询4.Red

使用Python实现实时金价监控并自动提醒功能

《使用Python实现实时金价监控并自动提醒功能》在日常投资中,很多朋友喜欢在一些平台买点黄金,低买高卖赚点小差价,但黄金价格实时波动频繁,总是盯着手机太累了,于是我用Python写了一个实时金价监控... 目录工具能干啥?手把手教你用1、先装好这些"食材"2、代码实现讲解1. 用户输入参数2. 设置无头浏