Storm的ACK机制与编码实例

2024-04-26 02:18
文章标签 实例 编码 机制 ack storm

本文主要是介绍Storm的ACK机制与编码实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Storm为了保证每条数据成功被处理,实现至少一次语义,通过Storm的ACK机制可以对spout产生的每一个tuple进行跟踪;

tuple处理成功是指这个Tuple以及这个Tuple产生的所有子Tuple都被成功处理, 由每一个处理bolt通过OutputCollector的方法ack(tuple)来告知storm当前bolt处理成功,最终调用spout的ack方法;

处理失败是指这个Tuple或这个Tuple产生的所有Tuple中的任意一个tuple处理失败或者超时(超时时间由Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS指定), 处理失败bolt调用OutputCollector的方法fail(tuple),来告知storm当前bolt处理失败,最终调用spout的fail方法重新发送失败的tuple,失败时storm不会自动重发失败的tuple,需要我们在spout中重新获取发送失败数据,手动重新发送一次。

Ack原理

Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。对任意大的一个Tuple树,storm只需要恒定的20字节就可以进行跟踪。acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,
并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。

例如下图是一个简单的Topology:



开启Act机制

1.spout发射tuple的时候指定messageId
2.spout对发射的tuple进行缓存,否则spout无法获取发送失败的数据进行重发,
(这里到底系统里有没有缓存没有成功处理的tuple,比如接口conf.setMaxSpoutPending()是否只缓存了条数还是原始数据还要去查证一下)
3.spout要重写BaseRichSpout的fail和ack方法,spout根据messageId对于成功处理的tuple从缓存队列中删除,对于失败的tuple选择重发或做其它处理;
4.如果使用BasicBolt,BasicOutputCollector在emit新的tuple时自动与源tuple锚定,execute方法结束时源tuple会被自动ack或fail;
使用RichBolt在emit数据的时需显示指定该数据的源tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);
并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
5.设置acker数大于0,conf.setNumAckers(>0);

关闭Ack机制

1.在Tuple层面去掉可靠性。在发射Tuple的时候不指定MessageID来达到不不跟踪这个Tuple的目的
2.如果对于一个Tuple树里面的某一部分到底成不成功不是很关心,那么可以在Bolt发射这些Tuple的时候不锚定它们。
这样这些Tuple就不在Tuple树里面,也就不会被跟踪了。
3.把Config.TOPOLOGY_ACKERS设置成0。在这种情况下,Storm会在Spout发射一个Tuple之后马上调用Spout的ack方法,
也就是说这个Tuple树不会被跟踪。

例子程序:

  1. public class RandomSentenceSpout extends BaseRichSpout {
  2. private SpoutOutputCollector _collector;
  3. private Random _rand;
  4. private ConcurrentHashMap<UUID, Values> _pending;
  5. public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  6. _collector = spoutOutputCollector;
  7. _rand = new Random();
  8. _pending = new ConcurrentHashMap<UUID, Values>();
  9. }
  10. public void nextTuple() {
  11. Utils.sleep(1000);
  12. String[] sentences = new String[] {
  13. "I write php",
  14. "I learning java",
  15. "I want to learn swool and tfs"
  16. };
  17. String sentence = sentences[_rand.nextInt(sentences.length)];
  18. Values v = new Values(sentence);
  19. UUID msgId = UUID.randomUUID();
  20. this._pending.put(msgId, v);//spout对发射的tuple进行缓存
  21. _collector.emit(v, msgId);//发射tuple时,添加msgId
  22. }
  23. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  24. outputFieldsDeclarer.declare(new Fields("world"));
  25. }
  26. public void ack(Object msgId) {
  27. this._pending.remove(msgId);//对于成功处理的tuple从缓存队列中删除
  28. }
  29. public void fail(Object msgId) {
  30. this._collector.emit(this._pending.get(msgId), msgId);//当消息处理失败了,重新发射,当然也可以做其他的逻辑处理
  31. }
  32. }
  33. public class SplitSentence extends BaseRichBolt {
  34. OutputCollector _collector;
  35. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  36. _collector = outputCollector;
  37. }
  38. public void execute(Tuple tuple) {
  39. String sentence = tuple.getString(0);
  40. for (String word : sentence.split(" "))
  41. _collector.emit(tuple, new Values(word));//发射tuple时进行锚定
  42. _collector.ack(tuple);//对处理完的tuple进行确认
  43. }
  44. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  45. outputFieldsDeclarer.declare(new Fields("word"));
  46. }
  47. }

