springboot2+activemq(pub+sub单机版本)

2024-06-20 06:08

本文主要是介绍springboot2+activemq(pub+sub单机版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.项目依赖,pom.xml配置

<!--activemq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.14.5</version>
</dependency>

2.yml文件配置:

spring:activemq:user: adminpassword: XXXXXXbroker-url: tcp://XXXXXXXXXXXXX:61616pool:enabled: truemax-connections: 10

3.对应的启动应用需要加注解@EnableJms

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;@SpringBootApplication
@EnableJms
public class LenderApplication {private final static Logger logger = LoggerFactory.getLogger(LenderApplication.class);public static void main(String[] args) {SpringApplication.run(LenderApplication.class, args);logger.info("LenderApplication is success!");}
}

 

4.对应的配置bean的创建

package com.stylefeng.guns.config.mq;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;import javax.jms.Destination;@Configuration
public class ActiveMQConfig {@Value("${loanTopic}")private String loanTopic;@Value("${repayTopic}")private String repayTopic;@Value("${spring.activemq.user}")private String usrName;@Value("${spring.activemq.password}")private String password;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Beanpublic Destination topic() {return new ActiveMQTopic(loanTopic);}/*@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}*/@Beanpublic Destination topic2() {return new ActiveMQTopic(repayTopic);}@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory, Destination topic) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(loanTopic);return jmsTemplate;}@Beanpublic JmsTemplate jmsTemplate2(ActiveMQConnectionFactory connectionFactory, Destination topic2) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic2);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(repayTopic);return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
//    @Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(false);return factory;}@Bean(name = "jmsTopicListener")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);//客户端签收模式factory.setSessionAcknowledgeMode(4);//使用监听模式factory.setPubSubDomain(true);//使用持久化factory.setSubscriptionDurable(true);//客户端ID,根据业务名称定义factory.setClientId("lenderLoadClient");return factory;}@Bean(name = "jmsTopicListener2")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory2(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(true);factory.setSubscriptionDurable(true);factory.setClientId("lenderRepayClient");return factory;}}

ps:这里面使用持久化的监听模式,因为业务需要创建了两个监听工厂!

activemq安全性配置需要

http://activemq.apache.org/objectmessage.html

 activeMQConnectionFactory.setTrustAllPackages(true);

 

5.消费者消息监听

package com.stylefeng.guns.modular.message;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.stylefeng.guns.modular.orderinfo.service.IOrderInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;@EnableJms
@Component
@Slf4j
public class LoanMessageListener {@Autowiredprivate IOrderInfoService orderInfoService;@JmsListener(destination = "loan.success.topic", containerFactory = "jmsTopicListener")public void onMessage(final TextMessage message, Session session) throws JMSException {try {log.info("=======接收到借款消息=={}", message);if (message instanceof TextMessage) {log.info("=======接收到借款消息內容=={}", message.getText());String result = message.getText();JSONObject jsonObject = JSON.parseObject(result);Long borrowId = jsonObject.getLongValue("borrowId");orderInfoService.updateOrderInfo(borrowId);log.info("===========borrowId={}", borrowId);message.acknowledge();}} catch (Exception e) {session.recover();}}}

7:一些说明总结

ps:使用了持久化订阅(消费在离线之后这点时间生产者产成的消息仍然可以在重新上线之后监听到)(防止消息丢失)

订阅模式消息能被多个监听的消息者获取到所有的消息:1:N(同时生产的所有消息都能被消费)(保证消息被消费者全部消费)

消费的重复消费问题:客户端去重

1.第一种,数据库保持数据的唯一性,如果往数据库里面新增一条数据可以根据id主键保证唯一性(单体应用)

2.分布式系统要消息的生产需要用到分布式ID,然后消费者可以把这个监听到的消费过的消息ID存入redis里面,再次监听消费时先查看redis里面是否存在消费过的消息ID。存在就不重复消费。

 

 

这篇关于springboot2+activemq(pub+sub单机版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077315

相关文章

springboot集成easypoi导出word换行处理过程

《springboot集成easypoi导出word换行处理过程》SpringBoot集成Easypoi导出Word时,换行符n失效显示为空格,解决方法包括生成段落或替换模板中n为回车,同时需确... 目录项目场景问题描述解决方案第一种:生成段落的方式第二种:替换模板的情况,换行符替换成回车总结项目场景s

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

SpringBoot中@Value注入静态变量方式

《SpringBoot中@Value注入静态变量方式》SpringBoot中静态变量无法直接用@Value注入,需通过setter方法,@Value(${})从属性文件获取值,@Value(#{})用... 目录项目场景解决方案注解说明1、@Value("${}")使用示例2、@Value("#{}"php

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

基于 Cursor 开发 Spring Boot 项目详细攻略

《基于Cursor开发SpringBoot项目详细攻略》Cursor是集成GPT4、Claude3.5等LLM的VSCode类AI编程工具,支持SpringBoot项目开发全流程,涵盖环境配... 目录cursor是什么?基于 Cursor 开发 Spring Boot 项目完整指南1. 环境准备2. 创建

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja