RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

本文主要是介绍RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

文章目录

    • RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机
        • 1.发布/订阅 模式
          • 1.1 发布/订阅模式工作原理
        • 2.基本概念 交换机
        • 3.代码实战
          • 3.1 直连交换机Direct
            • 3.1.1 生产者
            • 3.1.2 消费者1
            • 3.1.3 消费者2
            • 3.1.4 执行结果
          • 3.2 相同的routingKey,绑定到不同的队列 结果
            • 3.2.1运行结果
          • 3.3 不同的routingKey,绑定到同一个队列 结果
            • 3.3.1 运行结果

1.发布/订阅 模式

发布/订阅模式就是以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图是(RabbitMQ)的发布/订阅模式的图,可以满足不同的业务来处理不同的消息

乍一看,这不就是工作队列么,一个生产者,多个消费者???区别在哪里!!!
仔细看看 工作队列 RabbitMQ系列(四)RabbitMQ进阶-Queue队列特性 (二)工作队列 Work模式

看仔细喽,工作队列是一个队列,多个消费者
这个 发布订阅是 多个队列、多个队列可以对应多个消费者!中间还多个一个多了 exchange 交换机!!!

另外:
工作队列,两个消费者平均消费,能者多劳,但是消息只被消费1次
但是发布订阅的场景是 A队列需要接受到消息后,进行A业务的处理 ,B队列需要接收到消息后,进行B业务的处理

类似于一个员工离职了,财务部门收到离职消息要给他结算工资、账号部门接到离职消息要给他消除账号信息、党支部接到离职消息要给他转移组织关系,各个部门都接到消息后有自己的逻辑业务

1.1 发布/订阅模式工作原理

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听订阅该队列的消费者所接收并消费
对比 工作模式 可以看出来,我们当前的发布/订阅模式,多了 exchange 和多个队列,工作模式是1个队列绑定多个消费者,发布/订阅模式是 经过Exchange交换机后,绑定多个队列,多个消费者
在这里插入图片描述

2.基本概念 交换机

1. 发布者(producer)是发布消息的应用程序。
2. 队列(queue)用于消息存储的缓冲。
3. 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
  在这里插入图片描述
最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

  • Direct Exchange

直连交换机。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
在这里插入图片描述

  • Fanout Exchange

扇形交换机。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
在这里插入图片描述

  • Topic Exchange

主题交换机。将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
在这里插入图片描述

  • Headers Exchanges

头交换机。很少使用,头交换机根据发送的消息内容中的headers属性进行匹配。头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则

当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

3.代码实战

依旧是Maven项目,项目 pom还是参考之前 RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

新建交换机枚举类

package subscrib3;public enum ExchangeTypeEnum {DIRECT("exchange-direct-name", "direct"),FANOUT("exchange-fanout-name", "fanout"),TOPIC("exchange-topic-name", "topic"),HEADER("exchange-header-name", "headers"),UNKNOWN("unknown-exchange-name", "direct");/*** 交换机名字*/private String name;/*** 交换机类型*/private String type;ExchangeTypeEnum(String name, String type) {this.name = name;this.type = type;}public String getName() {return name;}public String getType() {return type;}public static ExchangeTypeEnum getEnum(String type) {ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();for (ExchangeTypeEnum exchange : exchangeArrays) {if (exchange.getName().equals(type)) {return exchange;}}return ExchangeTypeEnum.UNKNOWN;}}
3.1 直连交换机Direct

直连交换机就是让不同的Key精确的路由到不同的队列中,队列C的Key只有队列C可以接收到消息,队列D的key只有队列D可以收到消息,精确路由

3.1.1 生产者

在java包下 新建 subscribe3/direct包,然后在包下
新建 SubscribeConst

package subscrib3.direct;public class SubscribeConst {/*** 消息订阅队列 C*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_C = "subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_D = "subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String ROUTINGKEY_C = "rk_subscribe_queue_C";/*** 路由RoutingKey*/public final static String ROUTINGKEY_D = "rk_subscribe_queue_D";}

新建生产者 SubscribeProducerDirect

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.*;public class SubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);//定义消息内容(发布多条消息)String messageC = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:C";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C, null, messageC.getBytes());System.out.println("CCCCCCC ****  Producer  Sent Message: '" + messageC + "'");//        //定义消息内容(发布多条消息)String messageD = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:D";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D, null, messageD.getBytes());System.out.println("DDDDD ****  Producer  Sent Message: '" + messageD + "'");//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息for (int i = 0; i < 10; i++) {produceDirectExchangeMessage();}}
}
3.1.2 消费者1

