RateLimiter实现令牌桶算法和漏桶算法

2024-06-11 23:28

本文主要是介绍RateLimiter实现令牌桶算法和漏桶算法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RateLimiter

第三方工具类:disruptor(高性能的无阻塞的无锁队列)、guava--RateLimit(高性能的信号量的限流器)----【基础的类库】

在 Guava 的 RateLimiter 中,并没有直接提供实现漏桶算法的方法,因为 RateLimiter 的设计就是基于令牌桶的。但是,如果我们想实现一个漏桶算法,我们需要自己编写代码来模拟水的流入和流出。

令牌桶算法(RateLimiter 实现)

  • 令牌以固定的速率产生并放入桶中。

  • 请求尝试从桶中取出令牌

  • 如果桶中有令牌,则请求可以继续;否则,请求可能被拒绝或等待。

在 Guava 的 RateLimiter 中,你使用 tryAcquire() 方法来尝试获取令牌。

漏桶算法

  • 水(请求)以任意速率流入桶中。

  • 桶有一个固定的容量。

  • 水(请求)以固定的速率从桶中流出。

  • 如果桶满了,新的水(请求)会被丢弃。

在漏桶算法的实现中,你需要跟踪桶的当前水位(即当前处理的请求数量),以及流入和流出的速率。你可能需要定时任务来模拟水的流出,或者使用某种数据结构(如队列)来跟踪等待处理的请求。

  • acquire():阻塞方法,尝试获取一个令牌。如果桶中没有令牌,它会阻塞当前线程直到有一个令牌可用。

  • tryAcquire()(无参数):非阻塞方法,尝试获取一个令牌。如果桶中没有令牌,它会立即返回 false

  • tryAcquire(long timeout, TimeUnit unit):带超时的非阻塞方法,尝试在指定的超时时间内获取一个令牌。如果在这段时间内获取到了令牌,它会返回 true;否则,它会返回 false

漏桶算法

限定一个固定速率,当超出了可以采取降级策略。

  • 不管来的速率,始终以匀速的速率处理

