RabbitMQ二、RabbitMQ的六种模式

2024-06-01 14:44
文章标签 模式 rabbitmq 六种

本文主要是介绍RabbitMQ二、RabbitMQ的六种模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、RabbitMQ的六种模式

  1. RabbitMQ共有六种工作模式:
    • 简单模式(Simple)
    • 工作队列模式(Work Queue)
    • 发布订阅模式(Publish/Subscribe)
    • 路由模式(Routing)
    • 通配符模式(Topics)
    • 远程调用模式(RPC,不常用,不对此模式进行讲解)

1、RabbitMQ的简单模式

在这里插入图片描述

  1. rabbitmq简单模式的特点:
      1. 一个生产者对应一个消费者,通过队列进行消息传递。
      1. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

JMS

  1. 由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
  2. JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
  3. JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ等产品。
    • RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

java操作rabbitmq的简单模式

  1. 操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq
  1. 注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依赖

在这里插入图片描述

  • 依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.0</version></dependency>
</dependencies>
第二步:生产者和消费者代码的编写
  • 创建生产者(producer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,如果队列已存在,则使用该队列/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*/channel.queueDeclare("simple_queue",false,false,false,null);// 5.发送消息String message = "hello!rabbitmq!";/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,message.getBytes());// 6.关闭信道和连接channel.close();connection.close();System.out.println("===发送成功===");}
}

运行生产者代码结果:
在这里插入图片描述
在这里插入图片描述

  • 创建消费者(consumer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672); // 端口号connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                接收消息String message = new String(body, "UTF-8");System.out.println("接受消息,消息为:"+message);}});}
}

运行消费者代码结果:
在这里插入图片描述

2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)

在这里插入图片描述

  1. 工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct交换机(默认使用的交换机),应用于处理消息较多的情况。特点如下:

      1. 一个队列对应多个消费者。
      1. 一条消息只会被一个消费者消费。
      1. 消息队列默认采用轮询的方式将消息平均发送给消费者。
      1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
    在这里插入图片描述

java操作rabbitmq的工作队列模式

  1. Work Queues 的入门程序与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
  2. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
  • 消息生产者代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
//        创建工厂连接ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130"); //虚拟机地址cf.setPort(5672); //rabbitmq的端口号cf.setUsername("admin"); // 用户名cf.setPassword("admin"); // 密码cf.setVirtualHost("/"); //        创建连接Connection connection = cf.newConnection();//        创建信道Channel channel = connection.createChannel();//        创建队列/*** 参数1:队列名称* 参数二:是否持久化* 参数三:是否私有化* 参数4:队列使用完毕后是否自动删除*/channel.queueDeclare("work_queue",true,false,false,null);//        发送消息/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/for (int i = 1; i <= 10; i++) {channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,("你好,这是今天的第"+i+"条消息").getBytes());}//        关闭资源channel.close();connection.close();System.out.println("消息发送成功====");}
}
  • 消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//        4. 接收消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("消费者1 = " + msg);}});channel.close();conn.close();}}

代码运行的效果和上面简单模式的类似。

3、RabbitMQ的发布订阅模式(相比工作队列模式,交换机使用的类型是Fanout(广播类型))

在这里插入图片描述

  • 图解说明
    • P:生产者,向 Exchange 发送消息
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与交换机绑定的队列
    • C1、C2:消费者,其所在队列要与交换机进行绑定
  1. 在开发过程中,有一些消息需要不同的消费者进行不同的处理
    • 如电商网站的同一条促销信息需要短信发送邮件发送站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
  2. 在订阅模型中,多了一个Exchange角色,而且过程略有变化:
    • Producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Consumer:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      • Fanout:广播类型,将消息交给所有绑定到交换机的队列
      • Direct:定向类型,把消息交给符合指定routing key 的队列
      • Topic:通配符类型,把消息交给符合routing pattern(路由模式) 的队列,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
  3. 特点:
      1. 生产者(Producer)将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中(即可以转发到多个队列中)。
      1. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机

java操作rabbitmq的发布订阅模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是使用到的交换机不一样了而已。
  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//        BuiltinExchangeType.FANOUT:表示广播模式channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAIL", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATION", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL", "exchange_fanout", "");channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");channel.queueBind("SEND_STATION", "exchange_fanout", "");// 7.发送消息channel.basicPublish("exchange_fanout", "", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)

  1. 使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
    在这里插入图片描述
  • 图解说明
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

java操作rabbitmq的路由模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述

其实代码还是差不多的,只是使用到的交换机不一样了,多了一个route Key。

  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//      BuiltinExchangeType.DIRECTchannel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字(路由名称)*/channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");// 7.发送消息channel.basicPublish("routing_exchange", "import", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));channel.basicPublish("routing_exchange", "normal", null,("normal路由的息").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)

在这里插入图片描述

  1. 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机
  2. 通配符规则:
      1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
      1. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
        在这里插入图片描述
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

java操作rabbitmq的通配符模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是route Key的组成多了通配符。
  • Producer
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*///5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列channel.queueDeclare("SEND_MAIL3", true, false, false, null);channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);channel.queueDeclare("SEND_STATION3", true, false, false, null);//6.将队列和交换机绑定/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");//8.发送消息channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));//9.关闭资源channel.close();conn.close();System.out.println("发送消息成功");}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

RabbitMQ一、RabbitMQ的介绍与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

这篇关于RabbitMQ二、RabbitMQ的六种模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1021299

相关文章

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

Python中合并列表(list)的六种方法小结

《Python中合并列表(list)的六种方法小结》本文主要介绍了Python中合并列表(list)的六种方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、直接用 + 合并列表二、用 extend() js方法三、用 zip() 函数交叉合并四、用

SQL Server身份验证模式步骤和示例代码

《SQLServer身份验证模式步骤和示例代码》SQLServer是一个广泛使用的关系数据库管理系统,通常使用两种身份验证模式:Windows身份验证和SQLServer身份验证,本文将详细介绍身份... 目录身份验证方式的概念更改身份验证方式的步骤方法一:使用SQL Server Management S

Redis高可用-主从复制、哨兵模式与集群模式详解

《Redis高可用-主从复制、哨兵模式与集群模式详解》:本文主要介绍Redis高可用-主从复制、哨兵模式与集群模式的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录Redis高可用-主从复制、哨兵模式与集群模式概要一、主从复制(Master-Slave Repli

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Nginx location匹配模式与规则详解

《Nginxlocation匹配模式与规则详解》:本文主要介绍Nginxlocation匹配模式与规则,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、环境二、匹配模式1. 精准模式2. 前缀模式(不继续匹配正则)3. 前缀模式(继续匹配正则)4. 正则模式(大

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子