Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析

2024-02-18 06:58

本文主要是介绍Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!



1、同步发送消息  


 MessageId send(byte[] message) throws PulsarClientException;



   private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
            .newUpdater(ProducerImpl.class, "msgIdGenerator");



msgIdGeneratorUpdater 用于生成请求ID,每次发送请求ID都不一样,用来区分消息。


 private final BlockingQueue<OpSendMsg> pendingMessages  = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());   阻塞的消息队列,用于缓存发送消息。



从Channel所在的线程  eventLoop()来执行发送请求。避免线程切换。


// Read the connection before validating if it's still connected, so that we avoid reading a null
                    // value
                    ClientCnx cnx = cnx();
                    if (isConnected()) {
                        // If we do have a connection, the message is sent immediately, otherwise we'll try again once a
                        // new
                        // connection is established
                        cmd.retain();
                        cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));

                   }



    private static final class WriteInEventLoopCallback implements Runnable {private ProducerImpl producer;private ClientCnx cnx;private OpSendMsg op;static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {WriteInEventLoopCallback c = RECYCLER.get();c.producer = producer;c.cnx = cnx;c.op = op;return c;}@Overridepublic void run() {if (log.isDebugEnabled()) {log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,op.sequenceId);}try {cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());} finally {recycle();}}private void recycle() {producer = null;cnx = null;op = null;RECYCLER.recycle(this, recyclerHandle);}private final Handle recyclerHandle;private WriteInEventLoopCallback(Handle recyclerHandle) {this.recyclerHandle = recyclerHandle;}private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() {@Overrideprotected WriteInEventLoopCallback newObject(Handle handle) {return new WriteInEventLoopCallback(handle);}};}

发送消息后接受响应消息:




 @Over

这篇关于Yahoo pulsar -1 Producer send和Consumer receive 过程源码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720399

相关文章

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

pytest+allure环境搭建+自动化实践过程

《pytest+allure环境搭建+自动化实践过程》:本文主要介绍pytest+allure环境搭建+自动化实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、pytest下载安装1.1、安装pytest1.2、检测是否安装成功二、allure下载安装2.

Pytorch介绍与安装过程

《Pytorch介绍与安装过程》PyTorch因其直观的设计、卓越的灵活性以及强大的动态计算图功能,迅速在学术界和工业界获得了广泛认可,成为当前深度学习研究和开发的主流工具之一,本文给大家介绍Pyto... 目录1、Pytorch介绍1.1、核心理念1.2、核心组件与功能1.3、适用场景与优势总结1.4、优

Redis指南及6.2.x版本安装过程

《Redis指南及6.2.x版本安装过程》Redis是完全开源免费的,遵守BSD协议,是一个高性能(NOSQL)的key-value数据库,Redis是一个开源的使用ANSIC语言编写、支持网络、... 目录概述Redis特点Redis应用场景缓存缓存分布式会话分布式锁社交网络最新列表Redis各版本介绍旧

SpringBoot整合Sa-Token实现RBAC权限模型的过程解析

《SpringBoot整合Sa-Token实现RBAC权限模型的过程解析》:本文主要介绍SpringBoot整合Sa-Token实现RBAC权限模型的过程解析,本文给大家介绍的非常详细,对大家的学... 目录前言一、基础概念1.1 RBAC模型核心概念1.2 Sa-Token核心功能1.3 环境准备二、表结

Jvm sandbox mock机制的实践过程

《Jvmsandboxmock机制的实践过程》:本文主要介绍Jvmsandboxmock机制的实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景二、定义一个损坏的钟1、 Springboot工程中创建一个Clock类2、 添加一个Controller

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、