管道流是用来在多个线程之间进行信息传递的Java流,包括字节管道读取流PipedInputStream和字节管道写入流PipedOutputStream、字符管道读取流PipedReader和字符管道写入流PipedWriter。其中读取流是读取者/消费者/接收者,写入流是写入者/生产者/发送者。
需要注意的是:
管道流仅用于多个线程之间传递信息,若用在同一个线程中可能会造成死锁。
管道流的输入输出是成对的,一个输出流只能对应一个输入流,使用构造函数或者connect函数进行连接。
一对管道流包含一个缓冲区,其默认值为1024个字节,若要改变缓冲区大小,可以使用带有参数的构造函数。
管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞。
管道依附于线程,因此若线程结束,则虽然管道流对象还在,仍然会报错“read dead end”。
管道流的读取方法与普通流不同,只有输出流正确close时,输出流才能读到-1值。
PipedReader/PipedWriter和PipedInputStream/PipedOutputStream源码相似,思路相同,所以以下以PipedInputStream/PipedOutputStream源码为例。
由于connect方法在PipedOutputStream中,所以从PipedOutputStream开始看。
java.io.PipedOutputStream继承了基类OutputStream。包含PipedInputStream实例sink,在构造函数中可以与传入管道输入流进行连接。connect方法所做的是将传入管道输入流传给参数sink,并且初始化一些参数和状态。由于管道输入输出流是一一对应的,在进行连接前,connect方法会进行判断,若双方任何一个已有连接则抛出异常。
/*对应的管道输入流*/ private PipedInputStream sink; /*构造函数:连接传入的管道输入流*/ public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } /*空参数构造函数:未进行连接*/ public PipedOutputStream() { } /*connect方法*/ public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
PipedOutputStream是生产者/写入者,将数据写到“管道”中,由对应的PipedInputStream来读取,不过缓冲区在PipedInputStream之中,上面connect时初始化的也是对应PipedInputStream中的参数,PipedOutputStream实例在写入时,调用的是对应的消费者/读取者来receive数据。
public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
其close()方法也只是调用对应PipedInputStream 的receivedLast方法来实现。
public void close() throws IOException { if (sink != null) { sink.receivedLast(); } }
PipedInputStream中,该方法将closeByWriter置为true并且唤醒所有等待线程,将所有数据写入PipedInputStream的缓冲区。
synchronized void receivedLast() { closedByWriter = true; notifyAll(); }
而PipedInputStream的close()方法则是将closedByReader置为True。而closeByWriter和closedByReader两个变量在PipedInputStream的receive以及read方法中有不同的作用,在closeByWriter和closedByReader任一为True的时候都不能再调用receive方法进行写入,而在closeByWriter为True,而closedByReader为False时,若缓冲区仍有数据未读取,则可以继续读取。
public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } }
从PipedOutputStream的源码可以看出来,PipedOutputStream做的只是连接对应的PipedInputStream 实例并在写入时调用对应的receive方法,管道流具体的实现还是主要在PipedInputStream 之中。
java.io.PipedInputStream继承了基类InputStream,其主要参数包含以下几个部分:
负责连接与colse的参数
读取线程与写入线程
“管道”,即缓冲区相关参数
/*负责连接与close的参数*/ boolean closedByWriter = false; volatile boolean closedByReader = false; boolean connected = false; /*读取线程与写入线程*/ Thread readSide; Thread writeSide; /*“管道”,即缓冲区相关参数*/ private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; protected byte buffer[];//“管道” protected int in = -1;//指向下一个“写入”的位置 protected int out = 0;//指向下一个“读取”的位置
而构造方法所做的两件事情分别是:1、为“管道”的缓冲区分配空间。2、连接对应的PipedOutputStream。四个构造方法的区别在于,当传入了缓冲区大小则按照自定义大小分配空间,没有缓冲区大小参数则使用默认大小,当传入PipedOutputStream参数则进行连接,反之则暂时不进行连接。
/*构造方法1:使用默认“管道”大小,并连接传入的PipedOutputStream*/ public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /*构造方法2:自定义“管道”大小,并连接传入的PipedOutputStream*/ public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /*构造方法3:使用默认“管道”大小,并未进行连接*/ public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); } /*构造方法4:自定义“管道”大小,并未进行连接*/ public PipedInputStream(int pipeSize) { initPipe(pipeSize); } /*为“管道”按照大小分配空间*/ private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } /*PipedInputStream.connect()调用传入PipedOutputStream的connect方法*/ public void connect(PipedOutputStream src) throws IOException { src.connect(this); }
主要问题在于receive系列方法及read系列方法。
PipedInputStream的receive方法,在功能上是实现了“写入”的功能的,将传入的数据写入到“管道”之中。
首先涉及两个方法checkStateForReceive和awaitSpace。
checkStateForReceive方法所做的是确认这对管道流可用:1、写入者和读取者是否已连接 2、是否关闭 3、读取线程是否有效。
private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } }
awaitApace则是在“管道”缓冲区已满的时候,阻塞数据写入。ps:由于这个缓冲区使用时可以看做一个循环队列,缓冲区已满判断条件是in==out,而判断缓冲区为空的条件是in=-1(read的时候缓冲区为空会将in置为-1)。
private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } }
两个receive方法中receive(int b)
方法比较简单,在判断了管道流可用以及缓冲区未满之后写入,只是在写入到了缓冲区队尾(且缓冲区未满)的时候会跳到队头继续写入。
而receive(byte b[], int off, int len)
方法在写入字节数组的时候会复杂些。有关“管道”缓冲区,数据有以下三种情况:
缓冲区为空:in=-1,out=0,初始时以及在读取的时候发现缓冲区为空会将in置为-1
缓冲区有数据但是未满:in<out或者out<in
缓冲区已满:int==out
receive(byte b[], int off, int len)
所做的是bytesToTransfer保留还有多少字节未写入,nextTransferAmount保存下一个可写入空间的大小,写入后,若bytesToTransfer仍大于0,则继续循环,判断缓冲区情况,尝试寻找下一个可写入空间直至全部写入。
synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { if (in == out)/*缓冲区已满:阻塞*/ awaitSpace(); int nextTransferAmount = 0; /*判断下一个可写入的连续空间的大小*/ if (out < in) {/*当out<in,下一个可写入的空间是in到队尾*/ nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) {/*当缓冲区为空,下一个可写入的空间是0到队尾*/ in = out = 0; nextTransferAmount = buffer.length - in; } else {/*当in<out,下一个可写入的空间是in-->out的空间*/ nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer)/*如果空间足够写入则写入全部*/ nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount;/*如果空间不足够写入,则减去已写入的部分,进入下一个循环找下一个可写入的空间*/ off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } }
read方法包含read()
和read(byte b[], int off, int len)
。
read()
方法在判断了是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭之后开始尝试读取数据,有以下情况:
缓冲区为空,则先尝试唤醒全部等待线程并等待,等待对应的写入线程是否有未完成的写入。若有则等待写入后读取,若无,则尝试2次之后抛出异常并退出。
缓冲区不为空,则直接读取数据,更新参数
public synchronized int read() throws IOException { /*判断是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭*/ if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; /*如果缓冲区为空*/ while (in < 0) { /*缓冲区为空,并且写入流已经关闭则结束并返回-1*/ if (closedByWriter) { /* closed by writer, return EOF */ return -1; } /*如果写入线程不再活动,并且已经尝试等待2次后仍无数据则抛出异常*/ if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /*可能仍有写入线程在等待的,read方法尝试唤醒全部线程并等待,尝试2次后退出并抛出异常*/ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } /*若缓冲区不为空,读取数据,并更新in和out*/ int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; }
read(byte b[], int off, int len)
跟receive的思路相同,每次都是获取可读取的下一个连续空间的字节数来尝试读取,该方法会尝试读取足够多的字节,如果缓冲区的字节数<len会全部读取并返回实际读取的字节数。不过在读取第一个字节的时候调用的read()
方法来进行一系列的判断及操作,譬如说缓冲区为空时等待并唤醒可能存在的写入线程来写入后再读取。
public synchronized int read(byte b[], int off, int len) throws IOException { /*判断传入参数的合理性*/ if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ /*尝试调用read()读取第一个字节*/ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; /*当缓冲区不为空,并且读取的字节数仍不够时则继续读取*/ while ((in >= 0) && (len > 1)) { /*获取下一次可读取连续空间的字节数*/ int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } /*如果这次可读连续空间的字节数已经够了,则只读取len-1个字节*/ if (available > (len - 1)) { available = len - 1; } /*读取数据,并更新参数*/ System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; }
public void Test(){ try{ PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis); Wtr wtr = new Wtr(pos); Rdr rdr = new Rdr(pis); wtr.start(); rdr.start(); } catch (IOException e) { e.printStackTrace(); } } class Wtr extends Thread{ private PipedOutputStream writer; public Wtr(PipedOutputStream pos){ writer = pos; } @Override public void run(){ String s = "好好学习,天天向上"; byte[] buf = s.getBytes(); System.out.println("Send "+buf.length+" Bytes : "+s); try { writer.write(buf,0,buf.length); } catch (IOException e) { e.printStackTrace(); } System.out.println("Send done"); } } class Rdr extends Thread{ private PipedInputStream reader; public Rdr(PipedInputStream pis){ reader = pis; } @Override public void run(){ ByteArrayOutputStream bis = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int len = 0; try { len = reader.read(buf,0,1024); bis.write(buf,0,len); } catch (IOException e) { e.printStackTrace(); } System.out.println("read "+len+"Bytes : "+bis.toString()); } }
public void Test(){ try{ PipedReader prd = new PipedReader(); PipedWriter pwr = new PipedWriter(prd); Wtr wtr = new Wtr(pwr); Rdr rdr = new Rdr(prd); wtr.start(); rdr.start(); } catch (IOException e) { e.printStackTrace(); } } class Wtr extends Thread{ private PipedWriter writer; public Wtr(PipedWriter pwr){ writer = pwr; } @Override public void run(){ String s = "好好学习,天天向上"; char[] chr = s.toCharArray(); System.out.println("Send "+chr.length+" Bytes : "+s); try { writer.write(chr,0,chr.length); } catch (IOException e) { e.printStackTrace(); } System.out.println("Send done"); } } class Rdr extends Thread{ private PipedReader reader; public Rdr(PipedReader prd){ reader = prd; } @Override public void run(){ char[] chr = new char[1024]; int len = 0; try { len = reader.read(chr,0,1024); } catch (IOException e) { e.printStackTrace(); } System.out.println("read "+len+"Bytes : "+new String(chr)); } }