消息队列在项目中的使用总结

2024-05-09 16:08

本文主要是介绍消息队列在项目中的使用总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、传统通信模式的不足

常用的传统进程通信模式一般是client调用server的服务,等待server的响应。但是在网络情况不好或者在server需要较长的处理时间的时候,就可能导致client的调用失败或超时。业务场景中经常会有一些非常耗时的操作容易阻塞通信,就需要选择独立、耦合性低的消息中间件来完成业务系统间的交互和数据传递。

二、消息队列的优势

消息队列可以作为通信的中介,临时存放发送方信息,等待接收方领取。消息的发送者将消息放进消息队列后可以立即返回,不需要等待接收者的响应,消息会被保存在队列中,直到被接收者取出。消息队列的以下几个优点:

1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。

2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。

3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。

4、复用:一次发送多次消费

5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它;

6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。

 

三、消息队列在项目中的使用

在最近一个批量删除公会的需求里,由于批量删除操作涉及大量耗时操作,很容易造成超时和失败。我们通过使用消息队列,把删除请求和实际删除操作解耦开来, 异步处理。Server只负责把接受到删除的请求放进消息队列和限制用户的请求频率,请求放进MQ后迅速返回成功给前端,并不进行实际的业务删除操作。后台的daemon负责不断从消息队列中获取请求并执行删除操作,操作完成后通过公众号消息推送结果给用户。

四、Hippo消息队列的介绍与使用

1)Producer和Consumer在初始化的时候确定自己的Group和读取配置文件;producer发布topic,然后发送消息到该topic。Consumer先订阅某个topic,然后就可以从该topic取消息。这是消息队列的简化模型。

(Hippo系统实际存在四种角色,分别为生产者,消费者,存储层,中心控制节点,这里简化掉控制节点)

单个收发流程模型:

收发消息的基本模式:

对于一个Topic下的一条消息,属于同一个Group的不同consumer只能有一个可以消费,一个consumer消费后其他consumer就不能再消费了;

而不同Group下的consumer可以重复消费同一条消息,某个Group的consumer消费后不影响其他Group的consumer消费同一条消息。

P2P模式: 

所有的consumer都属于同一个Group;每个消息只能有一个消费者 。所有订阅了该topic的消费者都有资格取,先到先得。

publish/subscribe模式:

  每个consumer有自己的Group, 每个消息可以分发给多个消费者。

2)每个Topic都有n个Queue,topic通过负载均衡的算法以确定每个消费者具体消费哪个队列分区,分发队列的消息给consumer。consumer进行消费的过程中对队列分区是以独占的形式存在的,即一个队列在一个消费组中只能被一个消费者占有并消费其他没分配队列的消费者只能轮空为了保证消费的可靠对于每次拉取的数据,都需要consumer端在消费完成之后进行一次确认,否则下次拉取还是从原来的偏移量开始。为了保证可用性,在consumer拉取消息和确认回调这段锁定时间有一个超时机制,超过这个时间队列会自动解锁,被别的消费者消费。也可在配置文件设置为默认处理消息成功,而把处理失败的重试放到业务中实现。目前消息在队列可保存两天。

 3)异步接口和同步接口:producer和consumer都有提供同步接口和异步接口。对于异步模式,应用程序要根据自己业务逻辑写具体的回调处理函数;客户端不缓存消息,发送失败的消息包若业务要重发,则业务程序逻辑实现上要有缓存处理;异步模式下发送和拉取消息立即返回,结果通过回调函数返回。producer的发送接口若采用异步可提高一些性能。

 

五、hippo的申请步骤

1)上tdbank.oa.com登记提单申请,根据业务需要填写业务信息,数据信息选hippo接入,提交申请。

 

2)提交申请后等待相关负责人审核(审核联系gosonzhang,carylu,kennyjiang),拿到测试环境的topic和正式环境的topic与master_addr,就可以进行配置。

六、hippoclient配置文件

 

1)hippo_auto_confirm_in_getmessage作用是减少一次消息交互提高消费性能。为0表示需要主动调用confirm才会认为数据已消费,为1就是消费消息的时候连带确认之前信息已处理,但是最后一次消费请求需要业务手动确认。

2)hippo_consume_from_where有三个参数:-1,0,1。

为-1是指consumer可以接收到保存在所订阅topic的所有消息,不管有没有被消费过;

为0是指consumer可以接收所有还没被所属Group消费过消息;

为1是指consumer只接收订阅之后topic的最新消息。

3)hippo_master_addr是定义Hippo服务器Master地址的列表,该部分需要与Hippo业务负责人联系提供。

 

七、后台daemon的编写

为了让hippo的一些接口支持我们小组项目的协议,我们对hippo做了一些封装,支持了wup协议和jce编码,添加登录态存储,模调和流水。业务层使用过程中只需要调用SendMessage(放消息到MQ)和BlockGetMessage(从MQ拉取消息)两个接口。

收发消息前先要初始化三个公共参数:

