RabbitMQ入门教程 For Java【3】 - Publish/Subscribe

2024-02-25 19:38

本文主要是介绍RabbitMQ入门教程 For Java【3】 - Publish/Subscribe,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

我的开发环境: 
操作系统: Windows7 64bit 
开发环境: JDK 1.7 - 1.7.0_55 
开发工具: Eclipse Kepler SR2 
RabbitMQ版本: 3.6.0 
Elang版本: erl7.2.1 
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。 
源码地址 
https://github.com/chwshuang/rabbitmq.git



 在上一章中,我们学习创建了一个消息队列,她的每个任务消息只发送给一个工人。这一章,我们会将同一个任务消息发送给多个工人。这种模式就是“发布/订阅”。
  • 1
  • 2

为了说明这种模式,我们将以一个日志系统进行讲解:一个日志发送者,两个日志接收者,接收者1可以把这条日志写入到磁盘上,另外一个接收者2可以将这条日志打印到控制台中。

“发布/订阅”模式的基础是将消息广播到所有的接收器上。


交换器


在之前的教程中,我们都是直接在消息队列中进行发送和接收消息,现在开始要介绍RabbitMQ完整的消息模型了。 
首先,我们先来回顾一下之前学到关于RabbitMQ的内容:

  • 生产者是发送消息的应用程序
  • 队列是存储消息的缓冲区
  • 消费者是接收消息的应用程序

实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,她只会将消息发送到一个交换器,交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道她所接收的消息是什么?它应该被放到那个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。 
这里写图片描述

交换器的规则有:
  • direct (直连)
  • topic (主题)
  • headers (标题)
  • fanout (分发)也有翻译为扇出的。

我们将使用【fanout】类型创建一个名称为 logs的交换器,

channel.exchangeDeclare("logs", "fanout");
  • 1

分发交换器很简单,你通过名称也能想到,她是广播所有的消息,

交换器列表 
通过rabbitmqctl list_exchanges指令列出服务器上所有可用的交换器

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这个列表里面所有以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。

匿名交换器 
在之前的教程中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。 
来看看我们之前的代码:

channel.basicPublish("", "hello", null, message.getBytes());
  • 1

第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。 
第二个参数是【routingKey】路由线索 
匿名交换器规则: 
发送到routingKey名称对应的队列。

现在,我们可以发送消息到交换器中:

channel.basicPublish( "logs", "", null, message.getBytes());
  • 1

临时队列


记得前两章中使用的队列指定的名称吗?(Hello World和task_queue). 
如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:

  1. 首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
  2. 一旦我们断开消费者,队列应该立即被删除。

    在Java客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。

String queueName = channel.queueDeclare().getQueue();
  • 1

通过上面的代码就能获取到一个随机队列名称。 
例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。


绑定


这里写图片描述 
如果我们已经创建了一个分发交换器和队列,现在我们就可以就将我们的队列跟交换器进行绑定。

channel.queueBind(queueName, "logs", "");
  • 1

执行完这段代码后,日志交换器会将消息添加到我们的队列中。

绑定列表 
如果要查看绑定列表,可以执行【rabbitmqctl list_bindings】命令


全部代码


这里写图片描述

目录

这里写图片描述

生产者程序,他负责发送日志消息,与之前不同的是它不是将消息发送到匿名交换器中,而是发送到一个名为【logs】的交换器中。我们提供一个空字符串的routingkey,它的功能被交换器的分发类型代替了。下面是EmitLog.java的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//      分发消息for(int i = 0 ; i < 5; i++){String message = "Hello World! " + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}channel.close();connection.close();}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

上面的代码中,在建立连接后,我们声明了一个交互。如果当前没有队列被绑定到交换器,消息将被丢弃,因为没有消费者监听,这条消息将被丢弃。

下面的代码是接收日志ReceiveLogs1.java 和ReceiveLogs2.java:

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs1 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs1 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

运行


先运行ReceiveLogs1和ReceiveLogs2可以看到日志:

 [*] Waiting for messages. To exit press CTRL+C
  • 1

然后运行EmitLog:

EmitLog日志:[x] Sent 'Hello World! 0'[x] Sent 'Hello World! 1'[x] Sent 'Hello World! 2'[x] Sent 'Hello World! 3'[x] Sent 'Hello World! 4'ReceiveLogs1和ReceiveLogs2日志[*] Waiting for messages. To exit press CTRL+C[x] Received 'Hello World! 0'[x] Received 'Hello World! 1'[x] Received 'Hello World! 2'[x] Received 'Hello World! 3'[x] Received 'Hello World! 4'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

看到这里,说明我们的程序运行正常,消费者通过声明【logs】交换器和【fanout】类型,接收到了来自【logs】交换器的所有消息。

使用【rabbitmqctl list_bindings】命令可以看到两个临时队列的名称

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.
  • 1
  • 2
  • 3
  • 4
  • 5

以上就是这一章讲的发布/订阅模式,下一章将介绍消息路由(Routing)

这篇关于RabbitMQ入门教程 For Java【3】 - Publish/Subscribe的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/746489

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S