消费者1绑定 subscribe_queue_direct_C 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.3 消费者2

消费者2 绑定 subscribe_queue_direct_D 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.4 执行结果

启动上生产者,生产10条消息,先看下 交换机及交换机绑定
exchange-direct-name 直连交换机通过 routingkey rk.subscribe_queue_direct 绑定了两个队列C和D
在这里插入图片描述
生产10条消息,分别设置不同的routingkey,可以看到 队列C和队列D各有10条,消息分别被分配到了C和D
在这里插入图片描述

队列中各10条数据

启动消费者C/D进行消费,消费完毕,队列消息清零
在这里插入图片描述
队列清零
在这里插入图片描述


到此我们完成了 通过RoutingKey去区分消费不同的消费者的消息订阅模式

3.2 相同的routingKey,绑定到不同的队列 结果

下面我们测试下,我们新建两个不同的队列,使用相同的routingKey,看下是不是每个队列都能消费到该消息,还是说竞争消费到同一条消息,C消费了D就不消费了
SameQueueConst.java

package subscrib3.same;public class SameQueueConst {/*** 消息订阅队列 C*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C = "same_subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D = "same_subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String SAME_ROUTINGKEY= "rk_same_subscribe_queue";}

生产者 SubscribeProducerDirect.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.same.SameQueueConst.*;public class SameSubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);for (int i = 0; i < 10; i++) {//定义消息内容(发布多条消息)String messageC = "id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY, null, messageC.getBytes());System.out.println("****  Producer  Sent Message: '" + messageC + "'");}//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息produceDirectExchangeMessage();}
}

消费者1 SameSubscribeQueueConsumerDirect1.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SameSubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}

消费者2 SameSubscribeQueueConsumerDirect2.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SameSubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.2.1运行结果

生产10条 ,C和D全都收到10条
在这里插入图片描述

运行消费者C和D,消费完毕
在这里插入图片描述
由此可以看出,多个队列绑定同一个直连exchange的key,每个消费者都会消费这条消息
类似于广播fanout

3.3 不同的routingKey,绑定到同一个队列 结果

下面我们把上面的代码改下,新建1个队列,绑定两个Routingkey,看看发往 两个routingKey的消息,当前队列是否会收到两个key的,还是说key只能绑定一个,另一个会被覆盖,不会接受消息

3.3.1 运行结果

在这里插入图片描述

开启消费者
每个id有两条消息被消费,消费完毕,队列清零
在这里插入图片描述
看下队列绑定关系
该队列绑定多个Routingkey和Exchange,说明一个队列可以同时绑定多个不同的Routingkey,Routingkey之间互相不影响
在这里插入图片描述

!!! 特别注意一下 channel和connect 建立完毕后要 close,如果不close,会出现意想不到的效果
比如我的消费者代码里, while(count < 10 ) 换成 while(true) 一直循环等待消息消费,然后把 channel及connect的close代码删掉
这样你循环等待消费消息时候,链接是保持的,这样下次再运行 生产者,生产10条消息,channel的链接是保持的,这样就会出现部分消息丢失、并不是10条消息全都分配到C队列中的情况
!!!!

下篇我们介绍 RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

这篇关于RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965435

相关文章

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

JDK9到JDK21中值得掌握的29个实用特性分享

《JDK9到JDK21中值得掌握的29个实用特性分享》Java的演进节奏从JDK9开始显著加快,每半年一个新版本的发布节奏为Java带来了大量的新特性,本文整理了29个JDK9到JDK21中值得掌握的... 目录JDK 9 模块化与API增强1. 集合工厂方法:一行代码创建不可变集合2. 私有接口方法:接口

SQL Server身份验证模式步骤和示例代码

《SQLServer身份验证模式步骤和示例代码》SQLServer是一个广泛使用的关系数据库管理系统,通常使用两种身份验证模式:Windows身份验证和SQLServer身份验证,本文将详细介绍身份... 目录身份验证方式的概念更改身份验证方式的步骤方法一:使用SQL Server Management S

C#特性(Attributes)和反射(Reflection)详解

《C#特性(Attributes)和反射(Reflection)详解》:本文主要介绍C#特性(Attributes)和反射(Reflection),具有很好的参考价值,希望对大家有所帮助,如有错误... 目录特性特性的定义概念目的反射定义概念目的反射的主要功能包括使用反射的基本步骤特性和反射的关系总结特性

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据