ActiveMQ源码架构解析第二节

2024-05-07 09:18

本文主要是介绍ActiveMQ源码架构解析第二节,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ActiveMQ源码架构解析第二节

  • 博客分类:
  • ActiveMQ

 

    本节主要内容就是讲解消息的传递方式,上一节已经讲解完客户端和broker端连接的建立方式,在Connection、Session、Producer类对象建立的同时,客户端和broker端会进行一些消息交互,ActiveMQ中把所有的消息交互的内容都叫做Command,每条消息对应一个Command,例如客户端刚连接到broker,broker会发送一个BrokerInfo信息到客户端,接着客户端会发送ConnectionInfo连接信息、ProducerInfo生产者信息等等到服务端,如下图所示:

 



 

 

 

 

这些命令都继承于BaseCommand抽象类并实现于Command接口,类图如下,这里使用了访问者设计模式以及适配器设计模式。



 

适配器模式分为三种实现,第一种是通过继承实现,第二种是通过组合实现,第三种也就是类图中画的叫做默认适配器,CommandVisiter接口中定义了处理所有消息的方法,是broker和客户端api公用的一个接口,但是客户端用不到全部接口,如果实现这个接口那又必须实现全部的方法,所以此处在接口和具体类中间新增了一个CommandVisiterAdapter抽象类实现了全部的处理消息的方法并且全部都是空实现,这样在新建DefaultVisiter的时候就可以根据自己的需要来选择相应的方法进行实现了,访问者设计模式的体现请看下面的代码:

 

/**

 

     * reads packets from a Socket

 

     */

 

    publicvoid run() {

 

        LOG.trace("TCP consumer thread for " + this + " starting");

 

        this.runnerThread=Thread.currentThread();

 

        try {

 

            while (!isStopped()) {

 

                doRun();

 

            }

 

        } catch (IOException e) {

 

            stoppedLatch.get().countDown();

 

            onException(e);

 

        } catch (Throwable e){

 

            stoppedLatch.get().countDown();

 

            IOException ioe=new IOException("Unexpected error occured: " + e);

 

            ioe.initCause(e);

 

            onException(ioe);

 

        }finally {

 

            stoppedLatch.get().countDown();

 

        }

 

    }

 

 

 

    protectedvoid doRun() throws IOException {

 

        try {

 

            Object command = readCommand();

 

            doConsume(command);

 

        } catch (SocketTimeoutException e) {

 

        } catch (InterruptedIOException e) {

 

        }

 

    }

 

 

 

    protected Object readCommand() throws IOException {

 

        return wireFormat.unmarshal(dataIn);

 

}

 

 

 

这段代码是来自TcpTransport.java中,上一节已经讲解完TcpTransport的建立所以此处不在熬述,可以看到线程启动后一直在调用doRun方法,doRun方法则调用readCommand来读取客户端发送过来的信息,读到之后就会调用ActiveMQConnection.javaonCommand方法,如下代码:

 

 

 

