RocketMQ sql92的使用及原理简单分析附源码

2023-11-02 14:52

本文主要是介绍RocketMQ sql92的使用及原理简单分析附源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ 版本

  • 5.1.0

RokcetMQ消息过滤

目前官方支持的消息过滤方式主要有两种

  • tag
  • sql92

我们可以通过查看ExpressionType的源码证明

tag过滤方式是现在最为常用的过滤方式,但是一个消息只能包含一个tag。

对于相对复杂的消息过滤场景tag过滤方式可能就不够用了,但是绝大多数业务场景tag过滤方式已经够用了。

sql92过滤方式可以有助于我们实现一些高级功能,比如RocketMQ的多测试环境消息隔离等。

这里就暂时不过多讨论sql92的具体使用场景,我们还是先来学习怎么使用sql92

sql92 语法规则

语法说明示例
IS NULL判断属性不存在。a IS NULL :属性a不存在。
IS NOT NULL判断属性存在。a IS NOT NULL:属性a存在。
> >= < <=用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。 a IS NOT NULL AND a > ‘abc’:错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx)表示属性的值在某个集合内。集合的元素只能是字符串。a IS NOT NULL AND (a IN (‘abc’, ‘def’)):属性a存在且属性a的值为abc或def。
= <>等于和不等于。可用于比较数字和字符串。a IS NOT NULL AND (a = ‘abc’ OR a<>‘def’):属性a存在且属性a的值为abc或a的值不为def。
AND OR逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。

由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。

  • 空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。

  • 数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

sql92使用

源码

所有源码已上传至github

  • 地址:https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/rocketmq-demo/src/main/java/com/weihubeats/rocketmq/demo/sql92

消息发送

public class SQLProducer {public static int count = 10;public static String topic = "xiao-zou-topic";public static void main(String[] args) {DefaultMQProducer producer = MQUtils.createLocalProducer();IntStream.range(0, count).forEach(i -> {Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));try {if (i % 2 == 0) {message.putUserProperty("gray", "dev1");}SendResult sendResult = producer.send(message);DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));}catch (Exception e) {throw new RuntimeException(e);}});producer.shutdown();}
}

这里我们假装消息是发送个多个测试的消息,所以每条消息都在UserProperty添加了一个dev1标签。

我们要实现的就是比如只有dev1环境的消费者才会消费带有dev1标签的消息,其他消息则丢弃掉

消息消费

public class SQLConsumer {public static String GID = "xiao-zou-gid";public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);String sql = "gray is not null and gray = 'dev1'";consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});/**  Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}

这里的消息消费方式唯一不同的是我们订阅消息的方式发生了变化

普通方法我们调用的是这个方法进行消息订阅的,传入tag就行

比如像这样

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

但是这里我们使用的是sql92方式

传入的是一个MessageSelector,订阅的规则是

String sql = "gray is not null and gray = 'dev1'";

运行效果

  • 消息发送

这里我们发送了十条消息,只有5条是带有gray标签的

  • 消息消费

可以看到消息消费只有消费了带有gray标签的5条消息,符合我们的预期

sql92是在客户端还是在服务端过滤的?

sql92tag都是在服务端过滤的,我们可以查看源码得知

不过tag的过滤方式会在客户端再次过滤。因为在服务端是通过hashcode进行过滤的,为了提高性能,没有对原始的tag进行过滤,在通过hashcode过滤掉绝大多少的消息后,在客户端进行最后的tag完全过滤。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult

如果统一都在客户端过滤会导致传输大量的消息到客户端,影响性能

总结

本次我们对RocketMQ sql92过滤消息进行了简单的使用以及少量的源码分析,并没有完整的从整个流程进行分析,因为本篇并不是源码分析偏。sql92在实际的项目中的相对来说较少,偶尔如果做RocketMQ消息的多册环境或者灰度,可能是一个方案,但不是最佳的

参考

  • 官方文档

这篇关于RocketMQ sql92的使用及原理简单分析附源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/331378

相关文章

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

从原理到实战深入理解Java 断言assert

《从原理到实战深入理解Java断言assert》本文深入解析Java断言机制,涵盖语法、工作原理、启用方式及与异常的区别,推荐用于开发阶段的条件检查与状态验证,并强调生产环境应使用参数验证工具类替代... 目录深入理解 Java 断言(assert):从原理到实战引言:为什么需要断言?一、断言基础1.1 语

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

ModelMapper基本使用和常见场景示例详解

《ModelMapper基本使用和常见场景示例详解》ModelMapper是Java对象映射库,支持自动映射、自定义规则、集合转换及高级配置(如匹配策略、转换器),可集成SpringBoot,减少样板... 目录1. 添加依赖2. 基本用法示例:简单对象映射3. 自定义映射规则4. 集合映射5. 高级配置匹

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景