rabbitmq 学习三-spring-boot中配置交换器和队列

2024-03-14 21:48

本文主要是介绍rabbitmq 学习三-spring-boot中配置交换器和队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

rabbitmq主要的交换器类型有fanout、direct、topic、headers
下面分别介绍三种常用的交换器使用方法
1.fanout交换器
将所有发送到该交换器的消息会路由到所有与该交换器绑定的队列中
2.direct交换器
将消息路由到RoutingKey完全匹配的队列中
3.topic 交换器
将消息路由到RoutingKey匹配的队列中,匹配的规则支持特殊字符 ”*“和“#”
一、fanout交换器使用和配置
1.声明队列、交换器并将队列绑定到交换器

@Configuration
public class RabbitFanoutExchangeConfiguration {/*** 声明队列* @return*/@Bean(name = "q.test1")public Queue queue() {return new Queue("q.test1");}@Bean("q.test2")public Queue queue2() {return new Queue("q.test2");}/*** 声明交换器* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("x.test1");}/*** 绑定队列到交换器* @return*/@Beanpublic Binding bindingQueue2Exchange() {return BindingBuilder.bind(queue()).to(fanoutExchange());}@Beanpublic Binding bindingQueue2FanoutExchange() {return BindingBuilder.bind(queue2()).to(fanoutExchange());}
}

2.创建消息生产者

@Component
public class RabbitProducer {@AutowiredAmqpTemplate amqpTemplate;public void sendMessage(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.test1amqpTemplate.send("x.test1","",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

3.创建消息消费者

@Component
@RabbitListener(queues = "q.test1")
public class FanoutExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.test1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.test2")
public class FanoutExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.test2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

二、direct交换器使用和配置
1.声明队列、交换器并将队列绑定到交换器

@Configuration
public class RabbitDirectExchangeConfiguration {/*** 动态声明队列* @return*/@Bean(name = "q.direct1")public Queue queue1() {return new Queue("q.direct1");}/*** 动态声明队列* @return*/@Bean(name = "q.direct2")public Queue queue2() {return new Queue("q.direct2");}/*** 动态声明交换器* @return*/@Beanpublic DirectExchange directExchange() {return new DirectExchange("x.direct");}/*** 使用路由键r.direct.routingKey1将交换器绑定到队列 q.direct1* @return*/@Beanpublic Binding bindingQueue1Exchange() {return BindingBuilder.bind(queue1()).to(directExchange()).with("r.direct.routingKey1");}/*** 使用路由键r.direct.routingKey2 将交换器绑定到队列 q.direct1* @return*/@Beanpublic Binding bindingExchange2Queue2() {return BindingBuilder.bind(queue1()).to(directExchange()).with("r.direct.routingKey2");}/*** 使用路由键 r.direct.routingKey1将交换器绑定到队列 q.direct2* @return*/@Beanpublic Binding bindingExchange2Queue() {return BindingBuilder.bind(queue2()).to(directExchange()).with("r.direct.routingKey1");}
}

2.创建消息生产者

public void sendMessage2DirectExchange(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey1amqpTemplate.send("x.direct","r.direct.routingKey1",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}public void sendMessage2DirectDirectExchange(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.direct","r.direct.routingKey2",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}

3.创建消息消费者

@Component
@RabbitListener(queues = "q.direct1")
public class DirectExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.direct1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.direct2")
public class DirectExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.direct2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

三、topic交换器配置和使用
1.声明队列、交换器并将队列绑定到交换器

@Configuration
public class TopicExchangeConfiguration {/*** 动态声明队列* @return*/@Beanpublic Queue topicQueue1() {return new Queue("q.topic1");}/*** 动态声明队列* @return*/@Beanpublic Queue topicQueue2() {return new Queue("q.topic2");}/*** 动态声明交换器* @return*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange("x.topic");}@Beanpublic Binding bindingTopicExchange2Queue1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.*.test");}@Beanpublic Binding bindingTopicExchange2Queue2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.topic.*");}@Beanpublic Binding bindingTopicExchange2Queue3() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("com.#");}
}

2.创建消息生产者

public void sendMessage2TopicMessage(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.topic","com.test",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}public void sendMessage2TopicMessage2(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.topic","client.topic.test",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}

3.创建消息消费者

@Component
@RabbitListener(queues = "q.topic1")
public class TopicExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.topic1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.topic2")
public class TopicExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.topic2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

四、创建消息发送测试controller

@RestController
@RequestMapping(value = "/messages")
public class MessageController {@AutowiredRabbitProducer rabbitProducer;@PostMapping(value = "/fanout")public Map<String,Object> sendMessage() {Map<String,Object> map=new HashMap<>();map.put("username","test1");map.put("password","123456");map.put("name","我是fanout 交换器测试人员");rabbitProducer.sendMessage(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/direct")public Map<String,Object> sendMessage2DirectExchange() {Map<String,Object> map=new HashMap<>();map.put("username","test2");map.put("password","123456");map.put("name","我是direct 交换器测试人员");rabbitProducer.sendMessage2DirectExchange(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/topic")public Map<String,Object> sendMessage2TopicExchange() {Map<String,Object> map=new HashMap<>();map.put("username","test3");map.put("password","123456");map.put("name","我是topic 交换器测试人员");rabbitProducer.sendMessage2TopicMessage(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/topic1")public Map<String,Object> sendMessage2TopicExchange1() {Map<String,Object> map=new HashMap<>();map.put("username","test3");map.put("password","123456");map.put("name","我是topic 交换器测试人员");rabbitProducer.sendMessage2TopicMessage2(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}
}

源码下载地址
https://github.com/tangyajun/spring-boot-rabbit-consumer
https://github.com/tangyajun/rabbitmq-spring-demo

这篇关于rabbitmq 学习三-spring-boot中配置交换器和队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/809812

相关文章

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Android开发环境配置避坑指南

《Android开发环境配置避坑指南》本文主要介绍了Android开发环境配置过程中遇到的问题及解决方案,包括VPN注意事项、工具版本统一、Gerrit邮箱配置、Git拉取和提交代码、MergevsR... 目录网络环境:VPN 注意事项工具版本统一:android Studio & JDKGerrit的邮

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Java中的StringBuilder之如何高效构建字符串

《Java中的StringBuilder之如何高效构建字符串》本文将深入浅出地介绍StringBuilder的使用方法、性能优势以及相关字符串处理技术,结合代码示例帮助读者更好地理解和应用,希望对大家... 目录关键点什么是 StringBuilder?为什么需要 StringBuilder?如何使用 St

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推