1 /*
   2  * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.io;
  27 
  28 /**
  29  * A piped input stream should be connected
  30  * to a piped output stream; the piped  input
  31  * stream then provides whatever data bytes
  32  * are written to the piped output  stream.
  33  * Typically, data is read from a <code>PipedInputStream</code>
  34  * object by one thread  and data is written
  35  * to the corresponding <code>PipedOutputStream</code>
  36  * by some  other thread. Attempting to use
  37  * both objects from a single thread is not
  38  * recommended, as it may deadlock the thread.
  39  * The piped input stream contains a buffer,
  40  * decoupling read operations from write operations,
  41  * within limits.
  42  * A pipe is said to be <a id="BROKEN"> <i>broken</i> </a> if a
  43  * thread that was providing data bytes to the connected
  44  * piped output stream is no longer alive.
  45  *
  46  * @author  James Gosling
  47  * @see     java.io.PipedOutputStream
  48  * @since   1.0
  49  */
  50 public class PipedInputStream extends InputStream {
  51     boolean closedByWriter;
  52     volatile boolean closedByReader;
  53     boolean connected;
  54 
  55         /* REMIND: identification of the read and write sides needs to be
  56            more sophisticated.  Either using thread groups (but what about
  57            pipes within a thread?) or using finalization (but it may be a
  58            long time until the next GC). */
  59     Thread readSide;
  60     Thread writeSide;
  61 
  62     private static final int DEFAULT_PIPE_SIZE = 1024;
  63 
  64     /**
  65      * The default size of the pipe's circular input buffer.
  66      * @since   1.1
  67      */
  68     // This used to be a constant before the pipe size was allowed
  69     // to change. This field will continue to be maintained
  70     // for backward compatibility.
  71     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
  72 
  73     /**
  74      * The circular buffer into which incoming data is placed.
  75      * @since   1.1
  76      */
  77     protected byte buffer[];
  78 
  79     /**
  80      * The index of the position in the circular buffer at which the
  81      * next byte of data will be stored when received from the connected
  82      * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
  83      * <code>in==out</code> implies the buffer is full
  84      * @since   1.1
  85      */
  86     protected int in = -1;
  87 
  88     /**
  89      * The index of the position in the circular buffer at which the next
  90      * byte of data will be read by this piped input stream.
  91      * @since   1.1
  92      */
  93     protected int out = 0;
  94 
  95     /**
  96      * Creates a <code>PipedInputStream</code> so
  97      * that it is connected to the piped output
  98      * stream <code>src</code>. Data bytes written
  99      * to <code>src</code> will then be  available
 100      * as input from this stream.
 101      *
 102      * @param      src   the stream to connect to.
 103      * @exception  IOException  if an I/O error occurs.
 104      */
 105     public PipedInputStream(PipedOutputStream src) throws IOException {
 106         this(src, DEFAULT_PIPE_SIZE);
 107     }
 108 
 109     /**
 110      * Creates a <code>PipedInputStream</code> so that it is
 111      * connected to the piped output stream
 112      * <code>src</code> and uses the specified pipe size for
 113      * the pipe's buffer.
 114      * Data bytes written to <code>src</code> will then
 115      * be available as input from this stream.
 116      *
 117      * @param      src   the stream to connect to.
 118      * @param      pipeSize the size of the pipe's buffer.
 119      * @exception  IOException  if an I/O error occurs.
 120      * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
 121      * @since      1.6
 122      */
 123     public PipedInputStream(PipedOutputStream src, int pipeSize)
 124             throws IOException {
 125          initPipe(pipeSize);
 126          connect(src);
 127     }
 128 
 129     /**
 130      * Creates a <code>PipedInputStream</code> so
 131      * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
 132      * connected}.
 133      * It must be {@linkplain java.io.PipedOutputStream#connect(
 134      * java.io.PipedInputStream) connected} to a
 135      * <code>PipedOutputStream</code> before being used.
 136      */
 137     public PipedInputStream() {
 138         initPipe(DEFAULT_PIPE_SIZE);
 139     }
 140 
 141     /**
 142      * Creates a <code>PipedInputStream</code> so that it is not yet
 143      * {@linkplain #connect(java.io.PipedOutputStream) connected} and
 144      * uses the specified pipe size for the pipe's buffer.
 145      * It must be {@linkplain java.io.PipedOutputStream#connect(
 146      * java.io.PipedInputStream)
 147      * connected} to a <code>PipedOutputStream</code> before being used.
 148      *
 149      * @param      pipeSize the size of the pipe's buffer.
 150      * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
 151      * @since      1.6
 152      */
 153     public PipedInputStream(int pipeSize) {
 154         initPipe(pipeSize);
 155     }
 156 
 157     private void initPipe(int pipeSize) {
 158          if (pipeSize <= 0) {
 159             throw new IllegalArgumentException("Pipe Size <= 0");
 160          }
 161          buffer = new byte[pipeSize];
 162     }
 163 
 164     /**
 165      * Causes this piped input stream to be connected
 166      * to the piped  output stream <code>src</code>.
 167      * If this object is already connected to some
 168      * other piped output  stream, an <code>IOException</code>
 169      * is thrown.
 170      * <p>
 171      * If <code>src</code> is an
 172      * unconnected piped output stream and <code>snk</code>
 173      * is an unconnected piped input stream, they
 174      * may be connected by either the call:
 175      *
 176      * <pre><code>snk.connect(src)</code> </pre>
 177      * <p>
 178      * or the call:
 179      *
 180      * <pre><code>src.connect(snk)</code> </pre>
 181      * <p>
 182      * The two calls have the same effect.
 183      *
 184      * @param      src   The piped output stream to connect to.
 185      * @exception  IOException  if an I/O error occurs.
 186      */
 187     public void connect(PipedOutputStream src) throws IOException {
 188         src.connect(this);
 189     }
 190 
 191     /**
 192      * Receives a byte of data.  This method will block if no input is
 193      * available.
 194      * @param b the byte being received
 195      * @exception IOException If the pipe is <a href="#BROKEN"> <code>broken</code></a>,
 196      *          {@link #connect(java.io.PipedOutputStream) unconnected},
 197      *          closed, or if an I/O error occurs.
 198      * @since     1.1
 199      */
 200     protected synchronized void receive(int b) throws IOException {
 201         checkStateForReceive();
 202         writeSide = Thread.currentThread();
 203         if (in == out)
 204             awaitSpace();
 205         if (in < 0) {
 206             in = 0;
 207             out = 0;
 208         }
 209         buffer[in++] = (byte)(b & 0xFF);
 210         if (in >= buffer.length) {
 211             in = 0;
 212         }
 213     }
 214 
 215     /**
 216      * Receives data into an array of bytes.  This method will
 217      * block until some input is available.
 218      * @param b the buffer into which the data is received
 219      * @param off the start offset of the data
 220      * @param len the maximum number of bytes received
 221      * @exception IOException If the pipe is <a href="#BROKEN"> broken</a>,
 222      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 223      *           closed,or if an I/O error occurs.
 224      */
 225     synchronized void receive(byte b[], int off, int len)  throws IOException {
 226         checkStateForReceive();
 227         writeSide = Thread.currentThread();
 228         int bytesToTransfer = len;
 229         while (bytesToTransfer > 0) {
 230             if (in == out)
 231                 awaitSpace();
 232             int nextTransferAmount = 0;
 233             if (out < in) {
 234                 nextTransferAmount = buffer.length - in;
 235             } else if (in < out) {
 236                 if (in == -1) {
 237                     in = out = 0;
 238                     nextTransferAmount = buffer.length - in;
 239                 } else {
 240                     nextTransferAmount = out - in;
 241                 }
 242             }
 243             if (nextTransferAmount > bytesToTransfer)
 244                 nextTransferAmount = bytesToTransfer;
 245             assert(nextTransferAmount > 0);
 246             System.arraycopy(b, off, buffer, in, nextTransferAmount);
 247             bytesToTransfer -= nextTransferAmount;
 248             off += nextTransferAmount;
 249             in += nextTransferAmount;
 250             if (in >= buffer.length) {
 251                 in = 0;
 252             }
 253         }
 254     }
 255 
 256     private void checkStateForReceive() throws IOException {
 257         if (!connected) {
 258             throw new IOException("Pipe not connected");
 259         } else if (closedByWriter || closedByReader) {
 260             throw new IOException("Pipe closed");
 261         } else if (readSide != null && !readSide.isAlive()) {
 262             throw new IOException("Read end dead");
 263         }
 264     }
 265 
 266     private void awaitSpace() throws IOException {
 267         while (in == out) {
 268             checkStateForReceive();
 269 
 270             /* full: kick any waiting readers */
 271             notifyAll();
 272             try {
 273                 wait(1000);
 274             } catch (InterruptedException ex) {
 275                 throw new java.io.InterruptedIOException();
 276             }
 277         }
 278     }
 279 
 280     /**
 281      * Notifies all waiting threads that the last byte of data has been
 282      * received.
 283      */
 284     synchronized void receivedLast() {
 285         closedByWriter = true;
 286         notifyAll();
 287     }
 288 
 289     /**
 290      * Reads the next byte of data from this piped input stream. The
 291      * value byte is returned as an <code>int</code> in the range
 292      * <code>0</code> to <code>255</code>.
 293      * This method blocks until input data is available, the end of the
 294      * stream is detected, or an exception is thrown.
 295      *
 296      * @return     the next byte of data, or <code>-1</code> if the end of the
 297      *             stream is reached.
 298      * @exception  IOException  if the pipe is
 299      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 300      *           <a href="#BROKEN"> <code>broken</code></a>, closed,
 301      *           or if an I/O error occurs.
 302      */
 303     public synchronized int read()  throws IOException {
 304         if (!connected) {
 305             throw new IOException("Pipe not connected");
 306         } else if (closedByReader) {
 307             throw new IOException("Pipe closed");
 308         } else if (writeSide != null && !writeSide.isAlive()
 309                    && !closedByWriter && (in < 0)) {
 310             throw new IOException("Write end dead");
 311         }
 312 
 313         readSide = Thread.currentThread();
 314         int trials = 2;
 315         while (in < 0) {
 316             if (closedByWriter) {
 317                 /* closed by writer, return EOF */
 318                 return -1;
 319             }
 320             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
 321                 throw new IOException("Pipe broken");
 322             }
 323             /* might be a writer waiting */
 324             notifyAll();
 325             try {
 326                 wait(1000);
 327             } catch (InterruptedException ex) {
 328                 throw new java.io.InterruptedIOException();
 329             }
 330         }
 331         int ret = buffer[out++] & 0xFF;
 332         if (out >= buffer.length) {
 333             out = 0;
 334         }
 335         if (in == out) {
 336             /* now empty */
 337             in = -1;
 338         }
 339 
 340         return ret;
 341     }
 342 
 343     /**
 344      * Reads up to <code>len</code> bytes of data from this piped input
 345      * stream into an array of bytes. Less than <code>len</code> bytes
 346      * will be read if the end of the data stream is reached or if
 347      * <code>len</code> exceeds the pipe's buffer size.
 348      * If <code>len </code> is zero, then no bytes are read and 0 is returned;
 349      * otherwise, the method blocks until at least 1 byte of input is
 350      * available, end of the stream has been detected, or an exception is
 351      * thrown.
 352      *
 353      * @param      b     the buffer into which the data is read.
 354      * @param      off   the start offset in the destination array <code>b</code>
 355      * @param      len   the maximum number of bytes read.
 356      * @return     the total number of bytes read into the buffer, or
 357      *             <code>-1</code> if there is no more data because the end of
 358      *             the stream has been reached.
 359      * @exception  NullPointerException If <code>b</code> is <code>null</code>.
 360      * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
 361      * <code>len</code> is negative, or <code>len</code> is greater than
 362      * <code>b.length - off</code>
 363      * @exception  IOException if the pipe is <a href="#BROKEN"> <code>broken</code></a>,
 364      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 365      *           closed, or if an I/O error occurs.
 366      */
 367     public synchronized int read(byte b[], int off, int len)  throws IOException {
 368         if (b == null) {
 369             throw new NullPointerException();
 370         } else if (off < 0 || len < 0 || len > b.length - off) {
 371             throw new IndexOutOfBoundsException();
 372         } else if (len == 0) {
 373             return 0;
 374         }
 375 
 376         /* possibly wait on the first character */
 377         int c = read();
 378         if (c < 0) {
 379             return -1;
 380         }
 381         b[off] = (byte) c;
 382         int rlen = 1;
 383         while ((in >= 0) && (len > 1)) {
 384 
 385             int available;
 386 
 387             if (in > out) {
 388                 available = Math.min((buffer.length - out), (in - out));
 389             } else {
 390                 available = buffer.length - out;
 391             }
 392 
 393             // A byte is read beforehand outside the loop
 394             if (available > (len - 1)) {
 395                 available = len - 1;
 396             }
 397             System.arraycopy(buffer, out, b, off + rlen, available);
 398             out += available;
 399             rlen += available;
 400             len -= available;
 401 
 402             if (out >= buffer.length) {
 403                 out = 0;
 404             }
 405             if (in == out) {
 406                 /* now empty */
 407                 in = -1;
 408             }
 409         }
 410         return rlen;
 411     }
 412 
 413     /**
 414      * Returns the number of bytes that can be read from this input
 415      * stream without blocking.
 416      *
 417      * @return the number of bytes that can be read from this input stream
 418      *         without blocking, or {@code 0} if this input stream has been
 419      *         closed by invoking its {@link #close()} method, or if the pipe
 420      *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
 421      *          <a href="#BROKEN"> <code>broken</code></a>.
 422      *
 423      * @exception  IOException  if an I/O error occurs.
 424      * @since   1.0.2
 425      */
 426     public synchronized int available() throws IOException {
 427         if(in < 0)
 428             return 0;
 429         else if(in == out)
 430             return buffer.length;
 431         else if (in > out)
 432             return in - out;
 433         else
 434             return in + buffer.length - out;
 435     }
 436 
 437     /**
 438      * Closes this piped input stream and releases any system resources
 439      * associated with the stream.
 440      *
 441      * @exception  IOException  if an I/O error occurs.
 442      */
 443     public void close()  throws IOException {
 444         closedByReader = true;
 445         synchronized (this) {
 446             in = -1;
 447         }
 448     }
 449 }