java多线程间通信_Java中的线程间通信以光速传输

2023-10-08 11:10

本文主要是介绍java多线程间通信_Java中的线程间通信以光速传输,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

java多线程间通信

故事以一个简单的想法开始:创建一个开发人员友好,简单且轻量级的线程间通信框架,而无需使用任何锁,同步器,信号量,等待,通知; 并且没有队列,消息,事件或任何其他并发特定的词或工具。

只需让POJO在普通的旧Java接口之间进行通信即可。

它可能类似于Akka类型的actor ,但是由于新框架必须超轻量且针对单个多核计算机上的线程间通信进行了优化,因此可能会过大。

当参与者跨越同一机器上或跨网络分布的机器上的不同JVM实例之间的进程边界时,Akka框架非常适合进程间通信。

但是,在较小的项目中使用Akka类型的actor可能是多余的,在这些项目中,您仅需要线程间通信,但是您仍然要坚持使用typed actor方法。

我使用动态代理,阻塞队列和缓存的线程池在几天之内创建了一个解决方案。

图1显示了所创建框架的高层架构:



图1:框架的高级架构

SPSC队列是“单一生产者/单一消费者”队列。 MPSC队列是多生产者/单消费者。

分派器线程从Actor线程接收消息,并将其发送到适当的SPSC队列中。

参与者线程使用接收到的消息中的数据来调用参与者实例的相应方法。 通过使用其他角色的代理,角色实例将消息发送到MPSC队列,然后消息到达目标角色线程。

为了进行简单的测试,我创建了一个乒乓球示例:

public interface PlayerA (
void pong(long ball); //send and forget method call 
}
public interface PlayerB {   
void ping(PlayerA playerA, long ball); //send and forget method call    
}    
public class PlayerAImpl implements PlayerA {    
@Override    
@ublic void pong(long ball) {    
}    
}
public class PlayerBImpl implements PlayerB {   
@Override    
public void ping(PlayerA playerA, long ball) {    
playerA.pong(ball);    
}    
}
public class PingPongExample {   
public void testPingPong() {
// this manager hides the complexity of inter-thread communications   
// and it takes control over actor proxies, actor implementations and threads    
ActorManager manager = new ActorManager();
// registers actor implementations inside the manager   
manager.registerImpl(PlayerAImpl.class);    
manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages    
//which would be sent between threads to a specific actor instance.    
PlayerA playerA = manager.createActor(PlayerA.class);    
PlayerB playerB = manager.createActor(PlayerB.class);    
for(int i = 0; i < 1000000; i++) {    
playerB.ping(playerA, i);     
}    
}

他们的演奏速度约为每秒500,000 ping / pong。 到目前为止,一切都很好。 但是,当与仅使用一个线程的执行速度进行比较时,它突然看起来并不那么好。 在单个线程中运行的代码每秒可以执行超过20亿(2,681,850,373)个操作!

相差超过5,000次。 这让我很失望。 它产生的单线程代码在许多情况下比多线程代码更有效。

我开始寻找导致乒乓球运动员动作缓慢的原因。 经过一些调查和测试,我发现我用来在参与者之间传递消息的阻塞队列正在影响性能。

图2:具有单个生产者和单个消费者的SPSC队列

因此,我开始寻求Java中最快的队列实现之一作为替代。 我找到了Nitsan Wakart一个很棒的博客 。 他有几篇文章描述了单生产者/单消费者(SPSC)无锁队列的一些实现。 这些帖子的灵感来自马丁·汤普森(Martin Thompson)的无锁算法以实现最终性能 。

与基于锁原语的队列相比,无锁队列提供了更好的性能。 对于基于锁的队列,当一个线程获得锁时,其他线程将被阻塞,直到锁释放为止。 在无锁算法的情况下,生产者线程可以生成消息而不会被其他生产者线程阻塞,并且使用者从队列中读取时不会被其他使用者阻塞。

Martin Thompson的演讲和Nitsan的博客中描述的SPSC队列的性能结果令人难以置信- 超过100M ops / sec 。 它比JDK的并发队列实施快10倍以上(在具有4个内核的Intel Core i7上的性能约为8M ops / sec)。

带着极大的期待,我用无锁的SPSC队列实现替换了连接到每个参与者的链接阻塞队列。 可悲的是,性能测试并没有显着提高吞吐量。 很快就意识到瓶颈不是SPSC队列,而是多生产者/单一消费者(MPSC)队列。

在MPSC队列中使用SPSC队列不是一件容易的事; 多个生产者可以通过执行放置操作覆盖彼此的值。 SPSC队列只是没有代码来控制多个生产者的放置操作。 因此,即使最快的SPSC队列也无法解决我的问题。

对于多个生产者/单个消费者,我决定使用LMAX Disruptor –一种基于环形缓冲区的高性能线程间消息传递库。

图3:具有单个生产者和单个消费者的LMAX破坏者

通过使用Disruptor,很容易实现非常低延迟,高吞吐量的线程间消息通信。 它还为生产者和消费者的不同组合提供了用例。 几个线程可以从环形缓冲区读取而不会互相阻塞:

图4:具有单个生产者和两个消费者的LMAX Disruptor

多个生产者向环形缓冲区写入消息,而多个使用者从环形缓冲区获取消息的情况。

图5:具有两个生产者和两个消费者的LMAX Disruptor

快速搜索性能测试后,我发现了针对三个发布者和一个消费者的吞吐量测试 。 那正是医生命令的,并且产生了以下结果:

LinkedBlockingQueue

破坏者

运行0

4,550,625个操作/秒

11,487,650次操作/秒

运行1

4,651,162 ops / sec

11,049,723次操作/秒

运行2

4,404,316 ops / sec

11,142,061 ops / sec

在3个Producers / 1 Consumer案例中,Disruptor的速度是LinkedBlockingQueue的两倍以上。 但是,这与我期望的性能结果提高10倍相比还有很长的路要走。

我对这种顺序感到沮丧,我的头脑正在寻找解决方案。 由于命运的缘故,我最近修改了通勤路线,改用地铁代替旧的拼车。 突然间,一场遐想传遍了我,我的思绪开始将制图站映射到生产者和消费者。 在一个站点上,我们既有生产者(以载人的货车形式)也有消费者(与载人的货车相同)。

我创建了Railway类,并使用AtomicLong跟踪火车从车站到车站的经过。 对于简单的情况,我从单轨铁路开始。

public class RailWay {  
private final Train train = new Train();  
// the stationNo tracks the train and defines which station has the received train
private final AtomicInteger stationIndex = new AtomicInteger();
// Multiple threads access this method and wait for the train on the specific station. 
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // this is necessary to keep a high throughput of message passing.   
//But it eats CPU cycles while waiting for a train  
}  
// the busy loop returns only when the station number will match  
// stationIndex.get() % stationCount condition
return train;
}
// this method moves this train to the next station by incrementing the train station index…
public void sendTrain() {
stationIndex.getAndIncrement();
}
}

