Java里的管道输入流 PipedInputStream与管道输出流 PipedOutputStream

2024-05-04 05:38

本文主要是介绍Java里的管道输入流 PipedInputStream与管道输出流 PipedOutputStream,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

测试代码:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author* @desc* @date 2018-04-08*/
public class TestMain
{public static void main(String[] args){PipedInputStream pIn = new PipedInputStream(1);PipedOutputStream pOut = new PipedOutputStream();try{pIn.connect(pOut);}catch (IOException e){e.printStackTrace();}Thread th1 = new Thread(() -> {try (PipedOutputStream pos = pOut){int a = 0;String str;byte[] strs;do{if(a == 9){str = "OK";}else{str = (a+"");}strs = str.getBytes();pos.write(strs,0,strs.length);pos.flush();a++;}while (a<10);}catch (IOException e){e.printStackTrace();}});Thread th3 = new Thread(() -> {byte[] buf = new byte[1024];int l;try (PipedInputStream pin = pIn){while (( l = pin.read(buf) ) != -1){if("OK".equals(new String(buf,0,l))){System.out.println("=================");}else {System.out.println(new String(buf,0,l));System.out.println("***************************");}}}catch (IOException e){e.printStackTrace();}});ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(th1);executorService.execute(th3);executorService.shutdown();}//高位在前,低位在后public static byte[] int2bytes(int num){byte[] result = new byte[4];result[0] = (byte)((num >>> 24) & 0xff);//说明一result[1] = (byte)((num >>> 16)& 0xff );result[2] = (byte)((num >>> 8) & 0xff );result[3] = (byte)((num >>> 0) & 0xff );return result;}//高位在前,低位在后public static int bytes2int(byte[] bytes){int result = 0;if(bytes.length == 4){int a = (bytes[0] & 0xff) << 24;int b = (bytes[1] & 0xff) << 16;int c = (bytes[2] & 0xff) << 8;int d = (bytes[3] & 0xff);result = a | b | c | d;}return result;}
}

运行结果:

0
***************************
1
***************************
2
***************************
3
***************************
4
***************************
5
***************************
6
***************************
7
***************************
8
***************************
O
***************************
K
***************************

以下内容转载自:https://blog.csdn.net/zlp1992/article/details/50298195

Java里的管道输入流PipedInputStream与管道输出流PipedOutputStream实现了类似管道的功能,用于不同线程之间的相互通信,下面说下自己的一些理解。

java的管道输入与输出实际上使用的是一个循环缓冲数组来实现,这个数组默认大小为1024字节。输入流PipedInputStream从这个循环缓冲数组中读数据,输出流PipedOutputStream往这个循环缓冲数组中写入数据。当这个缓冲数组已满的时候,输出流PipedOutputStream所在的线程将阻塞;当这个缓冲数组首次为空的时候,输入流PipedInputStream所在的线程将阻塞。但是在实际用起来的时候,却会发现并不是那么好用。

Java在它的jdk文档中提到不要在一个线程中同时使用PipeInpuStream和PipeOutputStream,这会造成死锁

下面看一些实际使用例子以及通过源码分析下这个管道流的实现,实际上这两个类不复杂而且比较独立(只分别继承了InputStream和OutputStream然后就没有了)。

一、 管道输入流PipedInputStream与管道输出流PipedOutputStream建立连接

一般我们使用都是先定义一个管道输入流PipedInputStream对象和管道输出流PipedOutputStream对象,然后将他们关联起来,建立了一条”管道”。

使用connect()方法

        PipedInputStream pipedInputStream=new PipedInputStream();PipedOutputStream pipedOutputStream=new PipedOutputStream();try {pipedInputStream.connect(pipedOutputStream);} catch (IOException e) {e.printStackTrace();}

不使用connect()方法

        PipedInputStream pipedInputStream=new PipedInputStream();PipedOutputStream pipedOutputStream=null;try {pipedOutputStream = new PipedOutputStream(pipedInputStream);} catch (IOException e) {e.printStackTrace();}

