RabbitMQ(1) 三种exchange总结

2024-08-21 15:32

本文主要是介绍RabbitMQ(1) 三种exchange总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

……List……

1.AMQP

2.几个基本概念

3.三种 exchange

4.思考总结



……1.AMQP……


        在AMQP(高级消息队列协议)协议中,queue、exchange、binding构成了协议的核心。exchange是一个重要的组件,那么如何理解exchange呢?刚刚开始学习消息队列的时候,想起作品展时候做的一个基于SMTP协议的邮箱。不同的是MQ中没有发送者和接受者的概念,也不是客户端服务器的概念。MQ专注于应用程序之间的消息通信,是一种生产者和消费者之间的关系。exchange是producer和consumer之间消息交换机。




……2.几个基本概念……


        在介绍exchange类型之前,需要了解几个基本的概念。

  • Broker:消息队列服务器实体
  • exchange:消息交换机,指定消息规则,处理消息和队列之间的关系
  • queue:队列载体,消息投入队列中
  • binding:绑定,把exchange和queue按照路由规则绑定起来
  • Routing Key:路由关键字。exchange根据这个进行消息投递
  • vhost:虚拟消息服务器,每个RabbitMQ服务器都能够创建虚拟消息服务器。
  • producer:生产者,投递消息的程序
  • consumer:消费者,接受消息的程序
  • channel:信道(重要概念),打开信道才能进行通信,一个channel代码一个会话任务。



……3.三种exchange……

        

           vhost通过创建channel连接程序,会根据routingKey将M从exchange到queue,那根据什么规则进行投递消息?如果有好多不同类型的M,那么应该如何选择?AMQP中定义了不同类型的exchange就可以发挥作用了。一种三种类型:fanout,topic,direct。每一种类型实现不同的路由算法。



3.1 Fanout Exchange 不规则路由


         将收到的消息广播到绑定的队列上。当你发送一条消息到fanout exchange上,它会把消息投递给所有附加到此交换机上面的队列。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 


生产者(提供服务的系统):

package cn.itcast.rabbitmq.ps;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "商品已经新增,id = 1000";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):

package cn.itcast.rabbitmq.ps;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_fanout_1";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


3.2 Direct Exchange 处理路由


     生产者在发送消息的时候都需要指定一个routingkey和exchange,exchange在接到rooutingkey的时候与该exchange关联的所有binding中的bindingkey进行比较,如果相等,则发送到binding对应的queue中。


生产者(提供服务的系统):

package cn.itcast.rabbitmq.routing;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "删除商品, id = 1001";channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):
package cn.itcast.rabbitmq.routing;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_direct_1";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


3.3 Topic Exchange 通配符路由


         将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。这个很像Javascript中的正则表达式。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。


生产者(提供服务的系统):

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "删除商品,id = 1001";channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):

消费者1:

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_topic_1";private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


消费者2:

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv2 {private final static String QUEUE_NAME = "test_queue_topic_2";private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 搜索系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}



……4.思考总结……


          总结一下生产者和消费者都需要做什么事情。多学习多思考。




这篇关于RabbitMQ(1) 三种exchange总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1093551

相关文章

Python版本与package版本兼容性检查方法总结

《Python版本与package版本兼容性检查方法总结》:本文主要介绍Python版本与package版本兼容性检查方法的相关资料,文中提供四种检查方法,分别是pip查询、conda管理、PyP... 目录引言为什么会出现兼容性问题方法一:用 pip 官方命令查询可用版本方法二:conda 管理包环境方法

JavaScript对象转数组的三种方法实现

《JavaScript对象转数组的三种方法实现》本文介绍了在JavaScript中将对象转换为数组的三种实用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友... 目录方法1:使用Object.keys()和Array.map()方法2:使用Object.entr

pycharm跑python项目易出错的问题总结

《pycharm跑python项目易出错的问题总结》:本文主要介绍pycharm跑python项目易出错问题的相关资料,当你在PyCharm中运行Python程序时遇到报错,可以按照以下步骤进行排... 1. 一定不要在pycharm终端里面创建环境安装别人的项目子模块等,有可能出现的问题就是你不报错都安装

React 记忆缓存的三种方法实现

《React记忆缓存的三种方法实现》本文主要介绍了React记忆缓存的三种方法实现,包含React.memo、useMemo、useCallback,用于避免不必要的组件重渲染和计算,感兴趣的可以... 目录1. React.memo2. useMemo3. useCallback使用场景与注意事项在 Re

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法

《JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法》:本文主要介绍JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法,每种方法结合实例代码给大家介绍的非常... 目录引言:为什么"相等"判断如此重要?方法1:使用some()+includes()(适合小数组)方法2

Spring 依赖注入与循环依赖总结

《Spring依赖注入与循环依赖总结》这篇文章给大家介绍Spring依赖注入与循环依赖总结篇,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Spring 三级缓存解决循环依赖1. 创建UserService原始对象2. 将原始对象包装成工

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s