springboot2+activemq(pub+sub单机版本)

2024-06-20 06:08

本文主要是介绍springboot2+activemq(pub+sub单机版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.项目依赖,pom.xml配置

<!--activemq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.14.5</version>
</dependency>

2.yml文件配置:

spring:activemq:user: adminpassword: XXXXXXbroker-url: tcp://XXXXXXXXXXXXX:61616pool:enabled: truemax-connections: 10

3.对应的启动应用需要加注解@EnableJms

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;@SpringBootApplication
@EnableJms
public class LenderApplication {private final static Logger logger = LoggerFactory.getLogger(LenderApplication.class);public static void main(String[] args) {SpringApplication.run(LenderApplication.class, args);logger.info("LenderApplication is success!");}
}

 

4.对应的配置bean的创建

package com.stylefeng.guns.config.mq;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;import javax.jms.Destination;@Configuration
public class ActiveMQConfig {@Value("${loanTopic}")private String loanTopic;@Value("${repayTopic}")private String repayTopic;@Value("${spring.activemq.user}")private String usrName;@Value("${spring.activemq.password}")private String password;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Beanpublic Destination topic() {return new ActiveMQTopic(loanTopic);}/*@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}*/@Beanpublic Destination topic2() {return new ActiveMQTopic(repayTopic);}@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory, Destination topic) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(loanTopic);return jmsTemplate;}@Beanpublic JmsTemplate jmsTemplate2(ActiveMQConnectionFactory connectionFactory, Destination topic2) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic2);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(repayTopic);return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
//    @Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(false);return factory;}@Bean(name = "jmsTopicListener")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);//客户端签收模式factory.setSessionAcknowledgeMode(4);//使用监听模式factory.setPubSubDomain(true);//使用持久化factory.setSubscriptionDurable(true);//客户端ID,根据业务名称定义factory.setClientId("lenderLoadClient");return factory;}@Bean(name = "jmsTopicListener2")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory2(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(true);factory.setSubscriptionDurable(true);factory.setClientId("lenderRepayClient");return factory;}}

ps:这里面使用持久化的监听模式,因为业务需要创建了两个监听工厂!

activemq安全性配置需要

http://activemq.apache.org/objectmessage.html

 activeMQConnectionFactory.setTrustAllPackages(true);

 

5.消费者消息监听

package com.stylefeng.guns.modular.message;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.stylefeng.guns.modular.orderinfo.service.IOrderInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;@EnableJms
@Component
@Slf4j
public class LoanMessageListener {@Autowiredprivate IOrderInfoService orderInfoService;@JmsListener(destination = "loan.success.topic", containerFactory = "jmsTopicListener")public void onMessage(final TextMessage message, Session session) throws JMSException {try {log.info("=======接收到借款消息=={}", message);if (message instanceof TextMessage) {log.info("=======接收到借款消息內容=={}", message.getText());String result = message.getText();JSONObject jsonObject = JSON.parseObject(result);Long borrowId = jsonObject.getLongValue("borrowId");orderInfoService.updateOrderInfo(borrowId);log.info("===========borrowId={}", borrowId);message.acknowledge();}} catch (Exception e) {session.recover();}}}

7:一些说明总结

ps:使用了持久化订阅(消费在离线之后这点时间生产者产成的消息仍然可以在重新上线之后监听到)(防止消息丢失)

订阅模式消息能被多个监听的消息者获取到所有的消息:1:N(同时生产的所有消息都能被消费)(保证消息被消费者全部消费)

消费的重复消费问题:客户端去重

1.第一种,数据库保持数据的唯一性,如果往数据库里面新增一条数据可以根据id主键保证唯一性(单体应用)

2.分布式系统要消息的生产需要用到分布式ID,然后消费者可以把这个监听到的消费过的消息ID存入redis里面,再次监听消费时先查看redis里面是否存在消费过的消息ID。存在就不重复消费。

 

 

这篇关于springboot2+activemq(pub+sub单机版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077315

相关文章

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

SpringBoot整合OpenFeign的完整指南

《SpringBoot整合OpenFeign的完整指南》OpenFeign是由Netflix开发的一个声明式Web服务客户端,它使得编写HTTP客户端变得更加简单,本文为大家介绍了SpringBoot... 目录什么是OpenFeign环境准备创建 Spring Boot 项目添加依赖启用 OpenFeig

Java Spring 中 @PostConstruct 注解使用原理及常见场景

《JavaSpring中@PostConstruct注解使用原理及常见场景》在JavaSpring中,@PostConstruct注解是一个非常实用的功能,它允许开发者在Spring容器完全初... 目录一、@PostConstruct 注解概述二、@PostConstruct 注解的基本使用2.1 基本代

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删