消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流

本文主要是介绍消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用 ScheduledExecutorService 可以很容易地实现定时任务,比如每秒重置计数器来实现限流。下面是一个改进的示例,展示了如何在消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class LimitedRateProducer {
private static final String TOPIC = “test-topic”;
private static final String TAG = “*”;
private static final long MAX_RATE = 100L; // 每秒最大发送速率
private static final long RESET_INTERVAL = 1000; // 重置计数器的时间间隔,单位毫秒
private final DefaultMQProducer producer;
private final AtomicLong counter = new AtomicLong(0);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public LimitedRateProducer() throws Exception {  producer = new DefaultMQProducer("limited-rate-producer-group");  producer.setNamesrvAddr("localhost:9876");  producer.start();  // 安排一个任务来每秒重置计数器  scheduler.scheduleAtFixedRate(() -> counter.set(0), RESET_INTERVAL, RESET_INTERVAL, TimeUnit.MILLISECONDS);  
}  public void sendMessage(String content) throws Exception {  long currentRate = counter.incrementAndGet();  if (currentRate > MAX_RATE) {  // 超出限流速率,丢弃消息或执行其他策略  counter.decrementAndGet(); // 减少计数器,因为这条消息没有被发送  System.out.println("Message discarded due to rate limit.");  return;  }  Message msg = new Message(TOPIC, TAG, content.getBytes());  producer.send(msg);  
}  public void shutdown() throws Exception {  producer.shutdown();  scheduler.shutdown(); // 关闭调度器,停止所有计划任务  
}  public static void main(String[] args) throws Exception {  LimitedRateProducer producer = new LimitedRateProducer();  // 模拟发送消息  for (int i = 0; i < 1000; i++) {  producer.sendMessage("Hello RocketMQ " + i);  // 休眠一段时间来模拟发送间隔  Thread.sleep(10);  }  producer.shutdown(); // 关闭生产者和调度器  
}  

}
在这个示例中,我们创建了一个 ScheduledExecutorService 来定期重置 counter。scheduleAtFixedRate 方法用于安排一个固定频率执行的任务,这里我们设置每 RESET_INTERVAL 毫秒(即每秒)执行一次任务,任务的内容是简单地将 counter 设置为0。

代码注意事项:

  1. 异常处理

    • sendMessage 方法中,当调用 producer.send(msg) 时,应该捕获并处理可能抛出的异常。
    • shutdown 方法中,也需要处理 producer.shutdown()scheduler.shutdown() 可能抛出的异常。
  2. 资源关闭

    • shutdown 方法中,不仅要关闭 scheduler,还要确保 producer 也被正确关闭,并等待关闭操作完成。
    • 考虑使用 try-finally 块或 try-with-resources 语句来确保资源被释放。
  3. 并发安全

    • 虽然 AtomicLong 提供了线程安全的计数器操作,但如果限流逻辑变得更复杂,可能需要进一步考虑并发控制。
  4. 限流精度

    • 使用 ScheduledExecutorService 的定时任务进行限流可能不够精确,特别是在高并发场景下。如果精度要求较高,可能需要考虑使用其他限流算法或工具。
  5. 日志记录

    • 在限流逻辑中加入日志记录,有助于监控和调试。当消息被丢弃或限流逻辑被触发时,应该记录相关信息。

设计注意事项:

  1. 可扩展性

    • 如果未来需要调整限流策略或增加其他功能,设计应该考虑易于扩展和维护。
  2. 灵活性

    • 提供配置化支持,允许用户动态调整限流速率、重置间隔等参数。
  3. 健壮性

    • 考虑系统在各种异常情况下的表现,如网络故障、消息队列不可用等,确保系统能够优雅地处理这些情况。
  4. 性能考虑

    • 在高并发场景下,限流逻辑可能成为性能瓶颈。需要通过性能测试和调优来确保系统的性能。

安全性注意事项:

  1. 防止拒绝服务攻击(DoS)

    • 如果限流逻辑不当,恶意用户可能会利用它发起DoS攻击。因此,需要确保限流策略能够有效防止此类攻击。
  2. 敏感信息保护

    • 在日志记录和错误处理中,避免泄露敏感信息,如用户凭证、内部系统细节等。

测试注意事项:

  1. 单元测试

    • LimitedRateProducer 类进行单元测试,验证限流逻辑的正确性。
  2. 集成测试

    • 在实际环境中进行集成测试,确保限流逻辑与整个系统的其他部分协同工作。
  3. 性能测试

    • 在不同负载下进行性能测试,确保系统在高并发场景下能够保持稳定和高效的限流能力。

这篇关于消息生产者端使用 ScheduledExecutorService 来定期重置计数器,以实现限流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/920935

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置