MQ - RabbitMQ - 4种Exchange类型

2024-04-30 19:38
文章标签 类型 rabbitmq exchange mq

本文主要是介绍MQ - RabbitMQ - 4种Exchange类型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则。

在rabbitmq中,exchange有4个类型:direct,topic,fanout,header。

direct exchange

此类型的exchange路由规则很简单:

exchange在和queue进行binding时会设置routingkey

channel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);

 然后我们在将消息发送到exchange时会设置对应的routingkey:

channel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);

 在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

具体的流程如下:

通过代码可以会理解好一点:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Direct类型的exchange, 名称 pdf_eventschannel.ExchangeDeclare(exchange: "pdf_events",type: ExchangeType.Direct,durable: true,autoDelete: false,arguments: null);// 创建create_pdf_queue队列channel.QueueDeclare(queue: "create_pdf_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 pdf_log_queue队列channel.QueueDeclare(queue: "pdf_log_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 pdf_events --> create_pdf_queue 使用routingkey:pdf_createchannel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);//绑定 pdf_events --> pdf_log_queue 使用routingkey:pdf_logchannel.QueueBind(queue: "pdf_log_queue",exchange: "pdf_events",routingKey: "pdf_log",arguments: null);var message = "Demo some pdf creating...";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_create//通过binding routinekey的比较,次消息会路由到队列 create_pdf_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);message = "pdf loging ...";body = Encoding.UTF8.GetBytes(message);properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_log//通过binding routinekey的比较,次消息会路由到队列 pdf_log_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_log",basicProperties: properties,body: body);}

 topic exchange

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.

其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词

如上图第一个binding:

  • exchange: agreements
  • queue A:  berlin_agreements
  • binding routingkey: agreements.eu.berlin.#

第二个binding: 

  • exchange: agreements
  • queue B: all_agreements
  • binding routingkey: agreements.#

第三个binding:

  • exchange: agreements
  • queue c: headstore_agreements
  • binding routingkey: agreements.eu.*.headstore

所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合,具体的代码如下:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Topic类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Topic,durable: true,autoDelete: false,arguments: null);// 创建berlin_agreements队列channel.QueueDeclare(queue: "berlin_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 all_agreements 队列channel.QueueDeclare(queue: "all_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 headstore_agreements 队列channel.QueueDeclare(queue: "headstore_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#channel.QueueBind(queue: "berlin_agreements",exchange: "agreements",routingKey: "agreements.eu.berlin.#",arguments: null);//绑定 agreements --> all_agreements 使用routingkey:agreements.#channel.QueueBind(queue: "all_agreements",exchange: "agreements",routingKey: "agreements.#",arguments: null);//绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstorechannel.QueueBind(queue: "headstore_agreements",exchange: "agreements",routingKey: "agreements.eu.*.headstore",arguments: null);var message = "hello world";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin//agreements.eu.berlin 匹配  agreements.eu.berlin.# 和agreements.#//agreements.eu.berlin 不匹配  agreements.eu.*.headstore//最终次消息会路由到队里:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)channel.BasicPublish(exchange: "agreements",routingKey: "agreements.eu.berlin",basicProperties: properties,body: body);}

 fanout exchange

此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。

header exchange

 此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:

Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);

其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。

