Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析

2024-02-18 06:58

本文主要是介绍Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!



1、同步发送消息  


 MessageId send(byte[] message) throws PulsarClientException;



   private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
            .newUpdater(ProducerImpl.class, "msgIdGenerator");



msgIdGeneratorUpdater 用于生成请求ID,每次发送请求ID都不一样,用来区分消息。


 private final BlockingQueue<OpSendMsg> pendingMessages  = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());   阻塞的消息队列,用于缓存发送消息。



从Channel所在的线程  eventLoop()来执行发送请求。避免线程切换。


// Read the connection before validating if it's still connected, so that we avoid reading a null
                    // value
                    ClientCnx cnx = cnx();
                    if (isConnected()) {
                        // If we do have a connection, the message is sent immediately, otherwise we'll try again once a
                        // new
                        // connection is established
                        cmd.retain();
                        cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));

                   }



    private static final class WriteInEventLoopCallback implements Runnable {private ProducerImpl producer;private ClientCnx cnx;private OpSendMsg op;static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {WriteInEventLoopCallback c = RECYCLER.get();c.producer = producer;c.cnx = cnx;c.op = op;return c;}@Overridepublic void run() {if (log.isDebugEnabled()) {log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,op.sequenceId);}try {cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());} finally {recycle();}}private void recycle() {producer = null;cnx = null;op = null;RECYCLER.recycle(this, recyclerHandle);}private final Handle recyclerHandle;private WriteInEventLoopCallback(Handle recyclerHandle) {this.recyclerHandle = recyclerHandle;}private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() {@Overrideprotected WriteInEventLoopCallback newObject(Handle handle) {return new WriteInEventLoopCallback(handle);}};}

发送消息后接受响应消息:




 @Over

这篇关于Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720399

相关文章

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Redis中Set结构使用过程与原理说明

《Redis中Set结构使用过程与原理说明》本文解析了RedisSet数据结构,涵盖其基本操作(如添加、查找)、集合运算(交并差)、底层实现(intset与hashtable自动切换机制)、典型应用场... 目录开篇:从购物车到Redis Set一、Redis Set的基本操作1.1 编程常用命令1.2 集

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

k8s中实现mysql主备过程详解

《k8s中实现mysql主备过程详解》文章讲解了在K8s中使用StatefulSet部署MySQL主备架构,包含NFS安装、storageClass配置、MySQL部署及同步检查步骤,确保主备数据一致... 目录一、k8s中实现mysql主备1.1 环境信息1.2 部署nfs-provisioner1.2.

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

linux部署NFS和autofs自动挂载实现过程

《linux部署NFS和autofs自动挂载实现过程》文章介绍了NFS(网络文件系统)和Autofs的原理与配置,NFS通过RPC实现跨系统文件共享,需配置/etc/exports和nfs.conf,... 目录(一)NFS1. 什么是NFS2.NFS守护进程3.RPC服务4. 原理5. 部署5.1安装NF

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

MySQL使用EXISTS检查记录是否存在的详细过程

《MySQL使用EXISTS检查记录是否存在的详细过程》EXISTS是SQL中用于检查子查询是否返回至少一条记录的运算符,它通常用于测试是否存在满足特定条件的记录,从而在主查询中进行相应操作,本文给大... 目录基本语法示例数据库和表结构1. 使用 EXISTS 在 SELECT 语句中2. 使用 EXIS

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码