conf_file 是hippoclient.conf(hippo的配置文件)的路径;

Group是客户端组名,按照设计生产和消费是由一组客户端完成,这里用于标记每个客户端所属的组;

Topic是消息的主题,一个Topic里有多个queue,一个queue可有多个message。

1)发送消息到MQ的函数

string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;//初始化producer
CGcHippoProducer producer(sConfFile, sGroup); STestReq stReq;
SUinSession stSession
//发送消息到MQ
iRet = producer.SendMessage(sTopic, stSession, stReq);
if (iRet != 0)
{
ERROR_LOG("consumer.GetMessage error, iRet:%d, sTopic:%s", iRet, sTopic.c_str());
return iRet;
}
//Session是登录态,stReq放进MQ的消息,支持jce和string类型。调用sendMessage把请求信息stReq放进MQ里。

2)从MQ中拉取消息的函数:

string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;//初始化consumer
CGcHippoConsumer consumer(sConfFile, sGroup);string sReq; 
SUinSession stSession;
//阻塞式拉取消息
iRet = consumer.BlockGetMessage(sTopic, stSession, _sReq);
if (iRet != 0)
{
ERROR_LOG("consumer.GetMessage error, iRet:%d, sTopic:%s", iRet, sTopic.c_str());
return iRet;
}
//stSession是登录态,sReq是从MQ里取得的消息,同时支持jce和string类型。调用BlockGetMessage接口阻塞地从MQ中取出登录态和消息。

3)使用进程池封装后的MQ函数:

由于hippo的并发能力受到配置的queue限制,拉取消息的consumer数量不能多于queue数(多余的consumer做无用功)。所以增加一个模型:consumer进程单纯拉取消息,放进共享内存,由进程池从共享内存中获取消息进行逻辑处理。一个consumer进程占用一个进程池,所以不同consumer进程使用的共享内存的配置文件名要不一样!进程池的封装实现:

 

//进程池的逻辑处理函数
int PLineDoJob(const char* pData,int iLen)
{
     int iRet = 0;               iRet=DoJobWrap<STesteq>(*this, &CTest::DoLogic, pData, iLen);return 0;
}
//执行逻辑调用
int DoLogic(STestReq stReq, SUinSession stSession)
{doSomething();return 0;
}
string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;
//循环拉取信息放进本地内存消息队列
Int AddJobToShmCycle(sConfFile, sGroup, sTopic, iMsgNum)
{CGcHippoConsumer consumer(sConfFile, sGroup);
    while(!gbExitFlag)              {
    //循环拉取消息TJceReq stReq;SUinSession stSession;iRet = consumer.BlockGetMessage(sTopic, stSession, stReq);
    if (iRet != 0){ERROR_LOG("consumer.GetMessage error,iRet:%d, sTopic:%s", iRet, sTopic.c_str());
        return iRet;}//将消息放进本地内存消息队列,当队列满了就睡眠等待
iRet=PLineAddJob((char*)it->getData(),it->getDataLength());
//省略错误处理...
}

详细的原生接口信息请参考:

http://km.oa.com/group/20523/articles/show/230802

八、DNS的配置

现网上线需要配置DNS,参考文章:

http://km.oa.com/group/20523/docs/show/140446

修改配置需要root权限,可自己编写后置脚本使用织云的root用户权限下发到服务器,

配置完成验证ping tl-if-nn-tdw, 能ping通证明配置成功了。

 

九、在织云上打包和部署

1、首先在织云创建 后台server包

 

2、bin目录下上传daemon可执行文件

3、进程的一些配置

监控进程列表的gc_league_cond_del_daemon_new:2,20中,前面是进程名称,2,20是指保持进程数在2~20的范围。如果进程数超出范围,监控脚本会kill掉所有gc_league_cond_del_daemon_new进程,然后重启程序。

kill进程信号为停止进程发出的信号,为保证daemon程序安全退出,可为daemon注册一个信号处理函数,执行完一个操作再退出。

启动方式是启动daemon的脚本,可根据具体需要修改。

4、保存数据,提交版本

后面几个选项根据具体需要修改,

完成之后就可以保存提交了。

 

十、测试报告

1)QPS与consumer个数的关系

条件: topic队列数固定为4个;(理论上支持4个consumer进程同时并发执行) ,consumer单次拉取消息量固定为100;业务处理时间为10ms。

不同进程数下拉取消息和发送消息的QPS数据如下:

 

结论:

consumer数量在和topic的队列数相同的时候consumer拉取消息的qps接近峰值,之后consumer继续增加对qps的增长便不太明显了,因为消费者对队列是独占的,多于队列数的消费者进程都在做无用功。

最佳Consumer数= 队列数。

 

2) QPS与队列数的关系

条件:

consumer个数等于队列数,单次拉取消息量固定为10;

结论:

队列数和consumer数的越大,拉取消息qps越大,但不是线性增长。

 

 

3)QPS与单次拉取消息数的关系

条件:

topic队列数固定为4个;consumer个数固定为4个。