在发布消息的时候就需要传入header值:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;

 具体的规则可以看以下代码:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Headers类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Headers,durable: true,autoDelete: false,arguments: null);// 创建queue.A队列channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.B 队列channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.C 队列channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);//绑定 agreements --> queue.A 使用arguments (format=pdf, type=report, x-match=all)Dictionary<string, object> aHeader = new Dictionary<string, object>();aHeader.Add("format", "pdf");aHeader.Add("type", "report");aHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);//绑定 agreements --> queue.B 使用arguments (format=pdf, type=log, x-match=any)Dictionary<string, object> bHeader = new Dictionary<string, object>();bHeader.Add("format", "pdf");bHeader.Add("type", "log");bHeader.Add("x-match", "any");channel.QueueBind(queue: "queue.B",exchange: "agreements",routingKey: string.Empty,arguments: bHeader);//绑定 agreements --> queue.C 使用arguments (format=zip, type=report, x-match=all)Dictionary<string, object> cHeader = new Dictionary<string, object>();cHeader.Add("format", "zip");cHeader.Add("type", "report");cHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.C",exchange: "agreements",routingKey: string.Empty,arguments: cHeader);string message1 = "hello world";var body = Encoding.UTF8.GetBytes(message1);var properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader1 = new Dictionary<string, object>();mHeader1.Add("format", "pdf");mHeader1.Add("type", "report");properties.Headers = mHeader1;//此消息路由到 queue.A 和 queue.B//queue.A 的binding (format=pdf, type=report, x-match=all)//queue.B 的binding (format = pdf, type = log, x - match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message2 = "hello world";body = Encoding.UTF8.GetBytes(message2);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader2 = new Dictionary<string, object>();mHeader2.Add("type", "log");properties.Headers = mHeader2;//x-match 配置queue.B //queue.B 的binding (format = pdf, type = log, x-match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message3= "hello world";body = Encoding.UTF8.GetBytes(message3);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader3 = new Dictionary<string, object>();mHeader3.Add("format", "zip");properties.Headers = mHeader3;//配置失败,不会被路由channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);}

 总计

以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。

header类型用的比较少,但还是知道一点好。

这篇关于MQ - RabbitMQ - 4种Exchange类型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949749

相关文章

MyBatis的xml中字符串类型判空与非字符串类型判空处理方式(最新整理)

《MyBatis的xml中字符串类型判空与非字符串类型判空处理方式(最新整理)》本文给大家介绍MyBatis的xml中字符串类型判空与非字符串类型判空处理方式,本文给大家介绍的非常详细,对大家的学习或... 目录完整 Hutool 写法版本对比优化为什么status变成Long?为什么 price 没事?怎

C#之枚举类型与随机数详解

《C#之枚举类型与随机数详解》文章讲解了枚举类型的定义与使用方法,包括在main外部声明枚举,用于表示游戏状态和周几状态,枚举值默认从0开始递增,也可手动设置初始值以生成随机数... 目录枚举类型1.定义枚举类型(main外)2.使用生成随机数总结枚举类型1.定义枚举类型(main外)enum 类型名字

Python lambda函数(匿名函数)、参数类型与递归全解析

《Pythonlambda函数(匿名函数)、参数类型与递归全解析》本文详解Python中lambda匿名函数、灵活参数类型和递归函数三大进阶特性,分别介绍其定义、应用场景及注意事项,助力编写简洁高效... 目录一、lambda 匿名函数:简洁的单行函数1. lambda 的定义与基本用法2. lambda

C语言自定义类型之联合和枚举解读

《C语言自定义类型之联合和枚举解读》联合体共享内存,大小由最大成员决定,遵循对齐规则;枚举类型列举可能值,提升可读性和类型安全性,两者在C语言中用于优化内存和程序效率... 目录一、联合体1.1 联合体类型的声明1.2 联合体的特点1.2.1 特点11.2.2 特点21.2.3 特点31.3 联合体的大小1

MySQL 索引简介及常见的索引类型有哪些

《MySQL索引简介及常见的索引类型有哪些》MySQL索引是加速数据检索的特殊结构,用于存储列值与位置信息,常见的索引类型包括:主键索引、唯一索引、普通索引、复合索引、全文索引和空间索引等,本文介绍... 目录什么是 mysql 的索引?常见的索引类型有哪些?总结性回答详细解释1. MySQL 索引的概念2

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

SpringBoot改造MCP服务器的详细说明(StreamableHTTP 类型)

《SpringBoot改造MCP服务器的详细说明(StreamableHTTP类型)》本文介绍了SpringBoot如何实现MCPStreamableHTTP服务器,并且使用CherryStudio... 目录SpringBoot改造MCP服务器(StreamableHTTP)1 项目说明2 使用说明2.1

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr