三、Maven项目搭建及Destination(队列、主题)

2024-02-17 04:04

本文主要是介绍三、Maven项目搭建及Destination(队列、主题),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Maven项目搭建及Destination(队列、主题)

  • 一、Idea中Maven项目准备
    • 1.创建Module
    • 2.创建java包
    • 3.配置pom.xml
  • 二、队列(Queue)
    • 1.JMS编程架构
    • 2.代码实现生产者
    • 3.代码实现消费者
    • 4.队列消费者三大情况
  • 三、消费者类型
    • 1.同步式消费者
      • 1.1 一直阻塞
      • 1.2 超时阻塞
    • 2.异步监听消费者
  • 三、主题(Topic)
    • 1.发布主题生产者
    • 2.订阅主题消费者
  • 四、Queue和Topic对比

一、Idea中Maven项目准备

1.创建Module

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.创建java包

在这里插入图片描述
在这里插入图片描述

3.配置pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.qingsi.activemq</groupId><artifactId>activemq_test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><!-- activemq所需要的jar包配置 --><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.11</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring --><dependency><groupId>org.apache.xbean</groupId><artifactId>xbean-spring</artifactId><version>4.15</version></dependency><!--  下面是junit/logback等基础配置  --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies></project>

二、队列(Queue)

1.JMS编程架构

在这里插入图片描述

2.代码实现生产者

package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的生产者MessageProducer producer = session.createProducer(queue);// 6.使用生产者生成3条消息发送到MQ队列for (int i = 1; i <= 3; i++) {// 7.创建消息TextMessage textMessage = session.createTextMessage("msg--" + i);// 最简单的字符串// 8.通过producer发送给mqproducer.send(textMessage);}// 9.关闭资源producer.close();session.close();connection.close();System.out.println("MQ消息发布完成");}}
  • 运行后,登录ActiveMQ后台查看
    在这里插入图片描述
  • 参数详解:
    • Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
    • Number Of Consumers=消费者数量,消费者端的消费者数量。
    • Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
    • Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
  • 总结:
    • 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
    • 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
    • 当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

3.代码实现消费者

package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(queue);while (true){TextMessage textMessage = (TextMessage)consumer.receive();if(null != textMessage){System.out.println("消费者收到消息:" + textMessage.getText());}else {break;}}consumer.close();session.close();connection.close();}
}

在这里插入图片描述

  • 现象:
    • 还有1个消费者在连接
    • 消息入队3个
    • 消息出队3个

4.队列消费者三大情况

  • 先生产消息,只启动1号消费者,1号消费者可以消费消息。
  • 先生产消息,先启动1号消费者再启动2号消费者,2号消费者不能消费到消息,都被1号消费者消费了。
  • 先启动2个消费者,再生产6条消息。
    • 分析:这6条消息会被发送到同一个队列中。由于队列是点对点的消息传递模型,每条消息只能被一个消费者接收和处理。因此,在这种情况下,两个消费者会以轮询的方式交替接收这6条消息。也就是说,消费者1会接收3条消息,而消费者2会接收另外3条消息。

三、消费者类型

  • 队列和主题都有这种类型

1.同步式消费者

  • 同步消费者是一种阻塞式的消费方式,在接收到消息之后,消费者会暂停执行并等待消息的处理完成,然后再继续执行后续代码。同步消费者使用 receive() 方法来接收消息,它会一直阻塞直到接收到消息或超时。
  • 一直组合和固定时间阻塞:区别在于receive方法,是否传入时间

1.1 一直阻塞

  • 如果一直没有消息,那么会一直阻塞在receive方法
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(queue);while (true){TextMessage textMessage = (TextMessage)consumer.receive();if(null != textMessage){System.out.println("消费者收到消息:" + textMessage.getText());}else {break;}}consumer.close();session.close();connection.close();}
}

1.2 超时阻塞

  • 如果一直没有消息,等待超过设定的时候,那么就结束receive方法的阻塞
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(queue);while (true){TextMessage textMessage = (TextMessage)consumer.receive(4000L);if(null != textMessage){System.out.println("消费者收到消息:" + textMessage.getText());}else {break;}}consumer.close();session.close();connection.close();}
}

2.异步监听消费者

  • 异步消费者是一种非阻塞式的消费方式,它通过注册一个消息监听器(Message Listener)来接收消息,并在消息到达时异步地触发监听器进行消息处理。异步消费者不会暂停执行,而是继续执行后续代码。
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException, IOException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(queue);// 6.通过监听的方式来消费消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage){TextMessage textMessage = (TextMessage) message;try {System.out.println("消费者消费消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});// 保证控制台不关掉System.in.read();consumer.close();session.close();connection.close();}
}