第一种方式中,先定义一个PipedInputStream对象和一个PipedOutputStream对象,然后调用两个对象中任意一个的connect方法将输入流和输出流关联起来,这里调用 pipedInputStream.connect(pipedOutputStream)或者pipedOutputStream.connect(pipedInputStream)效果是一样的,但是要注意 只能选择其中的一个而不能两个connect同时调用,即不能像下面一样使用:

        PipedInputStream pipedInputStream=new PipedInputStream();PipedOutputStream pipedOutputStream=new PipedOutputStream();        try {pipedInputStream.connect(pipedOutputStream);pipedOutputStream.connect(pipedInputStream);} catch (IOException e) {e.printStackTrace();}

这样的话会报错 “Already Connected”


java.io.IOException: Already connectedat java.io.PipedOutputStream.connect(Unknown Source)at com.idc.pipe.test.OneThreadTest.main(OneThreadTest.java:20)

我们来看下建立”管道”的过程中做了什么,PipedInputStream的无参构造方法中调用了私有的成员方法initPipe(int pipeSize) 传入的参数DEFAULT_PIPE_SIZE为1024,initPipe(int pipeSize)方法如下:

private void initPipe(int pipeSize) {if (pipeSize <= 0) {throw new IllegalArgumentException("Pipe Size <= 0");}buffer = new byte[pipeSize];}

其中成员变量buffer为字节数组。因此在我们new一个PipedInputStream对象之后,PipedInputStream对象便有一个大小为1024字节的数组,这个数组即我们前面提到的循环缓冲数组,用于保存管道输出流写入的数据,并且向管道输入流提供数据。

再来看下管道输出流 PipedOutputStream的无参构造方法,PipedOutputStream的构造方法 PipedOutputStream()是空的,什么也没做。

下面看下connect方法,PipedInputStream和PipedOutputStream均有connect方法,PipedInputStream中的connect方法是通过PipedOutputStream的connect方法实现。
PipedInputStream中的connect方法:

public void connect(PipedOutputStream src) throws IOException {src.connect(this);}

PipedOutputStream中的connect方法:

public synchronized void connect(PipedInputStream snk) throws IOException {if (snk == null) {throw new NullPointerException();} else if (sink != null || snk.connected) {throw new IOException("Already connected");}sink = snk;snk.in = -1;snk.out = 0;snk.connected = true;}

先介绍下PipedInputStream的几个成员变量

protected int in = -1; /*代表连接该管道输入流的输出流PipedOutputStream下一个字节将存储在循环缓冲数组buffer的位置。当in<0说明缓冲数组是空的;当in==out说明缓冲数组已满。*/
protected int out = 0; //代表该管道输入流下一个要读取的字节在循环缓冲数组中的位置
boolean connected = false; //表示该管道输入流是否与管道输出流建立了连接,true为已连接

再介绍下PipedOutputStream的成员变量


private PipedInputStream sink; //代表与该管道输出流建立了连接的管道输入流PipedInputStream对象

PipedOutputStream的connect方法是一个同步方法,需要事先获取PipedOutputStream对象锁,当我们第一次在PipedInpuStream对象上调用connect(PipedOutputStream out)的时候,此时PipedOutputStream对象的成员变量sink为空并且PipeInputStream对象的成员变量 connected为false,建立连接成功;当我们之后在同一个PipedOutputStream对象上调用connect(PipedInputStream in)来连接同一个PipedInputStream对象的时候,由于此时PipedOutputStream对象的成员变量sink不为null且PipedInputStream对象的成员变量connected为true,因此抛出异常。由此可见:

