Disruptor系列3:Disruptor样例实战

2024-04-29 09:18
文章标签 实战 系列 样例 disruptor

本文主要是介绍Disruptor系列3:Disruptor样例实战,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

章节回顾:
- Disruptor系列1:初识Disruptor
- Disruptor系列2:Disruptor原理剖析

本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系列2:Disruptor原理剖析。通过本章节,希望让大家对如何使用Disruptor有个初步认识,看看它能够解决哪些情况。

具体而言,它可以解决如下方面:
- 并行计算实现;
- 串行依次执行;
- 菱形方式执行;
- 链式并行计算。
并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

本样例是一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

事件类

public class LongEvent {private Long number;public Long getNumber() {return number;}public void setNumber(Long number) {this.number = number;}
}

事件工厂类

public class LongEventFactory implements EventFactory<LongEvent> {@Overridepublic LongEvent newInstance() {return new LongEvent();}
}

事件转换类

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {@Overridepublic void translateTo(LongEvent event, long sequence, Long arg0) {event.setNumber(arg0);}
}

C1-1消费者类

该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandlerWorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类,当然,也没啥问题。

public class C11EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}
}

C1-2消费者类

该消费者类执行将数值乘以10的操作。

public class C12EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}
}

c2-1消费者类

该消费者类负责将数值+20.

public class C21EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}
}

C2-2消费者类

该消费者类负责将数值*20

public class C22EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}
}

主函数

public class Main {public static void main(String[] args) {int bufferSize = 1024*1024;//环形队列长度,必须是2的N次方EventFactory<LongEvent> eventFactory = new LongEventFactory();/*** 定义Disruptor,基于单生产者,阻塞策略*/Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());/////////////////////////////////////////////////////////////////////XXX(disruptor);//这里是调用各种不同方法的地方./////////////////////////////////////////////////////////////////////RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();/*** 输入10*/ringBuffer.publishEvent(new LongEventTranslator(),10L);ringBuffer.publishEvent(new LongEventTranslator(),100L);}
}

并行计算实现

这里写图片描述
并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

/*** 并行计算实现,c1,c2互相不依赖* <br/>* p --> c11*   --> c21*/public static void parallel(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C21EventHandler());disruptor.start();}

串行计算,依次执行

这里写图片描述

/*** 串行依次执行* <br/>* p --> c11 --> c21* @param disruptor*/public static void serial(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());disruptor.start();}

菱形方式执行

这里写图片描述

/*** 菱形方式执行* <br/>*   --> c11* p          --> c21*   --> c12* @param disruptor*/public static void diamond(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());disruptor.start();}

链式并行计算

这里写图片描述

/*** 链式并行计算* <br/>*   --> c11 --> c12* p*   --> c21 --> c22* @param disruptor*/public static void chain(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());disruptor.start();}

上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
- 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
- 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
- 最后,实例化多个事件消费类。

并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

这里写图片描述

/*** 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例* <br/>* p --> c11*   --> c21*/public static void parallelWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());disruptor.start();}

串行依次执行,同时C11,C21分别有2个实例

这里写图片描述

/*** 串行依次执行,同时C11,C21分别有2个实例* <br/>* p --> c11 --> c21* @param disruptor*/public static void serialWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());disruptor.start();}

这篇关于Disruptor系列3:Disruptor样例实战的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945690

相关文章

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Three.js构建一个 3D 商品展示空间完整实战项目

《Three.js构建一个3D商品展示空间完整实战项目》Three.js是一个强大的JavaScript库,专用于在Web浏览器中创建3D图形,:本文主要介绍Three.js构建一个3D商品展... 目录引言项目核心技术1. 项目架构与资源组织2. 多模型切换、交互热点绑定3. 移动端适配与帧率优化4. 可

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

Python内存优化的实战技巧分享

《Python内存优化的实战技巧分享》Python作为一门解释型语言,虽然在开发效率上有着显著优势,但在执行效率方面往往被诟病,然而,通过合理的内存优化策略,我们可以让Python程序的运行速度提升3... 目录前言python内存管理机制引用计数机制垃圾回收机制内存泄漏的常见原因1. 循环引用2. 全局变

PostgreSQL简介及实战应用

《PostgreSQL简介及实战应用》PostgreSQL是一种功能强大的开源关系型数据库管理系统,以其稳定性、高性能、扩展性和复杂查询能力在众多项目中得到广泛应用,本文将从基础概念讲起,逐步深入到高... 目录前言1. PostgreSQL基础1.1 PostgreSQL简介1.2 基础语法1.3 数据库

Python WebSockets 库从基础到实战使用举例

《PythonWebSockets库从基础到实战使用举例》WebSocket是一种全双工、持久化的网络通信协议,适用于需要低延迟的应用,如实时聊天、股票行情推送、在线协作、多人游戏等,本文给大家介... 目录1. 引言2. 为什么使用 WebSocket?3. 安装 WebSockets 库4. 使用 We