场景分析

1. Bolt挂掉了,一个tuple没有被ack,storm的超时机制在超时之后会把这个tuple标记为失败,从而可以重新处理。
2. Acker挂掉了: 这种情况下由这个acker所跟踪的所有spout tuple都会超时,也就会被重新处理。
3. Spout挂掉了: 在这种情况下给spout发送消息的消息源负责重新发送这些消息。
以上机制保证storm的高度容错性

另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据。
通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

本文参考:
http://www.cnblogs.com/intsmaze/p/5918087.html
http://itindex.net/detail/55385-storm-trident-%E5%AD%A6%E4%B9%A0
http://www.tuicool.com/articles/vErmIb


这篇关于Storm的ACK机制与编码实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/936443

相关文章

JAVA实现Token自动续期机制的示例代码

《JAVA实现Token自动续期机制的示例代码》本文主要介绍了JAVA实现Token自动续期机制的示例代码,通过动态调整会话生命周期平衡安全性与用户体验,解决固定有效期Token带来的风险与不便,感兴... 目录1. 固定有效期Token的内在局限性2. 自动续期机制:兼顾安全与体验的解决方案3. 总结PS

PyQt6 键盘事件处理的实现及实例代码

《PyQt6键盘事件处理的实现及实例代码》本文主要介绍了PyQt6键盘事件处理的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起... 目录一、键盘事件处理详解1、核心事件处理器2、事件对象 QKeyEvent3、修饰键处理(1)、修饰键类

Python动态处理文件编码的完整指南

《Python动态处理文件编码的完整指南》在Python文件处理的高级应用中,我们经常会遇到需要动态处理文件编码的场景,本文将深入探讨Python中动态处理文件编码的技术,有需要的小伙伴可以了解下... 目录引言一、理解python的文件编码体系1.1 Python的IO层次结构1.2 编码问题的常见场景二

详解Spring中REQUIRED事务的回滚机制详解

《详解Spring中REQUIRED事务的回滚机制详解》在Spring的事务管理中,REQUIRED是最常用也是默认的事务传播属性,本文就来详细的介绍一下Spring中REQUIRED事务的回滚机制,... 目录1. REQUIRED 的定义2. REQUIRED 下的回滚机制2.1 异常触发回滚2.2 回

Java中字符编码问题的解决方法详解

《Java中字符编码问题的解决方法详解》在日常Java开发中,字符编码问题是一个非常常见却又特别容易踩坑的地方,这篇文章就带你一步一步看清楚字符编码的来龙去脉,并结合可运行的代码,看看如何在Java项... 目录前言背景:为什么会出现编码问题常见场景分析控制台输出乱码文件读写乱码数据库存取乱码解决方案统一使

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

基于Redis自动过期的流处理暂停机制

《基于Redis自动过期的流处理暂停机制》基于Redis自动过期的流处理暂停机制是一种高效、可靠且易于实现的解决方案,防止延时过大的数据影响实时处理自动恢复处理,以避免积压的数据影响实时性,下面就来详... 目录核心思路代码实现1. 初始化Redis连接和键前缀2. 接收数据时检查暂停状态3. 检测到延时过

Redis中哨兵机制和集群的区别及说明

《Redis中哨兵机制和集群的区别及说明》Redis哨兵通过主从复制实现高可用,适用于中小规模数据;集群采用分布式分片,支持动态扩展,适合大规模数据,哨兵管理简单但扩展性弱,集群性能更强但架构复杂,根... 目录一、架构设计与节点角色1. 哨兵机制(Sentinel)2. 集群(Cluster)二、数据分片

MySQL的配置文件详解及实例代码

《MySQL的配置文件详解及实例代码》MySQL的配置文件是服务器运行的重要组成部分,用于设置服务器操作的各种参数,下面:本文主要介绍MySQL配置文件的相关资料,文中通过代码介绍的非常详细,需要... 目录前言一、配置文件结构1.[mysqld]2.[client]3.[mysql]4.[mysqldum