三、消息的可靠处理

2024-09-05 08:58
文章标签 处理 消息 可靠

本文主要是介绍三、消息的可靠处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、消息被完全处理的含义


当树创建完毕,并且树中的每一个消息都已经被处理时,Storm认为来自Spout的元组是“完全处理”的。当一个元组的消息树在指定的超时范围内不能被完全处理,则元组被认为是失败的。


2、如果一个消息被完全处理或完全处理失败会发生什么

首先,让我们看看Spout的元组的生命周期。ISpout接口的定义如下:

public interface ISpout extends Serializable {void open(Map conf, TopologyContext context, SpoutOutputCollector colle- ctor);void close();void nextTuple();void ack(Object msgId);void fail(Object msgId);
}

首先,Storm通过调用Spout的nextTuple()方法从Spout请求一个元组。Spout使用open()方法提供的SpoutOutputCollector对象发射一个元组到它的输出流。当发射元组时,Spout会提供一个“消息id”,以便用来识别元组。例如,KestrelSpout从Kestrel消息队列中读取一个消息时,会发射Kestrel提供的“消息id”。下面发射一个消息到SpoutOutputCollector对象:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下来,元组被发送到Bolt,同时Storm负责跟踪创建的消息树。如果Storm检测到一个元组是完全处理的,Storm将调用原Spout任务的ack()方法,把Spout提供给Storm的消息id作为输入参数。同样,如果元组超时,Storm将调用Spoutfail()方法。注意,一个元组将由Spout任务来确认成功或失败,这个Spout任务是创建此元组的完全相同的Spout任务。如果一个Spout跨集群执行很多任务,元组是不会被创建它的那个任务外的其他任务确认成功或失败的。

注意:bolt是没有ack()和fail()函数的,任何消息出错了,都是由根spout重发


3、Storm如何保证可靠性

在元组树中指定一个链接,此链接被称为锚定(Anchoring。Anchoring在发射一个新的元组的同一时间完成。让我们使用以下Bolt为例进行介绍,这个Bolt将包含一个句子的元组划分为一个包含每个单词的锚定:

public class SplitSentence extends BaseRichBolt {OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {_collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {_collector.emit(tuple, new Values(word));//锚定+发射}_collector.ack(tuple);//确认}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}<

通过指定输入元组作为第一个参数来发射每个单词元组被锚定anchored。因为这个单词元组是被锚定的如果单词元组未能被下游处理,树的根的Spout元组将在稍后重发。相反,如果单词元组的发射操作如下,让我们看看会发生什么:

_collector.emit(new Values(word));

这种方式发射的单词元组导致未被锚定unanchored。如果元组未被下游处理,根元组将不会重发。这取决于你需要的Topology的容错保证,有时候需要相应地发射一个未被锚定的元组。

很多Bolt遵循一个读取一个输入元组,发射元组,在execute方法确认元组的通用模式。这些Bolt具有类别过滤器和简单的功能。Storm有一个接口称为BasicBolt,为你封装这个模式。SplitSentence的例子可以使用BasicBolt写成:

public class SplitSentence extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}        
}

这个实现与之前的实现相比,语义上是相同的,但是更简单。元组发射到BasicOutputCollectorare是自动Anchoring到输入元组execute方法完成时,输入元组自动为你确认。


4、Storm如何实现可靠性

       storm系统中有一组叫做acker(可以设置并行度为多个)的特殊任务,它负责跟踪DAG中的每个消息。每当发现一个DAG被完全处理,它就向创建这个根消息的Spout任务发送一个信号,该tuple tree已经被完全处理成功。

      系统使用一种哈希算法根据Spout消息的id来确定由哪个acker跟踪此消息派生出来的tuple tree。因为每个消息都知道与之对应的根消息的id(每当Bolt新生成一个消息,对应的tuple tree中的根消息id就复制到这个消息中),所以它知道应该与哪个acker通信。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,它会告诉acker:“本消息已经处理完毕,但是我派生出来了一些新的消息,帮忙跟踪一下吧”

       一个Acker任务存储来自Spout元组id到一对值的映射。第一个值是创建Spout元组的任务id,通过这个ID,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64位的值,称为ack val。它是所有消息的随机id的异或结果当一个Acker任务看到ack val已经成为0它就知道元组树已经完成了。



这篇关于三、消息的可靠处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1138509

相关文章

解决docker目录内存不足扩容处理方案

《解决docker目录内存不足扩容处理方案》文章介绍了Docker存储目录迁移方法:因系统盘空间不足,需将Docker数据迁移到更大磁盘(如/home/docker),通过修改daemon.json配... 目录1、查看服务器所有磁盘的使用情况2、查看docker镜像和容器存储目录的空间大小3、停止dock

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Python异常处理之避免try-except滥用的3个核心原则

《Python异常处理之避免try-except滥用的3个核心原则》在Python开发中,异常处理是保证程序健壮性的关键机制,本文结合真实案例与Python核心机制,提炼出避免异常滥用的三大原则,有需... 目录一、精准打击:只捕获可预见的异常类型1.1 通用异常捕获的陷阱1.2 精准捕获的实践方案1.3

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

Python动态处理文件编码的完整指南

《Python动态处理文件编码的完整指南》在Python文件处理的高级应用中,我们经常会遇到需要动态处理文件编码的场景,本文将深入探讨Python中动态处理文件编码的技术,有需要的小伙伴可以了解下... 目录引言一、理解python的文件编码体系1.1 Python的IO层次结构1.2 编码问题的常见场景二

如何正确识别一台POE交换机的好坏? 选购可靠的POE交换机注意事项

《如何正确识别一台POE交换机的好坏?选购可靠的POE交换机注意事项》POE技术已经历多年发展,广泛应用于安防监控和无线覆盖等领域,需求量大,但质量参差不齐,市场上POE交换机的品牌繁多,如何正确识... 目录生产标识1. 必须包含的信息2. 劣质设备的常见问题供电标准1. 正规的 POE 标准2. 劣质设

Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧

《Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧》本文将通过实际代码示例,深入讲解Python函数的基本用法、返回值特性、全局变量修改以及异常处理技巧,感兴趣的朋友跟随小编一起看看... 目录一、python函数定义与调用1.1 基本函数定义1.2 函数调用二、函数返回值详解2.1 有返

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建