PipedInputStream,PipedOutputStream源码分析

2024-01-22 01:32

本文主要是介绍PipedInputStream,PipedOutputStream源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

           PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.


          首先简单的介绍一下这两个类的实现原理,PipedInputStreamPipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东东.还有两个变量,in,out,in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.


 protected synchronized void receive(int b) throws IOException {// 这里好像有些问题,因为这个方法是在PipedOutputStream类中调用的,而这个方法是protected的,下面另一个receive方法就不是protected,可能是我的源码有些问题,也请大家帮我看看
        checkStateForReceive();// 检测通道是否连接,准备好接收产品
        writeSide = Thread.currentThread();// 当前线程是生产者
        if (in == out)
            awaitSpace();
// 发现通道满了,没地方放东西啦,等吧~~
        if (in < 0{// in<0,表示通道是空的,将生产和消费的位置都置成第一个位置
            in = 0;
            out 
= 0;
        }

        buffer[in
++= (byte) (b & 0xFF);
        
if (in >= buffer.length) {// 如果生产位置到达了通道的末尾,为了循环利用通道,将in置成0
            in = 0;
        }

    }


    
synchronized void receive(byte b[], int off, int len) throws IOException {// 看,这个方法不是protected的!
        checkStateForReceive();
        writeSide 
= Thread.currentThread();
        
int bytesToTransfer = len;// 需要接收多少产品的数量
        while (bytesToTransfer > 0{
            
if (in == out)
                awaitSpace();
            
int nextTransferAmount = 0;// 本次实际可以接收的数量
            if (out < in) {
                nextTransferAmount 
= buffer.length - in;// 如果消费的当前位置<生产的当前位置,则还可以再生产buffer.length-in这么多
            }
 else if (in < out) {
                
if (in == -1{
                    in 
= out = 0;// 如果已经消费完,则将in,out置成0,从头开始接收
                    nextTransferAmount = buffer.length - in;
                }
 else {
                    nextTransferAmount 
= out - in;// 如果消费的当前位置>生产的当前位置,而且还没消费完,那么至少还可以再生产out-in这么多,注意,这种情况是因为通道被重复利用而产生的!
                }

            }

            
if (nextTransferAmount > bytesToTransfer)// 如果本次实际可以接收的数量要大于当前传过来的数量,
                nextTransferAmount = bytesToTransfer;// 那么本次实际只能接收当前传过来的这么多了
            assert (nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
// 把本次实际接收的数量放进通道
            bytesToTransfer -= nextTransferAmount;// 算出还剩多少需要放进通道
            off += nextTransferAmount;
            in 
+= nextTransferAmount;
            
if (in >= buffer.length) {// 到末尾了,该从头开始了
                in = 0;
            }

        }

    }

 

消费产品的行为:

     public   synchronized   int  read()  throws  IOException  {// 消费单个产品
        if (!connected) {
            
throw new IOException("Pipe not connected");
        }
 else if (closedByReader) {
            
throw new IOException("Pipe closed");
        }
 else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                
&& (in < 0)) {
            
throw new IOException("Write end dead");
        }


        readSide 
= Thread.currentThread();
        
int trials = 2;
        
while (in < 0{// in<0,表示通道是空的,等待生产者生产
            if (closedByWriter) {
                
/* closed by writer, return EOF */
                
return -1;// 返回-1表示生产者已经不再生产产品了,closedByWriter为true表示是由生产者将通道关闭的
            }

            
if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                
throw new IOException("Pipe broken");
            }

            
/* might be a writer waiting */
            notifyAll();
            
try {
                wait(
1000);
            }
 catch (InterruptedException ex) {
                
throw new java.io.InterruptedIOException();
            }

        }

        
int ret = buffer[out++& 0xFF;
        
if (out >= buffer.length) {
            out 
= 0;// 如果消费到通道的末尾了,从通道头开始继续循环消费
        }

        
if (in == out) {
            
/* now empty */
            in 
= -1;// 消费的位置和生产的位置重合了,表示消费完了,需要生产者生产,in置为-1
        }

        
return ret;
    }


    
public   synchronized   int  read( byte  b[],  int  off,  int  len)  throws  IOException  {
        
if (b == null{
            
throw new NullPointerException();
        }
 else if ((off < 0|| (off > b.length) || (len < 0)
                
|| ((off + len) > b.length) || ((off + len) < 0)) {
            
throw new IndexOutOfBoundsException();
        }
 else if (len == 0{
            
return 0;
        }


        
/* possibly wait on the first character */
        
int c = read();// 利用消费单个产品来检测通道是否连接,并且通道中是否有东西可消费
        if (c < 0{
            
return -1;// 返回-1表示生产者生产完了,消费者也消费完了,消费者可以关闭通道了
        }

        b[off] 
= (byte) c;
        
int rlen = 1;

        
// 这里没有采用receive(byte [], int ,
        
// int)方法中System.arrayCopy()的方法,其实用System.arrayCopy()的方法也可以实现
        /*
         * 这是用System.arrayCopy()实现的方法 int bytesToConsume = len - 1; while
         * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
         * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
         * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
         * buffer.length - out; }
         * 
         * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
         * bytesToConsume; assert (nextConsumeAmount > 0);
         * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
         * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
         * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
         * buffer.length) { out = 0; } if(in == out) { in = -1; } }
         
*/


        
while ((in >= 0&& (--len > 0)) {
            b[off 
+ rlen] = buffer[out++];
            rlen
++;
            
if (out >= buffer.length) {
                out 
= 0;
            }

            
if (in == out) {
                
/* now empty */
                in 
= -1;// in==out,表示满了,将in置成-1
            }

        }

        
return rlen;
    }

这篇关于PipedInputStream,PipedOutputStream源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/631451

相关文章

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性