为了进行测试,我使用了Disruptor性能测试和SPSC队列测试中使用的相同条件-测试在线程之间传递长值。 我创建了以下Train类,其中包含一个长数组:

public class Train {   
//   
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // array to transfer freight goods
private int index;
public Train() {   
goodsArray = new long[CAPACITY];     
}
public int goodsCount() { // returns the count of goods    
return index;    
}    
public void addGoods(long i) { // adds item to the train    
goodsArray[index++] = i;    
}    
public long getGoods(int i) { //removes the item from the train    
index--;    
return goodsArray[i];    
}    
}

然后,我编写了一个简单的测试 :两个线程通过火车在彼此之间传输长整型。

图6:具有单一生产者和单一消费者的铁路使用单一火车

public void testRailWay() {   
final Railway railway = new Railway();    
final long n = 20000000000l;    
//starting a consumer thread    
new Thread() {    
long lastValue = 0;
@Override   
public void run() {    
while (lastValue < n) {    
Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1    
int count = train.goodsCount();    
for (int i = 0; i < count; i++) {    
lastValue = train.getGoods(i); // unload goods    
}    
railway.sendTrain(); //sends the current train to the first station.    
}    
}    
}.start();
final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0    
int capacity = train.getCapacity();    
for (int j = 0; j < capacity; j++) {    
train.addGoods((int)i++); // adds goods to the train    
}    
railway.sendTrain();
if (i % 100000000 == 0) { //measures the performance per each 100M items   
final long duration = System.nanoTime() - start;|    
final long ops = (i * 1000L * 1000L * 1000L) / duration;    
System.out.format("ops/sec = %,d\n", ops);    
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
System.out.format("latency nanos = %.3f%n\n", 
duration / (float)(i) * (float) Train.CAPACITY);    
}    
}    
}

通过以不同的火车容量运行测试,结果令我惊讶:

容量

吞吐量:ops / sec

延迟时间:ns

1个

5,190,883

192.6

2

10,282,820

194.5

32

104,878,614