结论:

在进程数不变的情况下,单次拉取消息越多,拉取消息的QPS越大,增长速度变慢,并非线性关系。

 

4)QPS和进程池大小/业务处理时间的关系

条件:

consumer数量设置为4,和队列数一致,consumer每次拉取10条消息,每个consumer占用一个进程池。

下面是使用进程池后不同消费者数下的处理消息的QPS:


结论:

可看出处理消息耗时的瓶颈主要是业务处理总时间和拉取消息总时间,总耗时是业务总时间和拉取消息总时间的最大值,而拉取消息总时间是不可控的,通过调整进程数可减少业务总时间,使得总耗时接近拉取消息总时间,达到最佳,qps也就达到峰值,因此要根据业务处理时间确定进程池进程数。

每个consumer开一个进程池,

进程池进程数=处理时间*单次拉取数/单次拉取时间

这篇关于消息队列在项目中的使用总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/973850

相关文章

使用Array实现Java堆栈

本教程给出了使用Array 实现Stack数据结构的示例。堆栈提供将新对象放在堆栈上(方法push())并从堆栈中获取对象(方法pop())。堆栈根据后进先出(LIFO)返回对象。请注意,JDK提供了一个默认的Java堆栈实现作为类java.util.Stack。 适用于所有堆栈实现的两个强制操作是: push():数据项放置在堆栈指针指向的位置。pop():从堆栈指针指向的位置删除并返回数据

Spring Boot - 使用类类型信息获取所有已加载的bean

Spring启动会在内部加载大量bean,以最少的配置运行您的应用程序。在这个例子中,我们将学习如何找出所有那些Spring boot加载的bean及其类类型信息。 使用ApplicationContext获取所有已加载的bean 要自动执行方法,当应用程序完全加载时,我正在使用CommandLineRunner接口。CommandLineRunner用于指示bean 在Spring应用程序中

使用ThreadPoolExecutor创建线程池有哪些关键参数

1、ThreadPoolExecutor类的全参数构造方法: /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they

关于Java的数组的使用

关于一维数组的使用 代码示例一如下: package com;public class test_array {public static void main(String[] args){//1.如何定义 一个 数组//1.1数组的声明String[] names;int[] scores;//1.2数组的初始化://1.2.1静态初始化:初始化数组与数组元素赋值同时进行nam

Android_03_数据库的使用总结

前言: 1>区分SQL和SQLite SQL 是一门 ANSI 的标准计算机语言,用来访问和操作数据库系统。SQL 语句用于取回和更新数据库中的数据。 SQL 可与数据库程序协同工作,比如 MS Access、DB2、Informix、MS SQL Server、Oracle、Sybase 以及其他数据库系统。 不幸地是,存在着很多不同版本的 SQL 语言,但是为了与 ANSI 标准相

Android_02_关于SharePreferences的使用

前言: 我们使用SharePreferences的主要目的是针对一些简单的数据进行存取,其是通过键值对来存取的, 其实质是通过xml文件进行保存的;对于一些简单数据的存取,我们可以用SharePreferences,替代 其他几种复杂的数据存取的方式,比如文件的读写或者数据库的操作; 本示例演示的内容是:通过SharedPreferences来实现记住密码的功能,无需第二次再输入密码

Android图片轮播的实现总结

前言: 在很多app中,我们都可以看到几张图片每隔一段时间就切换一下,这就是我们所称的图片轮播的功能,其主要实现就是用到了ViewPager, 下面我们来着重讲解一下其具体实现 效果图: 步骤一:在XML中添加ViewPager控件 比如: <?xml version="1.0" encoding="utf-8"?><RelativeLayout xmlns:a

自定义ViewGroup的总结(侧滑特效)

前言: 和自定义View控件一样,我们有时也需要自定义我们想要的ViewGroup,那么此时,我们就需要让其继承ViewGroup,然后重写 里边的onMeasure()和onLayout()方法,下面以侧滑特效为例,来讲解一下自定义ViewGroup所需的流程,关于侧滑特效, 其整体效果图如下: 对于自定义ViewGroup,主要有以下几步: 步骤一:编写ViewGroup

自定义View的总结(自定义滑动开关)

前言: 由于有些控件,在android中样式比较挫,并不能满足我们的需求,此时,我们可以将其进行一个自定义,下面一以一个自定义编写的ToggleButton为例, 来简要说明下,自定义所涉及到的一些步骤;以下是自定义控件ToggleButton的效果图: 其是由两张图片组成的: <1>   <2>  下面我们通过这个示例,来说明下,如何编写一个自定义view控件!!

关于隐藏Android标题栏总结

1>区分状态栏/标题栏/导航栏 状态栏(Status Bar) 标题栏(Title Bar) 导航栏(Navigation Bar) 2>区分Title Bar/Action Bar/Tool Bar Title Bar就是我们所俗称的标题栏,在Android 3.0 (API level 11)的时候,引入的Action Bar,其就是用来取代Title Bar的,