MQ - RabbitMQ - 4种Exchange类型

2024-04-30 19:38
文章标签 类型 rabbitmq exchange mq

本文主要是介绍MQ - RabbitMQ - 4种Exchange类型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则。

在rabbitmq中,exchange有4个类型:direct,topic,fanout,header。

direct exchange

此类型的exchange路由规则很简单:

exchange在和queue进行binding时会设置routingkey

channel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);

 然后我们在将消息发送到exchange时会设置对应的routingkey:

channel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);

 在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

具体的流程如下:

通过代码可以会理解好一点:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Direct类型的exchange, 名称 pdf_eventschannel.ExchangeDeclare(exchange: "pdf_events",type: ExchangeType.Direct,durable: true,autoDelete: false,arguments: null);// 创建create_pdf_queue队列channel.QueueDeclare(queue: "create_pdf_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 pdf_log_queue队列channel.QueueDeclare(queue: "pdf_log_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 pdf_events --> create_pdf_queue 使用routingkey:pdf_createchannel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);//绑定 pdf_events --> pdf_log_queue 使用routingkey:pdf_logchannel.QueueBind(queue: "pdf_log_queue",exchange: "pdf_events",routingKey: "pdf_log",arguments: null);var message = "Demo some pdf creating...";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_create//通过binding routinekey的比较,次消息会路由到队列 create_pdf_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);message = "pdf loging ...";body = Encoding.UTF8.GetBytes(message);properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_log//通过binding routinekey的比较,次消息会路由到队列 pdf_log_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_log",basicProperties: properties,body: body);}

 topic exchange

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.

其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词

如上图第一个binding:

  • exchange: agreements
  • queue A:  berlin_agreements
  • binding routingkey: agreements.eu.berlin.#

第二个binding: 

  • exchange: agreements
  • queue B: all_agreements
  • binding routingkey: agreements.#

第三个binding:

  • exchange: agreements
  • queue c: headstore_agreements
  • binding routingkey: agreements.eu.*.headstore

所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合,具体的代码如下:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Topic类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Topic,durable: true,autoDelete: false,arguments: null);// 创建berlin_agreements队列channel.QueueDeclare(queue: "berlin_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 all_agreements 队列channel.QueueDeclare(queue: "all_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 headstore_agreements 队列channel.QueueDeclare(queue: "headstore_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#channel.QueueBind(queue: "berlin_agreements",exchange: "agreements",routingKey: "agreements.eu.berlin.#",arguments: null);//绑定 agreements --> all_agreements 使用routingkey:agreements.#channel.QueueBind(queue: "all_agreements",exchange: "agreements",routingKey: "agreements.#",arguments: null);//绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstorechannel.QueueBind(queue: "headstore_agreements",exchange: "agreements",routingKey: "agreements.eu.*.headstore",arguments: null);var message = "hello world";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin//agreements.eu.berlin 匹配  agreements.eu.berlin.# 和agreements.#//agreements.eu.berlin 不匹配  agreements.eu.*.headstore//最终次消息会路由到队里:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)channel.BasicPublish(exchange: "agreements",routingKey: "agreements.eu.berlin",basicProperties: properties,body: body);}

 fanout exchange

此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。

header exchange

 此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:

Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);

其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。

在发布消息的时候就需要传入header值:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;

 具体的规则可以看以下代码:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Headers类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Headers,durable: true,autoDelete: false,arguments: null);// 创建queue.A队列channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.B 队列channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.C 队列channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);//绑定 agreements --> queue.A 使用arguments (format=pdf, type=report, x-match=all)Dictionary<string, object> aHeader = new Dictionary<string, object>();aHeader.Add("format", "pdf");aHeader.Add("type", "report");aHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);//绑定 agreements --> queue.B 使用arguments (format=pdf, type=log, x-match=any)Dictionary<string, object> bHeader = new Dictionary<string, object>();bHeader.Add("format", "pdf");bHeader.Add("type", "log");bHeader.Add("x-match", "any");channel.QueueBind(queue: "queue.B",exchange: "agreements",routingKey: string.Empty,arguments: bHeader);//绑定 agreements --> queue.C 使用arguments (format=zip, type=report, x-match=all)Dictionary<string, object> cHeader = new Dictionary<string, object>();cHeader.Add("format", "zip");cHeader.Add("type", "report");cHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.C",exchange: "agreements",routingKey: string.Empty,arguments: cHeader);string message1 = "hello world";var body = Encoding.UTF8.GetBytes(message1);var properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader1 = new Dictionary<string, object>();mHeader1.Add("format", "pdf");mHeader1.Add("type", "report");properties.Headers = mHeader1;//此消息路由到 queue.A 和 queue.B//queue.A 的binding (format=pdf, type=report, x-match=all)//queue.B 的binding (format = pdf, type = log, x - match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message2 = "hello world";body = Encoding.UTF8.GetBytes(message2);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader2 = new Dictionary<string, object>();mHeader2.Add("type", "log");properties.Headers = mHeader2;//x-match 配置queue.B //queue.B 的binding (format = pdf, type = log, x-match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message3= "hello world";body = Encoding.UTF8.GetBytes(message3);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader3 = new Dictionary<string, object>();mHeader3.Add("format", "zip");properties.Headers = mHeader3;//配置失败,不会被路由channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);}

 总计

以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。

header类型用的比较少,但还是知道一点好。

这篇关于MQ - RabbitMQ - 4种Exchange类型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949749

相关文章

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

Pydantic中Optional 和Union类型的使用

《Pydantic中Optional和Union类型的使用》本文主要介绍了Pydantic中Optional和Union类型的使用,这两者在处理可选字段和多类型字段时尤为重要,文中通过示例代码介绍的... 目录简介Optional 类型Union 类型Optional 和 Union 的组合总结简介Pyd

Oracle数据库常见字段类型大全以及超详细解析

《Oracle数据库常见字段类型大全以及超详细解析》在Oracle数据库中查询特定表的字段个数通常需要使用SQL语句来完成,:本文主要介绍Oracle数据库常见字段类型大全以及超详细解析,文中通过... 目录前言一、字符类型(Character)1、CHAR:定长字符数据类型2、VARCHAR2:变长字符数

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Spring Boot 配置文件之类型、加载顺序与最佳实践记录

《SpringBoot配置文件之类型、加载顺序与最佳实践记录》SpringBoot的配置文件是灵活且强大的工具,通过合理的配置管理,可以让应用开发和部署更加高效,无论是简单的属性配置,还是复杂... 目录Spring Boot 配置文件详解一、Spring Boot 配置文件类型1.1 applicatio

Python如何查看数据的类型

《Python如何查看数据的类型》:本文主要介绍Python如何查看数据的类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录python查看数据的类型1. 使用 type()2. 使用 isinstance()3. 检查对象的 __class__ 属性4.

Python容器类型之列表/字典/元组/集合方式

《Python容器类型之列表/字典/元组/集合方式》:本文主要介绍Python容器类型之列表/字典/元组/集合方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 列表(List) - 有序可变序列1.1 基本特性1.2 核心操作1.3 应用场景2. 字典(D