@Override

 

    publicvoid onCommand(final Object o) {

 

        final Command command = (Command)o;

 

        if (!closed.get() && command != null) {

 

            try {

 

                command.visit(new CommandVisitorAdapter() {

 

                    @Override

 

                    public Response processMessageDispatch(MessageDispatch md) throws Exception {

 

                        waitForTransportInterruptionProcessingToComplete();

 

                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());

 

                        if (dispatcher != null) {

 

                            // Copy in case a embedded broker is dispatching via

 

                            // vm://

 

                            // md.getMessage() == null to signal end of queue

 

                            // browse.

 

                            Message msg = md.getMessage();

 

                            if (msg != null) {

 

                                msg = msg.copy();

 

                                msg.setReadOnlyBody(true);

 

                                msg.setReadOnlyProperties(true);

 

                                msg.setRedeliveryCounter(md.getRedeliveryCounter());

 

                                msg.setConnection(ActiveMQConnection.this);

 

                                msg.setMemoryUsage(null);

 

                                md.setMessage(msg);

 

                            }

 

                            dispatcher.dispatch(md);

 

                        }

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processProducerAck(ProducerAck pa) throws Exception {

 

                        if (pa != null && pa.getProducerId() != null) {

 

                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());

 

                            if (producer != null) {

 

                                producer.onProducerAck(pa);

 

                            }

 

                        }

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processBrokerInfo(BrokerInfo info) throws Exception {

 

                        brokerInfo = info;

 

                        brokerInfoReceived.countDown();

 

                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();

 

                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConnectionError(final ConnectionError error) throws Exception {

 

                        executor.execute(new Runnable() {

 

                            @Override

 

                            publicvoid run() {

 

                                onAsyncException(error.getException());

 

                            }

 

                        });

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processControlCommand(ControlCommand command) throws Exception {

 

                        onControlCommand(command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConnectionControl(ConnectionControl control) throws Exception {

 

                        onConnectionControl((ConnectionControl)command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConsumerControl(ConsumerControl control) throws Exception {

 

                        onConsumerControl((ConsumerControl)command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processWireFormat(WireFormatInfo info) throws Exception {

 

                        onWireFormatInfo((WireFormatInfo)command);

 

                        returnnull;

 

                    }

 

                });

 

            } catch (Exception e) {

 

                onClientInternalException(e);

 

            }

 

        }

 

 

 

        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {

 

            TransportListener listener = iter.next();

 

            listener.onCommand(command);

 

        }

 

}

 

 

 

可以看到Command作为入参传进onCommand方法,然后方法中调用command.visit(new CommandVisitorAdapter() {…}),new CommandVisitorAdapter就是上面介绍过的适配器设计模式,这里可以看到这个匿名类只需实现可以用到的方法,而不是实现所有的接口方法。这里command.visit的好处是传入进来的command程序不需要判断他是什么类型的command然后在决定调用这个command的某个方法,而是直接调用visit方法即可,而所有的业务逻辑也是统一的在CommandVisitorAdaptor中实现,这也算是java中动态多分派的实现。

 

知道了消息接收处理以及broker和客户端的信息交互之后,我们在来看下消息是如何从Command类序列化到字节写入以及字节如何反序列化转成Command类的,在ActiveMQ中,每一个Command消息都对应一个CommandMarshal类,例如ConnectionInfo使用ConnectionInfoMarshal来序列化和反序列化,BrokerInfo使用BrokerInfoMarshal来序列化和反序列化,ConnectionInfoMarshalmarshal的实现就是把字符串或者字节等信息写入到socket的输出流中没有什么可说的,序列化和反序列化又分为两种,一种是tight一种是loosetight会针对cpu来进行优化,先写入大小,在写入具体数据,而loose方式则直接写入数据,两种方式都会使用缓存功能,客户端和服务端都分别存在marshal[]unMarshal[]数组,例如客户端给服务端发送ProducerInfo信息,第一次发送后会把ProducerInfo存放在marshal的第0位,然后服务端接收后会把这个producerInfo放在unMarshal[0]中,如果客户端在次发送ProducerInfo则从缓存中取,找到ProducerInfomarshal[]的第0个,则直接发送0,服务端则从unMarshal[]0个取出使用,这节就到这里吧,想到什么我在补充,欢迎跟帖讨论。

这篇关于ActiveMQ源码架构解析第二节的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966940

相关文章

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?

99%的人都选错了! 路由器WiFi双频合一还是分开好的专业解析与适用场景探讨

《99%的人都选错了!路由器WiFi双频合一还是分开好的专业解析与适用场景探讨》关于双频路由器的“双频合一”与“分开使用”两种模式,用户往往存在诸多疑问,本文将从多个维度深入探讨这两种模式的优缺点,... 在如今“没有WiFi就等于与世隔绝”的时代,越来越多家庭、办公室都开始配置双频无线路由器。但你有没有注