【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer)

2023-11-23 15:59

本文主要是介绍【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 表达式过滤
    • TAG
      • 使用
      • 原理
    • SQL92
      • 使用
      • 原理
  • 类过滤
    • 使用
    • 原理

RocketMQ支持 表达式过滤类过滤两种模式 。

表达式过滤

其中表达式模式分为 TAGSQL92表达式两种。

TAG

顾名思义,TAG 模式就是简单地为消息定义标签,根据消息的标签进行匹配。

使用

1、在消息发送时,我们可以为每一条消息设置一个TAG标签,消息消费者订阅自己感兴趣的TAG, 一般使用的场景是,对于同一类的功能(如:数据同步)创建一个主题,但对于该主题下的数据,可能不同的系统关心的数据不一样,基础数据各个系统都需要同步,设置标签为ALL,而订单数据只有订单下游子系统关心,其他系统并不关心,则设置标签为ORDER,库存子系统则关注库存相关的数据,设置标签为CAPCITY

2、消费者组订阅相同的主题不同的TAG,多个TAG 用“|”分隔,注意 :同一个消费组订阅的主题, TAG必须相同。

Producer代码示例:

@Slf4j
public class TestTagFilterProducer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("zhurunhua", false);producer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");producer.start();long l = System.currentTimeMillis();try {Message msg = new Message("test_topic","ORDER","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("cost:%s-->%s%n", (System.currentTimeMillis() - l), sendResult);} catch (Exception e) {log.error("", e);}producer.shutdown();}
}

Consumer代码示例:

public class TestTagFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("test_topic", "ORDER");consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

消息发送者在消息发送时如果设置了消息的tags属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列ConsumeQueue,消息消费队列会用 8 个字节存储消息tag的 hashcode,之所以不直接存储tag字符串,是因为将ConumeQueue设计为定长结构,加快消息消费的加载性能。在Broker端拉取消息时,遍历ConsumeQueue,只对比消息tag的hashoode,如果匹配则返回,否则忽略该消息。Consumer在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息tag的值而不再是hashcode。

为什么过滤要这样做?

  • Message Tag存储Hashcode,是为了在ConsumeQueue定长方式存储,节约空间;

  • 过滤过程中不会访问CommitLog数据,可以保证堆积情况下也能高效过滤;

  • 即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失;

  • 优点是简单高效,缺点就是在Hash冲突时,并不是消费者订阅的消息,还会向消费者发送 。

流程图

SQL92

TAG模式一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算,从而过滤出客户端订阅的消息。

使用

1、只有使用push模式的消费者才能用使用SQL92标准的sql语句;

2、使用Filter功能,需要在启动配置文件当中配置以下选项:enablePropertyFilter=true,否则会报错:

3、基本语法:

RocketMQ 仅仅提供了一些基本的语法来支持此特性:

  1. 数值比较: >, >=, <, <=, BETWEEN, =;
  2. 字符比较: =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. 逻辑: AND, OR, NOT;

常量类型如下:

  1. 数字, 如: 123, 3.1415;
  2. 字符, 如: ‘abc’, 必须是单引号;
  3. NULL, 特殊常量;
  4. 布尔, TRUE or FALSE;

Producer示例:

@Slf4j
public class TestSQL92FilterProducer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("zhurunhua", false);producer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");producer.start();long l = System.currentTimeMillis();try {for (int i = 1; i <= 10; i++) {Message msg = new Message("test_topic","Apple","iPhone","测试过滤消息:SQL92".getBytes(StandardCharsets.UTF_8));//设置属性(模拟10台不同序列号的苹果12手机)msg.putUserProperty("name", "IPhone");msg.putUserProperty("serial", "12");msg.putUserProperty("color", "blue");msg.putUserProperty("sequence", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("cost:%s-->%s%n", (System.currentTimeMillis() - l), sendResult);}} catch (Exception e) {log.error("", e);}producer.shutdown();}
}

Consumer示例:

public class TestSQL92FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");//过滤蓝色 序列号大于5的String sql = "color = 'blue' and sequence > 5";consumer.subscribe("test_topic", MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

功能设计图:

1、Broker通过心跳请求收集Consumer的SQL表达式,并保存在ConsumerFilterManager中;

2、当使用者拉取消息时,Broker将构造一个带有已编译好的表达式和订阅数据的MessageFilter(接口),用以在CommitLog中选择匹配的消息;

但是这样拉取消息时做过滤性能差,所以采用BloomFilter和预计算的方法进行优化:

1、在Broker理注册时,每个Consumer都被分配了BloomFilter的某个bit上;

2、当消息写入CommitLog后,Broker会构建ConsumeQueue,此时会计算消费者对应的过滤结果,所有的结果会保存到ConsumeQueueExt的bit数组中;

3、ConsumeQueueExt是链接到ConsumeQueue的存储文件,ConsumeQueue会根据tagsCode找到数据,tagsCode存的是ConsumeQueueExt生成的地址信息;

4、ExpressionMessageFilter使用bit数组检查消息是否匹配。由于BloomFilter的冲突,它还需要解码消息属性来计算匹配的消息;

消费流程图

类过滤

类过滤模式,其实就相当于是启动一个FilterServer做消息中转,FilterServer本身就是一个Consumer,在拉取到消息后,经过用户自定义的过滤器,返回给消费者过滤好的数据,这个模式在2019年之后的版本中已经去掉了,大概是因为太麻烦,且对性能影响大。

使用

1、部署FilterServer

一般部署在Broker所在的机器,减少网络延迟,一个Broker最好部署多个FilterServer,

启动脚本为{ROCKETMQ_HOME}/bin/startfsrv.sh,需要在{ROCKETMQ_HOME}/conf下新增filtersrv.properites文件:

#nameServer 地址 分号分割
namesrvAddr=127.0.0.1:9876 
connectWhichBroker=127.0.0.1:10911

之后执行启动脚本即可,启动成功日志:

load config properties file OK, d:/rocketmq/conf/filtersrv.properties 
The Filter Server boot success, 192.168.1.3:62832

2、编写自定义FIlter类:

public class IphoneColorFIlter implements MessageFilter {@Overridepublic boolean match(MessageExt messageExt, FilterContext filterContext) {String color = messageExt.getUserProperty("color");if ("blue".equals(color)) {return true;}return false;}
}

3、Consumer代码:

public class TestClassFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");//加载过滤器文件ClassLoader classLoader = Thread.currentThread().getContextClassLoader();File classFile = new File(classLoader.getResource("IphoneColorFilter.java").getFile());String file2String = MixAll.file2String(classFile);consumer.subscribe("test_topic", file2String);consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

1、Broker所在的服务器会启动多个FilterServer进程;

2、消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer加载并实例化;

3、Consumer向FilterServer发送消息拉取请求,FilterServer接收到消费者消息拉取请求后,FilterServer将消息拉取请求转发给Broker, Broker返回消息后,在FilterServer端执行消息过滤逻辑,然后返回符合订阅信息的消息给消息消费者进行消费;

通常消息消费者是直接向Broker订阅主题然后从Broker上拉取消息,类过滤模式的一个特别之处在于消息消费者是从FilterServer拉取消息。

流程图

参考:《RocketMQ技术内幕》

这篇关于【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/419075

相关文章

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依

MyBatis-Plus 中 nested() 与 and() 方法详解(最佳实践场景)

《MyBatis-Plus中nested()与and()方法详解(最佳实践场景)》在MyBatis-Plus的条件构造器中,nested()和and()都是用于构建复杂查询条件的关键方法,但... 目录MyBATis-Plus 中nested()与and()方法详解一、核心区别对比二、方法详解1.and()

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试