305.1

256

344,614,640

742. 9

2048

608,112,493

3,367.8

32768

767,028,751

42,720.7

在两个线程之间传输消息的吞吐量达到767,028,751 ops / sec,列车容量为32,768长。 它比Nitsan博客中的SPSC队列快几倍。

继续铁路的思路,我考虑了如果我们有两列火车会发生什么? 我认为它应该同时提高吞吐量和减少延迟。 每个车站都有自己的火车 。 一列火车将在第一站装载货物,而第二列火车将在第二站卸载货物,反之亦然。

图7:具有单一生产者和单一消费者的铁路使用两列火车

这是吞吐量的结果:

容量

吞吐量:ops / sec

延迟时间:ns

1个

7,492,684

133.5

2

14,754,786

135.5

32

174,227,656

183.7

256

613,555,475

417.2

2048

940,144,900

2,178.4

32768

797,806,764

41,072.6

结果是惊人的。 它比单列火车的测试结果快1.4倍以上。 对于1的火车容量,等待时间从192.6纳秒减少到133.5纳秒; 显然是一个有希望的迹象。

因此,我的实验还没有结束。 对于2048的火车容量,在线程之间传输消息的等待时间为-2,178.4纳秒,这太多了。 我正在考虑如何减少这种情况,并创建了很多火车的案例:

图8:具有单一生产者和单一消费者的铁路使用许多火车

我还将火车容量减少到一个长值,并开始使用火车数量。 以下是测试结果:

火车数量

吞吐量:ops / sec

延迟时间:ns

2

10,917,951

91.6

32

31,233,310

32.0

256

42,791,962

23.4

1024

53,220,057

18.8

32768

71,812,166

13.9

使用32,768个训练,线程之间发送长值的等待时间减少到13.9纳秒。 当等待时间不是很高并且吞吐量不是那么低时,通过训练火车数量和火车容量,可以将吞吐量和等待时间调整到最佳平衡。

这些数字对于单一生产者和单一消费者(SPSC)而言非常有用。 但是我们如何才能为多个生产者和消费者使用这项功能? 答案很简单-添加更多电台!

图9:具有一个生产者和两个消费者的铁路

每个线程都会等待下一列火车,然后加载/卸载项目,然后将火车发送到下一站。 生产者线程将物品放到火车上,而消费者从火车上获得物品。 火车不断从一个车站到另一个车站绕圈行驶。

为了测试单一生产者/多个消费者(SPMC)案例,我创建了具有8个站点的铁路测试 。 一个站点属于单个生产者,而其他7个站点属于消费者。 结果是:

对于火车数量= 256和火车容量= 32:

ops/sec =116,604,397    
latency nanos = 274.4

对于火车数量= 32和火车容量= 256:

ops/sec =432,055,469    
latency nanos = 592.5

如您所见,即使使用八个工作线程,该测试也显示出不错的结果-432,055,469个操作/秒,包含32个列和256个long的容量。 在测试期间,所有CPU内核均加载到100%。

图10:8个站的铁路测试期间的CPU利用率

在使用Rails算法时,我几乎忘记了我的目标。 以提高“多个生产者/单个消费者”案例的性能。

图11:具有三个生产者和单个消费者的铁路

我创建了一个包含3个生产者和1个消费者的新测试。 每列火车从一个站点到另一个站点跟踪圆,而每个生产者仅承载每列火车容量的1/3。 每趟火车,消费者都会从三个生产者那里获得全部三个物品。 性能测试显示以下平均结果:

ops/sec = 162,597,109 
trains/sec = 54,199,036    
latency ns = 18.5

很好 生产者和消费者的工作速度超过1.6亿次操作/秒。

为了弥补差异,以下结果显示了针对同一案例-3个生产者和1个消费者的 Disruptor测试:

Run 0, Disruptor=11,467,889 ops/sec   
Run 1, Disruptor=11,280,315 ops/sec    
Run 2, Disruptor=11,286,681 ops/sec    
Run 3, Disruptor=11,254,924 ops/sec

在运行另一个具有消息批处理功能的Disruptor 3P:1C测试的结果下方(每批10条消息):

Run 0, Disruptor=116,009,280 ops/sec    
Run 1, Disruptor=128,205,128 ops/sec    
Run 2, Disruptor=101,317,122 ops/sec    
Run 3, Disruptor=98,716,683 ops/sec;

最后是Disruptor测试的结果,但使用3P:1C方案的LinkedBlockingQueue实现:

