Scalable IO in Java

2024-01-05 17:08
文章标签 java io scalable

本文主要是介绍Scalable IO in Java,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 原文地址:

 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

《Scalable IO in Java》是java.util.concurrent包的作者,大师Doug Lea关于分析与构建可伸缩的高性能IO服务的一篇经典文章,在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的过程中服务模型的演变与进化,文章中基于Reactor反应器模式的几种服务模型架构,也被Netty、Mina等大多数高性能IO服务框架所采用,因此阅读这篇文章有助于你更深入了解Netty、Mina等服务框架的编程思想与设计模式。

 

一、网络服务

基本上所有的网络处理程序都有以下基本的处理过程:

Read request

Decode request

Process service

Encode reply

Send reply

 

1.传统的服务设计模式

每一个连接的处理都会对应分配一个新的线程,下面我们看一段经典的Server端Socket服务代码:

class Server implements Runnable {public void run() {try {ServerSocket ss = new ServerSocket(PORT);while (!Thread.interrupted())new Thread(new Handler(ss.accept())).start(); //创建新线程来handle// or, single-threaded, or a thread pool} catch (IOException ex) { /* ... */ }}static class Handler implements Runnable {final Socket socket;Handler(Socket s) { socket = s; }public void run() {try {byte[] input = new byte[MAX_INPUT];socket.getInputStream().read(input);byte[] output = process(input);socket.getOutputStream().write(output);} catch (IOException ex) { /* ... */ }}      private byte[] process(byte[] cmd) { /* ... */ }}
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

这种模型由于IO在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。

 

server导致阻塞的原因:

1、serversocket的accept方法,阻塞等待client连接,直到client连接成功。

2、线程从socket inputstream读入数据,会进入阻塞状态,直到全部数据读完。

3、线程向socket outputstream写入数据,会阻塞直到全部数据写完。

 

client导致阻塞的原因:

1、client建立连接时会阻塞,直到连接成功。

2、线程从socket输入流读入数据,如果没有足够数据读完会进入阻塞状态,直到有数据或者读到输入流末尾。

3、线程从socket输出流写入数据,直到输出所有数据。

4、socket.setsolinger()设置socket的延迟时间,当socket关闭时,会进入阻塞状态,直到全部数据都发送完或者超时。

 

改进:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。

2. 构建高性能可伸缩的IO服务

在构建高性能可伸缩IO服务的过程中,我们希望达到以下的目标:

① 能够在海量负载连接情况下优雅降级;

② 能够随着硬件资源的增加,性能持续改进;

③ 具备低延迟、高吞吐量、可调节的服务质量等特点;

而分发处理就是实现上述目标的一个最佳方式。

 

3、分发模式

分发模式具有以下几个机制:

① 将一个完整处理过程分解为一个个细小的任务;

② 每个任务执行相关的动作且不产生阻塞;

③ 在任务执行状态被触发时才会去执行,例如只在有数据时才会触发读操作;

 

在一般的服务开发当中,IO事件通常被当做任务执行状态的触发器使用,在hander处理过程中主要针对的也就是IO事件;

 

 

 

java.nio包就很好的实现了上述的机制:

① 非阻塞的读和写

② 通过感知IO事件分发任务的执行

 

所以结合一系列基于事件驱动模式的设计,给高性能IO服务的架构与设计带来丰富的可扩展性;

二、基于事件驱动模式的设计

基于事件驱动的架构设计通常比其他架构模型更加有效,因为可以节省一定的性能资源,事件驱动模式下通常不需要为每一个客户端建立一个线程,这意味这更少的线程开销,更少的上下文切换和更少的锁互斥,但任务的调度可能会慢一些,而且通常实现的复杂度也会增加,相关功能必须分解成简单的非阻塞操作,类似与GUI的事件驱动机制,当然也不可能把所有阻塞都消除掉,特别是GC, page faults(内存缺页中断)等。由于是基于事件驱动的,所以需要跟踪服务的相关状态(因为你需要知道什么时候事件会发生);

 

下图是AWT中事件驱动设计的一个简单示意图,可以看到,在不同的架构设计中的基于事件驱动的IO操作使用的基本思路是一致的;

 

三、Reactor模式

Reactor也可以称作反应器模式,它有以下几个特点:

① Reactor模式中会通过分配适当的handler(处理程序)来响应IO事件,类似与AWT 事件处理线程;

② 每个handler执行非阻塞的操作,类似于AWT ActionListeners 事件监听

③ 通过将handler绑定到事件进行管理,类似与AWT addActionListener 添加事件监听;

1、单线程模式

下图展示的就是单线程下基本的Reactor设计模式

Reactor单线程模式示例

class Reactor implements Runnable {final Selector selector;final ServerSocketChannel serverSocket;Reactor(int port) throws IOException { //Reactor初始化selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.configureBlocking(false); //非阻塞SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件sk.attach(new Acceptor()); //attach callback object, Acceptor}public void run() {try {while (!Thread.interrupted()) {selector.select();Set selected = selector.selectedKeys();Iterator it = selected.iterator();while (it.hasNext())dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件selected.clear();}} catch (IOException ex) { /* ... */ }}void dispatch(SelectionKey k) {Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象if (r != null)r.run();}class Acceptor implements Runnable { // innerpublic void run() {try {SocketChannel c = serverSocket.accept();if (c != null)new Handler(selector, c);}catch(IOException ex) { /* ... */ }}}
}final class Handler implements Runnable {final SocketChannel socket;final SelectionKey sk;ByteBuffer input = ByteBuffer.allocate(MAXIN);ByteBuffer output = ByteBuffer.allocate(MAXOUT);static final int READING = 0, SENDING = 1;int state = READING;Handler(Selector sel, SocketChannel c) throws IOException {socket = c; c.configureBlocking(false);// Optionally try first read nowsk = socket.register(sel, 0);sk.attach(this); //将Handler作为callback对象sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件sel.wakeup();}boolean inputIsComplete() { /* ... */ }boolean outputIsComplete() { /* ... */ }void process() { /* ... */ }public void run() {try {if (state == READING) read();else if (state == SENDING) send();} catch (IOException ex) { /* ... */ }}void read() throws IOException {socket.read(input);if (inputIsComplete()) {process();state = SENDING;// Normally also do first write nowsk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件}}void send() throws IOException {socket.write(output);if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key}
}

 

//上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断

//我们可以用State-Object pattern来更优雅的实现

class Handler { // ...public void run() { // initial state is readersocket.read(input);if (inputIsComplete()) {process();sk.attach(new Sender());  //状态迁移, Read后变成write, 用Sender作为新的callback对象sk.interest(SelectionKey.OP_WRITE);sk.selector().wakeup();}}class Sender implements Runnable {public void run(){ // ...socket.write(output);if (outputIsComplete()) sk.cancel();}}
}

关于Reactor模式的一些概念:

 

Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理。

Handler:负责处理非阻塞的行为,标识系统管理的资源;同时将handler与事件绑定。

Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。

 

由于只有单个线程,所以处理器中的业务需要能够快速处理完。

2、多线程设计模式

在多处理器场景下,为实现服务的高性能我们可以有目的的采用多线程模式:

  1、增加Worker线程,专门用于处理非IO操作,因为通过上面的程序我们可以看到,反应器线程需要迅速触发处理流程,而如果处理过程也就是process()方法产生阻塞会拖慢反应器线程的性能,所以我们需要把一些非IO操作交给Woker线程来做;

  2、拆分并增加反应器Reactor线程,一方面在压力较大时可以饱和处理IO操作,提高处理能力;另一方面维持多个Reactor线程也可以做负载均衡使用;线程的数量可以根据程序本身是CPU密集型还是IO密集型操作来进行合理的分配;

 

2.1 多线程模式

Reactor多线程设计模式具备以下几个特点:

① 通过卸载非IO操作来提升Reactor 线程的处理性能,这类似与POSA2 中Proactor的设计;

② 比将非IO操作重新设计为事件驱动的方式更简单;

③ 但是很难与IO重叠处理,最好能在第一时间将所有输入读入缓冲区;(这里我理解的是最好一次性读取缓冲区数据,方便异步非IO操作处理数据)

④ 可以通过线程池的方式对线程进行调优与控制,一般情况下需要的线程数量比客户端数量少很多;

 

下面是Reactor多线程设计模式的一个示意图与示例代码(我们可以看到在这种模式中在Reactor线程的基础上把非IO操作放在了Worker线程中执行):

 

 

多线程模式示例:

class Handler implements Runnable {// uses util.concurrent thread poolstatic PooledExecutor pool = new PooledExecutor(...);static final int PROCESSING = 3;// ...synchronized void read() { // ...socket.read(input);if (inputIsComplete()) {state = PROCESSING;pool.execute(new Processer()); //使用线程pool异步执行}}synchronized void processAndHandOff() {process();state = SENDING; // or rebind attachmentsk.interest(SelectionKey.OP_WRITE); //process完,开始等待write事件}class Processer implements Runnable {public void run() {processAndHandOff();}}}

当你把非IO操作放到线程池中运行时,你需要注意以下几点问题:

① 任务之间的协调与控制,每个任务的启动、执行、传递的速度是很快的,不容易协调与控制;

② 每个hander中dispatch的回调与状态控制;

③ 不同线程之间缓冲区的线程安全问题;

④ 需要任务返回结果时,任务线程等待和唤醒状态间的切换;

 

为解决上述问题可以使用PooledExecutor线程池框架,这是一个可控的任务线程池,主函数采用execute(Runnable r),它具备以下功能,可以很好的对池中的线程与任务进行控制与管理:

① 可设置线程池中最大与最小线程数;

② 按需要判断线程的活动状态,及时处理空闲线程;

③ 当执行任务数量超过线程池中线程数量时,有一系列的阻塞、限流的策略;

 

2.2 基于多个反应器的多线程模式

这是对上面模式的进一步完善,使用反应器线程池,一方面根据实际情况用于匹配调节CPU处理与IO读写的效率,提高系统资源的利用率,另一方面在静态或动态构造中每个反应器线程都包含对应的Selector,Thread,dispatchloop,下面是一个简单的代码示例与示意图(Netty就是基于这个模式设计的,一个处理Accpet连接的mainReactor线程,多个处理IO事件的subReactor线程):

参考代码:

Selector[] selectors;  //subReactors集合, 一个selector代表一个subReactor
int next = 0;
class Acceptor { // ...public synchronized void run() { ...Socket connection = serverSocket.accept(); //主selector负责acceptif (connection != null)new Handler(selectors[next], connection); //选个subReactor去负责接收到的connectionif (++next == selectors.length) next = 0;}
}

 

 

感谢:

https://yq.aliyun.com/articles/50466

http://www.mamicode.com/info-detail-2736833.html

https://www.cnblogs.com/luxiaoxun/archive/2015/03/11/4331110.html

 

 

这篇关于Scalable IO in Java的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/573565

相关文章

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

Spring Boot项目打包和运行的操作方法

《SpringBoot项目打包和运行的操作方法》SpringBoot应用内嵌了Web服务器,所以基于SpringBoot开发的web应用也可以独立运行,无须部署到其他Web服务器中,下面以打包dem... 目录一、打包为JAR包并运行1.打包为可执行的 JAR 包2.运行 JAR 包二、打包为WAR包并运行

Java进行日期解析与格式化的实现代码

《Java进行日期解析与格式化的实现代码》使用Java搭配ApacheCommonsLang3和Natty库,可以实现灵活高效的日期解析与格式化,本文将通过相关示例为大家讲讲具体的实践操作,需要的可以... 目录一、背景二、依赖介绍1. Apache Commons Lang32. Natty三、核心实现代

Spring Boot 常用注解整理(最全收藏版)

《SpringBoot常用注解整理(最全收藏版)》本文系统整理了常用的Spring/SpringBoot注解,按照功能分类进行介绍,每个注解都会涵盖其含义、提供来源、应用场景以及代码示例,帮助开发... 目录Spring & Spring Boot 常用注解整理一、Spring Boot 核心注解二、Spr

Python文件操作与IO流的使用方式

《Python文件操作与IO流的使用方式》:本文主要介绍Python文件操作与IO流的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python文件操作基础1. 打开文件2. 关闭文件二、文件读写操作1.www.chinasem.cn 读取文件2. 写

SpringBoot实现接口数据加解密的三种实战方案

《SpringBoot实现接口数据加解密的三种实战方案》在金融支付、用户隐私信息传输等场景中,接口数据若以明文传输,极易被中间人攻击窃取,SpringBoot提供了多种优雅的加解密实现方案,本文将从原... 目录一、为什么需要接口数据加解密?二、核心加解密算法选择1. 对称加密(AES)2. 非对称加密(R

详解如何在SpringBoot控制器中处理用户数据

《详解如何在SpringBoot控制器中处理用户数据》在SpringBoot应用开发中,控制器(Controller)扮演着至关重要的角色,它负责接收用户请求、处理数据并返回响应,本文将深入浅出地讲解... 目录一、获取请求参数1.1 获取查询参数1.2 获取路径参数二、处理表单提交2.1 处理表单数据三、

java变量内存中存储的使用方式

《java变量内存中存储的使用方式》:本文主要介绍java变量内存中存储的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、变量的定义3、 变量的类型4、 变量的作用域5、 内存中的存储方式总结1、介绍在 Java 中,变量是用于存储程序中数据

如何合理管控Java语言的异常

《如何合理管控Java语言的异常》:本文主要介绍如何合理管控Java语言的异常问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、Thorwable类3、Error4、Exception类4.1、检查异常4.2、运行时异常5、处理方式5.1. 捕获异常

Spring Boot集成SLF4j从基础到高级实践(最新推荐)

《SpringBoot集成SLF4j从基础到高级实践(最新推荐)》SLF4j(SimpleLoggingFacadeforJava)是一个日志门面(Facade),不是具体的日志实现,这篇文章主要介... 目录一、日志框架概述与SLF4j简介1.1 为什么需要日志框架1.2 主流日志框架对比1.3 SLF4