消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流

本文主要是介绍消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用 ScheduledExecutorService 可以很容易地实现定时任务,比如每秒重置计数器来实现限流。下面是一个改进的示例,展示了如何在消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class LimitedRateProducer {
private static final String TOPIC = “test-topic”;
private static final String TAG = “*”;
private static final long MAX_RATE = 100L; // 每秒最大发送速率
private static final long RESET_INTERVAL = 1000; // 重置计数器的时间间隔,单位毫秒
private final DefaultMQProducer producer;
private final AtomicLong counter = new AtomicLong(0);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public LimitedRateProducer() throws Exception {  producer = new DefaultMQProducer("limited-rate-producer-group");  producer.setNamesrvAddr("localhost:9876");  producer.start();  // 安排一个任务来每秒重置计数器  scheduler.scheduleAtFixedRate(() -> counter.set(0), RESET_INTERVAL, RESET_INTERVAL, TimeUnit.MILLISECONDS);  
}  public void sendMessage(String content) throws Exception {  long currentRate = counter.incrementAndGet();  if (currentRate > MAX_RATE) {  // 超出限流速率,丢弃消息或执行其他策略  counter.decrementAndGet(); // 减少计数器,因为这条消息没有被发送  System.out.println("Message discarded due to rate limit.");  return;  }  Message msg = new Message(TOPIC, TAG, content.getBytes());  producer.send(msg);  
}  public void shutdown() throws Exception {  producer.shutdown();  scheduler.shutdown(); // 关闭调度器,停止所有计划任务  
}  public static void main(String[] args) throws Exception {  LimitedRateProducer producer = new LimitedRateProducer();  // 模拟发送消息  for (int i = 0; i < 1000; i++) {  producer.sendMessage("Hello RocketMQ " + i);  // 休眠一段时间来模拟发送间隔  Thread.sleep(10);  }  producer.shutdown(); // 关闭生产者和调度器  
}  

}
在这个示例中,我们创建了一个 ScheduledExecutorService 来定期重置 counter。scheduleAtFixedRate 方法用于安排一个固定频率执行的任务,这里我们设置每 RESET_INTERVAL 毫秒(即每秒)执行一次任务,任务的内容是简单地将 counter 设置为0。

代码注意事项:

  1. 异常处理

    • sendMessage 方法中,当调用 producer.send(msg) 时,应该捕获并处理可能抛出的异常。
    • shutdown 方法中,也需要处理 producer.shutdown()scheduler.shutdown() 可能抛出的异常。
  2. 资源关闭

    • shutdown 方法中,不仅要关闭 scheduler,还要确保 producer 也被正确关闭,并等待关闭操作完成。
    • 考虑使用 try-finally 块或 try-with-resources 语句来确保资源被释放。
  3. 并发安全

    • 虽然 AtomicLong 提供了线程安全的计数器操作,但如果限流逻辑变得更复杂,可能需要进一步考虑并发控制。
  4. 限流精度

    • 使用 ScheduledExecutorService 的定时任务进行限流可能不够精确,特别是在高并发场景下。如果精度要求较高,可能需要考虑使用其他限流算法或工具。
  5. 日志记录

    • 在限流逻辑中加入日志记录,有助于监控和调试。当消息被丢弃或限流逻辑被触发时,应该记录相关信息。

设计注意事项:

  1. 可扩展性

    • 如果未来需要调整限流策略或增加其他功能,设计应该考虑易于扩展和维护。
  2. 灵活性

    • 提供配置化支持,允许用户动态调整限流速率、重置间隔等参数。
  3. 健壮性

    • 考虑系统在各种异常情况下的表现,如网络故障、消息队列不可用等,确保系统能够优雅地处理这些情况。
  4. 性能考虑

    • 在高并发场景下,限流逻辑可能成为性能瓶颈。需要通过性能测试和调优来确保系统的性能。

安全性注意事项:

  1. 防止拒绝服务攻击(DoS)

    • 如果限流逻辑不当,恶意用户可能会利用它发起DoS攻击。因此,需要确保限流策略能够有效防止此类攻击。
  2. 敏感信息保护

    • 在日志记录和错误处理中,避免泄露敏感信息,如用户凭证、内部系统细节等。

测试注意事项:

  1. 单元测试

    • LimitedRateProducer 类进行单元测试,验证限流逻辑的正确性。
  2. 集成测试

    • 在实际环境中进行集成测试,确保限流逻辑与整个系统的其他部分协同工作。
  3. 性能测试

    • 在不同负载下进行性能测试,确保系统在高并发场景下能够保持稳定和高效的限流能力。

这篇关于消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/920935

相关文章

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

IDEA中新建/切换Git分支的实现步骤

《IDEA中新建/切换Git分支的实现步骤》本文主要介绍了IDEA中新建/切换Git分支的实现步骤,通过菜单创建新分支并选择是否切换,创建后在Git详情或右键Checkout中切换分支,感兴趣的可以了... 前提:项目已被Git托管1、点击上方栏Git->NewBrancjsh...2、输入新的分支的

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方