Run 0, BlockingQueue=4,546,281 ops/sec   
Run 1, BlockingQueue=4,508,769 ops/sec    
Run 2, BlockingQueue=4,101,386 ops/sec    
Run 3, BlockingQueue=4,124,561 ops/sec

如您所见,铁路方法提供的平均吞吐量为162,597,109 ops / sec,而在相同情况下使用Disruptor的最佳结果仅为128,205,128 ops / sec。 对于LinkedBlockingQueue,最佳结果仅为4,546,281 ops / sec。

铁路算法引入了一种简单的事件批处理方法,可显着提高吞吐量。 通过玩火车容量或火车数量,可以很容易地配置它以获得吞吐量/延迟的期望结果。

同样,铁路可以用于生产者和消费者的混合,在真正复杂的情况下使用,而同一线程可以用于消费消息,处理消息并将结果返回到环网:

图12:生产者和消费者混合的铁路

最后,我将提供针对超高吞吐量的单生产者/单消费者测试的优化:

图13:具有单一生产者和单一消费者的铁路

它具有以下平均结果:每秒吞吐量超过一亿五千万(1,569,884,271)次操作,而延迟等于1.3微秒。 如您所见,测试结果与本文开头描述的单线程测试的结果处于相同的数量级,每秒执行2,681,850,373次操作。

在这里,我将让您得出自己的结论。

我希望在以后的文章中演示如何为生产者和消费者的不同组合使用Queue和BlockingQueue接口支持Railway算法。 敬请关注。

翻译自: https://www.infoq.com/articles/High-Performance-Java-Inter-Thread-Communications/?topicPageSponsorship=c1246725-b0a7-43a6-9ef9-68102c8d48e1

java多线程间通信

这篇关于java多线程间通信_Java中的线程间通信以光速传输的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/164871

相关文章

Spring @Scheduled注解及工作原理

《Spring@Scheduled注解及工作原理》Spring的@Scheduled注解用于标记定时任务,无需额外库,需配置@EnableScheduling,设置fixedRate、fixedDe... 目录1.@Scheduled注解定义2.配置 @Scheduled2.1 开启定时任务支持2.2 创建

SpringBoot中使用Flux实现流式返回的方法小结

《SpringBoot中使用Flux实现流式返回的方法小结》文章介绍流式返回(StreamingResponse)在SpringBoot中通过Flux实现,优势包括提升用户体验、降低内存消耗、支持长连... 目录背景流式返回的核心概念与优势1. 提升用户体验2. 降低内存消耗3. 支持长连接与实时通信在Sp

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

Mac系统下卸载JAVA和JDK的步骤

《Mac系统下卸载JAVA和JDK的步骤》JDK是Java语言的软件开发工具包,它提供了开发和运行Java应用程序所需的工具、库和资源,:本文主要介绍Mac系统下卸载JAVA和JDK的相关资料,需... 目录1. 卸载系统自带的 Java 版本检查当前 Java 版本通过命令卸载系统 Java2. 卸载自定

springboot下载接口限速功能实现

《springboot下载接口限速功能实现》通过Redis统计并发数动态调整每个用户带宽,核心逻辑为每秒读取并发送限定数据量,防止单用户占用过多资源,确保整体下载均衡且高效,本文给大家介绍spring... 目录 一、整体目标 二、涉及的主要类/方法✅ 三、核心流程图解(简化) 四、关键代码详解1️⃣ 设置

Java Spring ApplicationEvent 代码示例解析

《JavaSpringApplicationEvent代码示例解析》本文解析了Spring事件机制,涵盖核心概念(发布-订阅/观察者模式)、代码实现(事件定义、发布、监听)及高级应用(异步处理、... 目录一、Spring 事件机制核心概念1. 事件驱动架构模型2. 核心组件二、代码示例解析1. 事件定义

SpringMVC高效获取JavaBean对象指南

《SpringMVC高效获取JavaBean对象指南》SpringMVC通过数据绑定自动将请求参数映射到JavaBean,支持表单、URL及JSON数据,需用@ModelAttribute、@Requ... 目录Spring MVC 获取 JavaBean 对象指南核心机制:数据绑定实现步骤1. 定义 Ja

javax.net.ssl.SSLHandshakeException:异常原因及解决方案

《javax.net.ssl.SSLHandshakeException:异常原因及解决方案》javax.net.ssl.SSLHandshakeException是一个SSL握手异常,通常在建立SS... 目录报错原因在程序中绕过服务器的安全验证注意点最后多说一句报错原因一般出现这种问题是因为目标服务器

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再