NSQ消息队列---总结篇

2024-04-25 17:36
文章标签 总结 队列 消息 nsq

本文主要是介绍NSQ消息队列---总结篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构

概念

nsqlookup:存储了nsqd的元数据和服务信息(endpoind),向消费者提供服务发现功能, 向nsqadmin提供数据查询功能。

nsqd: 是接收、队列和传送消息到客户端的守护进程。

nsqadmin:简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel。

消息可靠性

(1)生产者不保证消息可靠

(2)消费者保证至少一次消费

发送逻辑

(1)根据配置指定的nsqd的ip, 选择一个机器,通过 HTTP API(也可以TCP)将消息发布到 nsqd的指定 topic

(2)当 producer初次发布的消息的 topic不存在,则会创建。

(3)对topic加锁,将消息发送给 memoryMsgChan中,然后释放锁。如果 memoryMsgChan满了,申请一个buff,把消息写到 Backend,后期被 backendMsgChan接收。

(4)messagePump 不断从 memoryChan/backend队列中读消息,并将消息每个复制一遍,发送给 topic下的所有channel

//nsqd/topic.go:220
func (t *Topic) messagePump() {...for {select {case msg = <-memoryMsgChan:case buf = <-backendChan:msg, err = decodeMessage(buf)...case <-t.channelUpdateChan:...case pause := <-t.pauseChan:...case <-t.exitChan:goto exit}for i, channel := range chans {chanMsg := msgif i > 0 {chanMsg = NewMessage(msg.ID, msg.Body)chanMsg.Timestamp = msg.TimestampchanMsg.deferred = msg.deferred}if chanMsg.deferred != 0 {channel.PutMessageDeferred(chanMsg, chanMsg.deferred)continue}err := channel.PutMessage(chanMsg)...}}
}

(5)channel的PutMessage和 topic类似,首先先写 memoryMsgChan, 满了写入 backend.

(6)protocol实例的messagePump方法从memoryMsgChan或backendMsgChan读取消息并通过p.SendMessage(client, msg)发送到客户端 ,消息写入client.Writer。

消费逻辑

(1)consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic的 nsqd节点。每个consumer向每个 nsqd主机进行订阅操作。

(2)根据获取到的机器信息,通过 TCPsubscribe 自己需要的channel。如果 topic 或者 channel没有创建,则会创建

(3)多个 consumer对应一个 channel, 每个消息将被传递到一个随机的 consumer中。

消费特点:

(1)支持延时消息。

(2)channel在consumer退出后并不会删除。

消息消费失败

这个担保是作为协议和工作流的一部分,工作原理如下(假设客户端成功连接并订阅一个话题):

1)客户表示已经准备好接收消息

2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout,采用大小堆【超时时间排序】和map存储)

3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息 (有个协程定时查看,根据当前时间跟最大堆的顶元素比较)。

消费者限流

就是客户端连接上nsqd之后,会告诉nsqd它的可接受的消息数量是多少,每当nsqd给客户端推送一条消息这个RDY就会减一,而客户端消费完毕并且发送一个FIN之后,这个RDY又会加一(其实这个设计有点类似tcp中的用来控制流量的窗口机制)。

客户端库的被设计成在 RDY 数达到配置 max-in-flight 的 25% 发送一个命令来更新 RDY 计数(并适当考虑连接到多个 nsqd 情况下,适当地分配)。

本地初始化consumer时配置 maxInFlight, 在服务端也配置一个 max_in_flight。每个连接默认:2500

客户端的职责:

  1. 引导并均匀地将max_in_flight配置分配给所有连接。

  2. 绝不允许所有连接的RDY计数总和(total_rdy_count)超过所配置的max_in_flight。

  3. 永远不要超过nsqd配置的每个连接的max_rdy_count。

  4. 公开API方法以可靠地指示消息流的不足(message flow starvation)。

客户端库应始终尝试在所有连接之间平均分配RDY计数。通常,此实现为max_in_flight / num_conns。

特性:

1.采用push机制,保证至少一次推送

性能:

单个节点性能

声明:请牢记 NSQ 设计的初衷是分布式。单个节点的性能非常重要,但这并不是我们所追求的。

  • 2012 MacBook Air i7 2ghz

  • go1.2

  • NSQ v0.2.24

  • 200 byte messages

GOMAXPROCS=1 (单个生产者,单个消费者)

$ ./bench.sh results... PUB: 2014/01/12 22:09:08 duration: 2.311925588s - 82.500mb/s - 432539.873ops/s - 2.312us/op SUB: 2014/01/12 22:09:19 duration: 6.009749983s - 31.738mb/s - 166396.273ops/s - 6.010us/op

GOMAXPROCS=4 (4 publishers, 4 consumers)

$ ./bench.sh results... PUB: 2014/01/13 16:58:05 duration: 1.411492441s - 135.130mb/s - 708469.965ops/s - 1.411us/op SUB: 2014/01/13 16:58:16 duration: 5.251380583s - 36.321mb/s - 190426.114ops/s - 5.251us/op

引用:

分布式实时消息平台NSQ - 知乎

https://juejin.cn/post/6932865148784902158

http://nsqio.cn/design.html

这篇关于NSQ消息队列---总结篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/935346

相关文章

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Python实现图片分割的多种方法总结

《Python实现图片分割的多种方法总结》图片分割是图像处理中的一个重要任务,它的目标是将图像划分为多个区域或者对象,本文为大家整理了一些常用的分割方法,大家可以根据需求自行选择... 目录1. 基于传统图像处理的分割方法(1) 使用固定阈值分割图片(2) 自适应阈值分割(3) 使用图像边缘检测分割(4)

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

Windows Docker端口占用错误及解决方案总结

《WindowsDocker端口占用错误及解决方案总结》在Windows环境下使用Docker容器时,端口占用错误是开发和运维中常见且棘手的问题,本文将深入剖析该问题的成因,介绍如何通过查看端口分配... 目录引言Windows docker 端口占用错误及解决方案汇总端口冲突形成原因解析诊断当前端口情况解

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

java常见报错及解决方案总结

《java常见报错及解决方案总结》:本文主要介绍Java编程中常见错误类型及示例,包括语法错误、空指针异常、数组下标越界、类型转换异常、文件未找到异常、除以零异常、非法线程操作异常、方法未定义异常... 目录1. 语法错误 (Syntax Errors)示例 1:解决方案:2. 空指针异常 (NullPoi