对于同一个PipedInputStream对象和同一个PipedOutputStream对象,不能既调用PipedInpuSteam对象的connect方法又调用PipedOutputStream的connect方法
一个PipedInputStream对象不能连接多个PipedOutputStream对象;一个PipedOutputStream对象也不能连接多个PipedInputStream对象。也即一个管道输入流只能对应一个管道输出流,只能一对一。
PipedInputStream和PipedOutputStream的带参数构造方法最终也是通过PipedOutputStream里的connect方法来建立连接的,因此略过。

二、在一个线程里使用PipedInpuStream和PipedOutputStream(会造成死锁?)

示例代码 1 如下:

    public static void main(String[] args) {PipedInputStream pipedInputStream=new PipedInputStream();PipedOutputStream pipedOutputStream=new PipedOutputStream();    try {pipedInputStream.connect(pipedOutputStream);//默认一次最多只能写入1024字节byte[] data=new byte[1000];byte[] store=new byte[20];Arrays.fill(data, (byte)1);System.out.println("first writing data");//每次写1000字节数据pipedOutputStream.write(data,0,data.length);System.out.println("finish first writing");int count=1;while(count<100){System.out.println(count+" times read data");pipedInputStream.read(store, 0, store.length); //每次读20字节数据System.out.println(count+" times finish reading data");System.out.println((count+1)+" times write data");pipedOutputStream.write(data);//每次写1000字节数据System.out.println((count+1)+" times finish writing data");count++;}} catch (IOException e) {e.printStackTrace();}   }

结果如下:

first writing data
finish first writing
1 times read data
1 times finish reading data
2 times write data

可以看到,第二次尝试通过管道输出流PipedOutputStream写数据的时候阻塞,同时也无法从管道输入流PipedInputStream读取数据,我们通过源码看下这是怎么发生的?
第一次往”管道”写入1000字节数据:


pipedOutputStream.write(data,0,data.length);

管道输出流的write方法如下:

public void write(byte b[], int off, int len) throws IOException {if (sink == null) {throw new IOException("Pipe not connected");} else if (b == null) {throw new NullPointerException();} else if ((off < 0) || (off > b.length) || (len < 0) ||((off + len) > b.length) || ((off + len) < 0)) {throw new IndexOutOfBoundsException();} else if (len == 0) {return;}sink.receive(b, off, len);}

可以看到当试图通过输出流PipedOutputStream对象往”管道”写数据时,如果事先没有输入流PipedInputStream对象与该输出流建立连接(即sink==null),则报错。同时管道输出流是通过与其建立了连接的管道输入流PipedInputStream对象来写入数据的,因此我们看下输入流的receive方法,如下:


synchronized void receive(byte b[], int off, int len)  throws IOException {checkStateForReceive();writeSide = Thread.currentThread();int bytesToTransfer = len;while (bytesToTransfer > 0) {if (in == out)awaitSpace();int nextTransferAmount = 0;if (out < in) {nextTransferAmount = buffer.length - in;} else if (in < out) {if (in == -1) {in = out = 0;nextTransferAmount = buffer.length - in;} else {nextTransferAmount = out - in;}}if (nextTransferAmount > bytesToTransfer)nextTransferAmount = bytesToTransfer;assert(nextTransferAmount > 0);System.arraycopy(b, off, buffer, in, nextTransferAmount);bytesToTransfer -= nextTransferAmount;off += nextTransferAmount;in += nextTransferAmount;if (in >= buffer.length) {in = 0;}}}

PipedInputStream对象的receive方法是一个同步方法,也就是说当在一个PipedInputStream对象上调用其receive方法时,该对象所在的线程必须先获得这个PipedInputStream对象的锁,才能进入该方法。由于我们只有一个线程且是第一次使用PipedInputStream对象调用receive方法,因此便获得了PipedInputStream对象pipedInputStream的对象锁。receive方法中首先调用checkStateForReceive()方法,如下:

    private void checkStateForReceive() throws IOException {if (!connected) {throw new IOException("Pipe not connected");} else if (closedByWriter || closedByReader) {throw new IOException("Pipe closed");} else if (readSide != null && !readSide.isAlive()) {throw new IOException("Read end dead");}}

checkStateForReceive做一些事前检测,注意到这里的readSide != null && !readSide.isAlive() 如果管道输入流所在的线程不为空但是线程已死(之前的管道输入流所在的线程已死),则会抛出异常。由于我们还没有读,因此readSide==null,这里也就不会报异常。那么这个readSide是干什么的?什么时候赋值的呢? 后面会说明。

检查完后,将当前线程保存至成员变量 writeSide,即表示管道输出流所在的线程,最开始的时候buffer为1024,in为-1, out为0。第一次写入1000字节的时候能够全部成功写入,写入完成后,in为1000,out为0,receive方法释放锁。
循环缓冲数组分布如下:
这里写图片描述
接着来到第一次通过管道输入流PipedInputStream对象pipedInputStream读取数据pipedInputStream.read(store, 0, store.length); 例子中我们读取20字节数据,read(store, 0, store.length)方法如下:

public synchronized int read(byte b[], int off, int len)  throws IOException {if (b == null) {throw new NullPointerException();} else if (off < 0 || len < 0 || len > b.length - off) {throw new IndexOutOfBoundsException();} else if (len == 0) {return 0;}/* possibly wait on the first character */int c = read();if (c < 0) {return -1;}b[off] = (byte) c;int rlen = 1;while ((in >= 0) && (len > 1)) {int available;if (in > out) {available = Math.min((buffer.length - out), (in - out));} else {available = buffer.length - out;}// A byte is read beforehand outside the loopif (available > (len - 1)) {available = len - 1;}System.arraycopy(buffer, out, b, off + rlen, available);out += available;rlen += available;len -= available;if (out >= buffer.length) {out = 0;}if (in == out) {/* now empty */in = -1;}}return rlen;}

管道输入流PipedInpuStream的read方法也是一个同步方法,因此PipedInpuStream对象所在的线程也需要事先获得PipedInpuStream对象的对象锁才能进入该方法。可以看到在读取一个数组大小的数据时,read方法先读取了一个字节的数据,这是通过其另一个重载版本的无参read()方法实现,如下:

public synchronized int read()  throws IOException {if (!connected) {throw new IOException("Pipe not connected");} else if (closedByReader) {throw new IOException("Pipe closed");} else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {throw new IOException("Write end dead");}readSide = Thread.currentThread();int trials = 2;while (in < 0) {if (closedByWriter) {/* closed by writer, return EOF */return -1;}if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {throw new IOException("Pipe broken");}/* might be a writer waiting */notifyAll();try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}int ret = buffer[out++] & 0xFF;if (out >= buffer.length) {out = 0;}if (in == out) {/* now empty */in = -1;}return ret;}

那么它为什么要先读一个字节呢?

在read()方法中,注意 writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0) 即如果管道输出流所在的线程不为空但是线程已死并且缓冲区没有数据则抛出异常,这和checkStateForReceive()方法中的readSide != null && !readSide.isAlive() 是类似的,这里总结下

当首次使用管道输出流PipedOutputStream对象的write方法写数据后,该输出流连接的输入流PipedInputStream的成员变量writeSide便不为null; 当首次使用管道输入流PipedInputStream对象的read方法读数据后,成员变量readSide便不为null, 注意readSide和writeSide均为PipedInputStream类的成员变量。

假设管道输入流所在的线程A与管道输出流所在的线程B建立了连接,当B写完数据后终止,这时A去读数据不会出错,但是当A再次去读取数据的时候,如果缓冲区没有数据(即缓冲区为空)同时由于线程B已经死亡,这时便会报错,示例代码 2 如下:

public static void main(String[] args) {final PipedInputStream pipedInputStream=new PipedInputStream();final PipedOutputStream pipedOutputStream ;Thread otherThread=null;try {pipedOutputStream=new PipedOutputStream(pipedInputStream);otherThread=new Thread(){@Overridepublic void run(){try {System.out.println(Thread.currentThread()+" write data");pipedOutputStream.write(5);System.out.println(Thread.currentThread()+" finish write data");} catch (IOException e) {e.printStackTrace();}}};otherThread.start();} catch (IOException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(otherThread.isAlive());try {System.out.println(Thread.currentThread()+" first read data");pipedInputStream.read();System.out.println(Thread.currentThread()+" Second read data");pipedInputStream.read();} catch (IOException e) {e.printStackTrace();}}

结果如下:


Thread[Thread-0,5,main] write data
Thread[Thread-0,5,main] finish write data
false
Thread[main,5,main] first read data
Thread[main,5,main] Second read data
java.io.IOException: Write end deadat java.io.PipedInputStream.read(Unknown Source)at com.idc.pipe.test.DoubleThreadTest.main(DoubleThreadTest.java:48)

readSide代表管道输入流PipedInputStream对象所在的线程,在read()方法里赋值

现在回答它为什么要先读一个字节?
我们知道如果当输入流去读数据的时候此时循环数组中没有数据怎么办?输入流便阻塞,因此read(byte b[], int off, int len)调用read()先读一个字节就是为了当缓冲区没有数据的时候,输入流所在的线程能够阻塞 可以看到在read()方法里有个循环不停的判断in是否小于0,同时在循环里调用notifyAll()来唤醒那些被阻塞的输出流线程。

回到我们的例子,当第一次读取20字节的数据后,缓冲数组还剩余980字节数据,此时in为1000,out为20,这是buffer分布如下:
这里写图片描述
再次往里面写入1000字节数据,来到receive方法,由于不能一次性将1000字节写入循环缓冲数组buffer中,因此receive里的第一次循环写入了24字节的数据,此时in=1024, out=20, buffer分布如下
这里写图片描述
由于数组前面还有20个字节的剩余空空间,同时in!=out,且还有数据未写入,进入第二次循环写入20字节的数据,此时in==out=20,还有1000-24-20字节数据待写入,此时buffer分布如下
这里写图片描述
再次进入循环,由于in==out,进入awaitSpace(),如下:

private void awaitSpace() throws IOException {while (in == out) {checkStateForReceive();/* full: kick any waiting readers */notifyAll();try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}}

由于我们的管道输入流和输出流处于同一个线程,输出流一直处于while(in==out)循环中,而输入流因为得不到对象锁而无法读数据(由于输入流和输出流位于且只位于同一个线程,当调用notifyAll()的时候是肯定没有输入流wait在read()方法里),当输出流wait(1000)之后in还是等于out,便进入了死循环,造成死锁(PS: 这里的notify与wait自己还不是很清楚,有待加强)。

这里就有个有意思的地方,当我们将管道输入流与输出流建立连接但是没有进行过数据写,我第一次去尝试读的时候,由于缓冲区为空,读线程将被阻塞,示例代码 3 如下:

public static void main(String[] args) {final PipedInputStream pipedInputStream=new PipedInputStream();final PipedOutputStream pipedOutputStream ;Thread otherThread=null;try {pipedOutputStream=new PipedOutputStream(pipedInputStream);otherThread=new Thread(){@Overridepublic void run(){
//                  try {
//                      System.out.println(Thread.currentThread()+" write data");
//                      pipedOutputStream.write(5);
//                      System.out.println(Thread.currentThread()+" finish write data");
//                  } catch (IOException e) {
//                      e.printStackTrace();
//                  }}};otherThread.start();} catch (IOException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(otherThread.isAlive());try {System.out.println(Thread.currentThread()+" first read data");pipedInputStream.read();System.out.println(Thread.currentThread()+" Second read data");pipedInputStream.read();} catch (IOException e) {e.printStackTrace();}}

结果为:


false
Thread[main,5,main] first read data

但是如果我写过至少一次数据之后,如果缓冲区为空且此时管道输出流所在的线程已死亡,那么当尝试去读的时候,便会发生异常,如上面的示例代码2 或者把示例代码3中的注释去掉。

这篇关于Java里的管道输入流 PipedInputStream与管道输出流 PipedOutputStream的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/958436

相关文章

测试服务搭建之centos7下安装java

一 安装Java 1 创建普通用户 useradd userNameTest passwd userNameTest 提示输入密码:【输入wujian的密码】 user:userNameTest group:root passwd:123456789   2 给“userNameTest”分配root权限 vi /etc/sudoers 在文件中找到 # %wheel ALL=(

selenium +java 多个类公用driver问题

问题点:太久没有写selenium代码,居然把driver公用的问题忘记了,即:每写一个测试类,执行过程中都会新建一个窗口,这样应该说是非常不专业的。 大概想了一个方法,虽然看起来也不怎么专业,但感觉能用就很开心了。 解决步骤:                1 创建一个获取获取driver的方法getDriver()                2 创建成员变量,将 getDriver()赋值

mybaits输出helloworld-------mybatis(二)

mybatis输出helloworld 创建数据库(临时学习的话,建议使用docker) 这里存在一个误区,虽然容器的3306已经映射到主机3306,但是扔不能使用连接命令直接在物理机进行 连接,需要使用docker exec -it 容器名 /bin/bash 进入容器内部进行连接 测试数据库是否能正常连接,编辑完成之后,最好测试一下 常见误区:由于使用的是腾讯云服务器,所以腾讯云的安

IDEA +maven git tomcat database数据库 调试 插件 log4j Spring junit

前言 idea优化配置、常规配置、配置maven、git、tomcat、database数据库、调试配置、插件配置、log4j配置、Spring配置等等,稍后一一更新! 优化配置(#item1 “item1”) 打开文件 :“idea – > bin – >idea64.exe.vmoptions” -Xms: 初始内存;-Xmx : 最大内存;-ReservedCodeCache

4-Springboot集成FLOWABLE之流程驳回

目录标题 演示地址效果功能后端代码补充 演示地址 效果 功能 默认驳回到上一节点 后端代码 flowable自带驳回功能, 在源码ProcessInstanceResource.class下已有该功能,不需要自己额外去写 @ApiOperation(value = "Change the state a process instance", tags = { "Pr

bimface 模型集成-后端(java)上传、发起转换、获取转换状态

目录 前言后端架构流程存储表结构全局工具类先根据appid, appsecret 生成accesstoken, 保存到自己的存储服务器。利用保存的 accesstoken 上传模型发起转换获取转换状态根据bimface文件ID获取模型viewtoken, 获取到viewtoken就可以利用前端浏览模型或图纸了 前言 之前没有注意官方有个sdk,然后自己就实现了这么个逻辑。建议

JavaScript的变量申明提前

变量提升 JavaScript的函数定义有个特点,它会先扫描整个函数体的语句,把所有申明的变量“提升”到函数顶部: 只是将变量的申明提前,而不提前变量的值和函数的值 'use strict';function foo() {var x = 'Hello, ' + y;alert(x);var y = 'Bob';}foo(); 虽然是strict模式,但语句var x = 'Hell

Java ArryList

ArrayList简介 ArrayList就是传说中的动态数组,用MSDN中的说法,就是Array的复杂版本,它提供了如下一些好处: 1、动态的增加和减少元素; 2、实现了ICollection和IList接口 ; 3、灵活的设置数组的大小; ArrayList的基本用法 1、创建一个动态数组,并赋值 //创建一个动态数组ArrayList list = new ArrayL

SpringBoot 学习六:数据库的增删改查

1、新建一个Girl类,添加如下代码: package controlle;import javax.persistence.Entity;import javax.persistence.GeneratedValue;import javax.persistence.Id;@Entitypublic class Girl {@Id@GeneratedValueprivate Integer

SpringBoot 学习五:连接数据库

1、在pom.xml需要添加与数据库相关的两个依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>my