1.入门样例

 /*** 限流器示例类,用于演示如何使用Guava的RateLimiter进行速率限制。*/public class RateLimiterExample {/*** 静态的限流器实例,配置为每秒最多允许0.5个请求通过。* 这个配置意味着每两个请求之间至少需要一秒钟的时间间隔。*/private final static RateLimiter limiter = RateLimiter.create(0.5);​/*** 程序的入口点。* 创建一个固定大小的线程池,并提交10个任务来测试限流器。* 每个任务都会尝试获取限流器的许可,以便演示限流效果。** @param args 命令行参数*/public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(10);IntStream.range(0, 10).forEach(i ->service.submit(RateLimiterExample::testLimiter));}​/*** 测试限流器的函数。* 获取限流器的许可,并打印当前线程名以及获取许可所花费的时间。* 这个函数的目的是为了展示限流器如何限制并发请求的数量。*/private static void testLimiter() {// 获取限流器的许可,这可能需要等待,直到许可可用。System.out.println(currentThread() + " waiting " + limiter.acquire());}}

2.典型漏桶算法实现

 /*** Bucket类用于实现一个具有限流功能的队列。* 它使用并发链接deque作为容器,以限制队列中元素的数量,并通过RateLimiter控制取元素的速率。*/public class Bucket {/*** 使用并发链接deque作为容器,以支持并发操作。*/private final ConcurrentLinkedDeque<Integer> container = new ConcurrentLinkedDeque<>();​/*** 定义桶的容量限制。*/private final static int BUCKET_LIMIT = 1000;​/*** 使用RateLimiter来控制提交数据的速率。*/private final RateLimiter limiter = RateLimiter.create(10);​/*** 提交监视器,用于控制对容器进行提交操作的并发访问。*/private final Monitor offerMonitor = new Monitor();/*** 取消监视器,用于控制从容器中取出数据的并发访问。*/private final Monitor pollMonitor = new Monitor();​/*** 向桶中提交数据。* 提交数据到容器中。** 如果容器未满,则将数据添加到容器中;如果容器已满,则抛出异常。* 使用信号量机制来控制对容器的访问,以确保线程安全。** @param data 要提交的数据,必须为非空整数。* @throws IllegalArgumentException 如果容器已满,则抛出此异常。*/public void submit(Integer data){// 使用信号量检查容器是否已满,如果未满则获取访问权限if(offerMonitor.enterIf(offerMonitor.newGuard(()->container.size() < BUCKET_LIMIT))){try{// 在确保容器未满的情况下,添加数据到容器中container.offer(data);// 打印当前线程信息、提交的数据和容器的当前大小System.out.println(currentThread() + " submit data" + data + ",current size:" + container.size());}finally {// 无论添加数据成功与否,都释放访问权限offerMonitor.leave();}}else {// 如果容器已满,则抛出异常throw new IllegalArgumentException("The bucket is full");}}​​/*** 从桶中取出并消费一个元素。* 此方法用于从桶中取出一个元素,并使用提供的Consumer接口来消费这个元素。它首先尝试获取一个许可,确保桶不为空,* 如果成功获取许可,则执行元素的消费操作。这个方法的设计是为了在多线程环境下安全地访问和操作桶中的元素。** @param consumer 一个函数接口,用于定义如何消费桶中的元素。它接受一个整型参数,并没有返回值。*//*** 从桶中取出并消费一个元素。* 如果桶不为空,并且成功获得取元素的许可,则从桶中取出一个元素并使用提供的消费者进行消费。** @param consumer 消费元素的函数接口。*/public void takeThenConsume(Consumer<Integer> consumer){// 尝试获取许可以进入临界区,只有当桶不为空时才允许进入if(pollMonitor.enterIf(pollMonitor.newGuard(()->!container.isEmpty()))){try{// 打印当前线程正在等待获取许可的信息/*如果没有可用的许可,此方法可能会阻塞直到获得许可。这个方法的返回值通常表示成功获取的许可数量,在许多情况下对于一次性获取一个许可的情况会返回1,但具体取决于limiter的实现。*///todo 受RateLimiter.create(10);影响,控制速度System.out.println(currentThread() + " waiting " + limiter.acquire());// 从桶中poll(取出并删除)一个元素,并使用consumer接口消费这个元素consumer.accept(container.poll());}finally {// 无论操作成功与否,都释放许可pollMonitor.leave();}}}​}

测试类

 /*** BucketTest 类用于演示一个并发测试场景,其中多个线程向一个桶中添加数据,* 而其他线程从桶中获取数据并消费。*/public class BucketTest {/*** 主函数执行并发测试。* @param args 命令行参数*/public static void main(String[] args) {// 创建一个 Bucket 实例,用于线程间的数据传递。final Bucket bucket = new Bucket();// 创建一个 AtomicInteger,用于线程安全地自增,作为数据的唯一标识。final AtomicInteger DATA_CREATE = new AtomicInteger(0);​// 启动 5 个生产者线程,它们会不断地向桶中添加数据。IntStream.range(0, 5).forEach(i ->{new Thread(() ->{// 无限循环,生产者线程将持续添加数据。for (; ; ) {// 获取并增加数据序号,作为添加到桶中的数据。int data = DATA_CREATE.getAndIncrement();bucket.submit(data);try {// 休眠 200 毫秒,模拟数据生成的间隔。5个线程一秒提交五个,所以25TimeUnit.MILLISECONDS.sleep(200L);} catch (Exception e) {// 捕获并处理异常,这里只处理 IllegalArgumentException。if (e instanceof IllegalArgumentException) {System.out.println(e.getMessage());}}}}).start();});​// 25 : 10 ====> 5 : 2 是一个比例说明,表示生产者和RateLimit限定的速率(消费者?)的数量关系。​/*Thread[Thread-3,5,main] submit data1712,current size:1000Thread[Thread-7,5,main] waiting 0.09999Thread[Thread-7,5,main] W 711               711 / 1712 ~= 5 : 2*/​​// 启动 5 个消费者线程,它们会不断地从桶中获取数据并消费。IntStream.range(0, 5).forEach(i -> new Thread(() ->{// 无限循环,消费者线程将持续获取并消费数据。for (; ; ) {// 从桶中获取数据并消费,这里使用 lambda 表达式定义消费行为。bucket.takeThenConsume(x -> System.out.println(currentThread() + " W " + x));}}).start());}}

令牌桶算法

需要拿到令牌才允许进来

 /*** 令牌桶算法实现类。* 用于模拟销售系统中的限流策略,确保在限定的速率下出售商品,防止瞬间流量过高导致系统崩溃。*/public class TokenBucket {// 记录已售出的手机数量private AtomicInteger phoneNumbers = new AtomicInteger(0);​// 设置最大的销售限制private final static int LIMIT = 100;​// 使用RateLimiter实现限流,每10秒最多允许一个购买操作,  1s 10个把??private RateLimiter rateLimiter = RateLimiter.create(10);​// 每个实例的销售限制private final int saleLimit;​/*** 默认构造函数,使用预设的销售限制。*/public TokenBucket(){this(LIMIT);}​/*** 带有销售限制参数的构造函数。* * @param limit 销售限制的数量。*/public TokenBucket(int limit){this.saleLimit = limit;}​/*** 尝试购买手机的方法。* 如果令牌桶中有令牌,则尝试购买;如果已达到销售限制,则抛出异常。* * @return 返回购买成功的手机序号。* @throws IllegalStateException 如果已达到销售限制无法购买时抛出。* @throws RuntimeException 如果购买过程中发生异常时抛出。*/public int buy(){Stopwatch started = Stopwatch.createStarted();// 尝试获取令牌,如果10秒内无法获取则失败// 不想acquire会返回超时时间boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);if(success){// 检查是否已达到销售限制//todo 如果放在一开始,就会导致在tryAcquire阻塞多个线程,即使是原子类组合起来也会有线程安全问题。两个原子方法get、getAndIncrement组合起来了if(phoneNumbers.get() >= saleLimit){throw new IllegalStateException("Not any phone can be sale,please wait to next time!");}// 获取并增加已售出的手机数量int phoneNo = phoneNumbers.getAndIncrement();// 模拟处理订单的时间handleOrder();// 打印购买成功信息System.out.println(currentThread() + " user get the Mi phone " + phoneNo + ",ELT: " + started.stop());return phoneNo;}else {// 如果获取令牌失败,则停止计时并抛出异常started.stop();throw new RuntimeException("Sorry, occur exception when buy phone!");}}​/*** 模拟处理订单的时间,随机延迟1-10秒。* 这个方法用于模拟真实购买过程中的一些处理时间,使得购买过程更加真实。*/private void handleOrder(){try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}}​}

测试类

 public class TokenBucketExample {public static void main(String[] args) {final TokenBucket tokenBucket = new TokenBucket();for(int i = 0;i < 110;i++){new Thread(tokenBucket::buy).start();}}}

区别

漏桶:如果一下子来了很多请求,但是请求会被放在池子里面,然后以固定的速率去处理请求。

令牌桶:以固定的速率往桶内放入令牌,一下来很多请求,只要桶内的令牌足够多,请求就会立马被处理,这就是允许突发大量请求进来。

漏桶是请求进入桶内,但是处理请求的速率是固定的,令牌桶是只要拿到令牌请求立马会被处理。

漏桶算法与令牌桶算法的区别在于,漏桶算法能够强行限制数据的传输速率,令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

需要注意的是,在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制。

  • 从 API 调用上,两者可能看起来很相似(都是尝试获取某种“资源”),但它们的内部实现和逻辑是不同的。RateLimiter 直接提供了令牌桶算法的实现,而如果你需要实现漏桶算法,你需要自己编写额外的代码。

这篇关于RateLimiter实现令牌桶算法和漏桶算法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1052545

相关文章

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

MySQL 横向衍生表(Lateral Derived Tables)的实现

《MySQL横向衍生表(LateralDerivedTables)的实现》横向衍生表适用于在需要通过子查询获取中间结果集的场景,相对于普通衍生表,横向衍生表可以引用在其之前出现过的表名,本文就来... 目录一、横向衍生表用法示例1.1 用法示例1.2 使用建议前面我们介绍过mysql中的衍生表(From子句

Mybatis的分页实现方式

《Mybatis的分页实现方式》MyBatis的分页实现方式主要有以下几种,每种方式适用于不同的场景,且在性能、灵活性和代码侵入性上有所差异,对Mybatis的分页实现方式感兴趣的朋友一起看看吧... 目录​1. 原生 SQL 分页(物理分页)​​2. RowBounds 分页(逻辑分页)​​3. Page

Python基于微信OCR引擎实现高效图片文字识别

《Python基于微信OCR引擎实现高效图片文字识别》这篇文章主要为大家详细介绍了一款基于微信OCR引擎的图片文字识别桌面应用开发全过程,可以实现从图片拖拽识别到文字提取,感兴趣的小伙伴可以跟随小编一... 目录一、项目概述1.1 开发背景1.2 技术选型1.3 核心优势二、功能详解2.1 核心功能模块2.

MYSQL查询结果实现发送给客户端

《MYSQL查询结果实现发送给客户端》:本文主要介绍MYSQL查询结果实现发送给客户端方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql取数据和发数据的流程(边读边发)Sending to clientSending DataLRU(Least Rec