三、主题(Topic)

  • 注意:先启动订阅者再启动生产者,不然发送的消息是废消息

1.发布主题生产者

package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsTopicProduce {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String TOPIC_NAME = "topic01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Topic topic = session.createTopic(TOPIC_NAME);// 5.创建消息的生产者MessageProducer producer = session.createProducer(topic);// 6.使用生产者生成3条消息发送到MQ主题for (int i = 1; i <= 3; i++) {// 7.创建消息TextMessage textMessage = session.createTextMessage("msg-topic--" + i);// 最简单的字符串// 8.通过producer发送给mqproducer.send(textMessage);}// 9.关闭资源producer.close();session.close();connection.close();System.out.println("MQ消息发布到topic完成");}}

2.订阅主题消费者

package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsTopicConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String TOPIC_NAME = "topic01";public static void main(String[] args) throws JMSException, IOException {System.out.println("我是1号消费者");// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Topic topic = session.createTopic(TOPIC_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(topic);// 6.通过监听的方式来消费消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage){TextMessage textMessage = (TextMessage) message;try {System.out.println("我是1号消费者消费消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});// 保证控制台不关掉System.in.read();consumer.close();session.close();connection.close();}
}

在这里插入图片描述

  • 结果:运行了2个消费者,入队3条消息,每个消费者都要消费一次,那么总共消费了6条

四、Queue和Topic对比

比较的项目Topic队列模式Queue队列模式
工作模式"订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息"负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息
有无状态无状态Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储
传递完整性如果没有订阅者,消息会被丢弃消息不会被丢弃
处理效率由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

这篇关于三、Maven项目搭建及Destination(队列、主题)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/716621

相关文章

vite搭建vue3项目的搭建步骤

《vite搭建vue3项目的搭建步骤》本文主要介绍了vite搭建vue3项目的搭建步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1.确保Nodejs环境2.使用vite-cli工具3.进入项目安装依赖1.确保Nodejs环境

Nginx搭建前端本地预览环境的完整步骤教学

《Nginx搭建前端本地预览环境的完整步骤教学》这篇文章主要为大家详细介绍了Nginx搭建前端本地预览环境的完整步骤教学,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录项目目录结构核心配置文件:nginx.conf脚本化操作:nginx.shnpm 脚本集成总结:对前端的意义很多

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

pycharm跑python项目易出错的问题总结

《pycharm跑python项目易出错的问题总结》:本文主要介绍pycharm跑python项目易出错问题的相关资料,当你在PyCharm中运行Python程序时遇到报错,可以按照以下步骤进行排... 1. 一定不要在pycharm终端里面创建环境安装别人的项目子模块等,有可能出现的问题就是你不报错都安装

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

MyCat分库分表的项目实践

《MyCat分库分表的项目实践》分库分表解决大数据量和高并发性能瓶颈,MyCat作为中间件支持分片、读写分离与事务处理,本文就来介绍一下MyCat分库分表的实践,感兴趣的可以了解一下... 目录一、为什么要分库分表?二、分库分表的常见方案三、MyCat简介四、MyCat分库分表深度解析1. 架构原理2. 分

k8s搭建nfs共享存储实践

《k8s搭建nfs共享存储实践》本文介绍NFS服务端搭建与客户端配置,涵盖安装工具、目录设置及服务启动,随后讲解K8S中NFS动态存储部署,包括创建命名空间、ServiceAccount、RBAC权限... 目录1. NFS搭建1.1 部署NFS服务端1.1.1 下载nfs-utils和rpcbind1.1

linux查找java项目日志查找报错信息方式

《linux查找java项目日志查找报错信息方式》日志查找定位步骤:进入项目,用tail-f实时跟踪日志,tail-n1000查看末尾1000行,grep搜索关键词或时间,vim内精准查找并高亮定位,... 目录日志查找定位在当前文件里找到报错消息总结日志查找定位1.cd 进入项目2.正常日志 和错误日

在.NET项目中嵌入Python代码的实践指南

《在.NET项目中嵌入Python代码的实践指南》在现代开发中,.NET与Python的协作需求日益增长,从机器学习模型集成到科学计算,从脚本自动化到数据分析,然而,传统的解决方案(如HTTPAPI或... 目录一、CSnakes vs python.NET:为何选择 CSnakes?二、环境准备:从 Py

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配