1 /*
   2  * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.io;
  27 
  28 /**
  29  * A piped input stream should be connected
  30  * to a piped output stream; the piped  input
  31  * stream then provides whatever data bytes
  32  * are written to the piped output  stream.
  33  * Typically, data is read from a <code>PipedInputStream</code>
  34  * object by one thread  and data is written
  35  * to the corresponding <code>PipedOutputStream</code>
  36  * by some  other thread. Attempting to use
  37  * both objects from a single thread is not
  38  * recommended, as it may deadlock the thread.
  39  * The piped input stream contains a buffer,
  40  * decoupling read operations from write operations,
  41  * within limits.
  42  * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
  43  * thread that was providing data bytes to the connected
  44  * piped output stream is no longer alive.
  45  *
  46  * @author  James Gosling
  47  * @see     java.io.PipedOutputStream
  48  * @since   JDK1.0
  49  */
  50 public class PipedInputStream extends InputStream {
  51     boolean closedByWriter = false;
  52     volatile boolean closedByReader = false;
  53     boolean connected = false;
  54 
  55         /* REMIND: identification of the read and write sides needs to be
  56            more sophisticated.  Either using thread groups (but what about
  57            pipes within a thread?) or using finalization (but it may be a
  58            long time until the next GC). */
  59     Thread readSide;
  60     Thread writeSide;
  61 
  62     private static final int DEFAULT_PIPE_SIZE = 1024;
  63 
  64     /**
  65      * The default size of the pipe's circular input buffer.
  66      * @since   JDK1.1
  67      */
  68     // This used to be a constant before the pipe size was allowed
  69     // to change. This field will continue to be maintained
  70     // for backward compatibility.
  71     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
  72 
  73     /**
  74      * The circular buffer into which incoming data is placed.
  75      * @since   JDK1.1
  76      */
  77     protected byte buffer[];
  78 
  79     /**
  80      * The index of the position in the circular buffer at which the
  81      * next byte of data will be stored when received from the connected
  82      * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
  83      * <code>in==out</code> implies the buffer is full
  84      * @since   JDK1.1
  85      */
  86     protected int in = -1;
  87 
  88     /**
  89      * The index of the position in the circular buffer at which the next
  90      * byte of data will be read by this piped input stream.
  91      * @since   JDK1.1
  92      */
  93     protected int out = 0;
  94 
  95     /**
  96      * Creates a <code>PipedInputStream</code> so
  97      * that it is connected to the piped output
  98      * stream <code>src</code>. Data bytes written
  99      * to <code>src</code> will then be  available
 100      * as input from this stream.
 101      *
 102      * @param      src   the stream to connect to.
 103      * @exception  IOException  if an I/O error occurs.
 104      */
 105     public PipedInputStream(PipedOutputStream src) throws IOException {
 106         this(src, DEFAULT_PIPE_SIZE);
 107     }
 108 
 109     /**
 110      * Creates a <code>PipedInputStream</code> so that it is
 111      * connected to the piped output stream
 112      * <code>src</code> and uses the specified pipe size for
 113      * the pipe's buffer.
 114      * Data bytes written to <code>src</code> will then
 115      * be available as input from this stream.
 116      *
 117      * @param      src   the stream to connect to.
 118      * @param      pipeSize the size of the pipe's buffer.
 119      * @exception  IOException  if an I/O error occurs.
 120      * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
 121      * @since      1.6
 122      */
 123     public PipedInputStream(PipedOutputStream src, int pipeSize)
 124             throws IOException {
 125          initPipe(pipeSize);
 126          connect(src);
 127     }
 128 
 129     /**
 130      * Creates a <code>PipedInputStream</code> so
 131      * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
 132      * connected}.
 133      * It must be {@linkplain java.io.PipedOutputStream#connect(
 134      * java.io.PipedInputStream) connected} to a
 135      * <code>PipedOutputStream</code> before being used.
 136      */
 137     public PipedInputStream() {
 138         initPipe(DEFAULT_PIPE_SIZE);
 139     }
 140 
 141     /**
 142      * Creates a <code>PipedInputStream</code> so that it is not yet
 143      * {@linkplain #connect(java.io.PipedOutputStream) connected} and
 144      * uses the specified pipe size for the pipe's buffer.
 145      * It must be {@linkplain java.io.PipedOutputStream#connect(
 146      * java.io.PipedInputStream)
 147      * connected} to a <code>PipedOutputStream</code> before being used.
 148      *
 149      * @param      pipeSize the size of the pipe's buffer.
 150      * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
 151      * @since      1.6
 152      */
 153     public PipedInputStream(int pipeSize) {
 154         initPipe(pipeSize);
 155     }
 156 
 157     private void initPipe(int pipeSize) {
 158          if (pipeSize <= 0) {
 159             throw new IllegalArgumentException("Pipe Size <= 0");
 160          }
 161          buffer = new byte[pipeSize];
 162     }
 163 
 164     /**
 165      * Causes this piped input stream to be connected
 166      * to the piped  output stream <code>src</code>.
 167      * If this object is already connected to some
 168      * other piped output  stream, an <code>IOException</code>
 169      * is thrown.
 170      * <p>
 171      * If <code>src</code> is an
 172      * unconnected piped output stream and <code>snk</code>
 173      * is an unconnected piped input stream, they
 174      * may be connected by either the call:
 175      * <p>
 176      * <pre><code>snk.connect(src)</code> </pre>
 177      * <p>
 178      * or the call:
 179      * <p>
 180      * <pre><code>src.connect(snk)</code> </pre>
 181      * <p>
 182      * The two
 183      * calls have the same effect.
 184      *
 185      * @param      src   The piped output stream to connect to.
 186      * @exception  IOException  if an I/O error occurs.
 187      */
 188     public void connect(PipedOutputStream src) throws IOException {
 189         src.connect(this);
 190     }
 191 
 192     /**
 193      * Receives a byte of data.  This method will block if no input is
 194      * available.
 195      * @param b the byte being received
 196      * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
 197      *          {@link #connect(java.io.PipedOutputStream) unconnected},
 198      *          closed, or if an I/O error occurs.
 199      * @since     JDK1.1
 200      */
 201     protected synchronized void receive(int b) throws IOException {
 202         checkStateForReceive();
 203         writeSide = Thread.currentThread();
 204         if (in == out)
 205             awaitSpace();
 206         if (in < 0) {
 207             in = 0;
 208             out = 0;
 209         }
 210         buffer[in++] = (byte)(b & 0xFF);
 211         if (in >= buffer.length) {
 212             in = 0;
 213         }
 214     }
 215 
 216     /**
 217      * Receives data into an array of bytes.  This method will
 218      * block until some input is available.
 219      * @param b the buffer into which the data is received
 220      * @param off the start offset of the data
 221      * @param len the maximum number of bytes received
 222      * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
 223      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 224      *           closed,or if an I/O error occurs.
 225      */
 226     synchronized void receive(byte b[], int off, int len)  throws IOException {
 227         checkStateForReceive();
 228         writeSide = Thread.currentThread();
 229         int bytesToTransfer = len;
 230         while (bytesToTransfer > 0) {
 231             if (in == out)
 232                 awaitSpace();
 233             int nextTransferAmount = 0;
 234             if (out < in) {
 235                 nextTransferAmount = buffer.length - in;
 236             } else if (in < out) {
 237                 if (in == -1) {
 238                     in = out = 0;
 239                     nextTransferAmount = buffer.length - in;
 240                 } else {
 241                     nextTransferAmount = out - in;
 242                 }
 243             }
 244             if (nextTransferAmount > bytesToTransfer)
 245                 nextTransferAmount = bytesToTransfer;
 246             assert(nextTransferAmount > 0);
 247             System.arraycopy(b, off, buffer, in, nextTransferAmount);
 248             bytesToTransfer -= nextTransferAmount;
 249             off += nextTransferAmount;
 250             in += nextTransferAmount;
 251             if (in >= buffer.length) {
 252                 in = 0;
 253             }
 254         }
 255     }
 256 
 257     private void checkStateForReceive() throws IOException {
 258         if (!connected) {
 259             throw new IOException("Pipe not connected");
 260         } else if (closedByWriter || closedByReader) {
 261             throw new IOException("Pipe closed");
 262         } else if (readSide != null && !readSide.isAlive()) {
 263             throw new IOException("Read end dead");
 264         }
 265     }
 266 
 267     private void awaitSpace() throws IOException {
 268         while (in == out) {
 269             checkStateForReceive();
 270 
 271             /* full: kick any waiting readers */
 272             notifyAll();
 273             try {
 274                 wait(1000);
 275             } catch (InterruptedException ex) {
 276                 throw new java.io.InterruptedIOException();
 277             }
 278         }
 279     }
 280 
 281     /**
 282      * Notifies all waiting threads that the last byte of data has been
 283      * received.
 284      */
 285     synchronized void receivedLast() {
 286         closedByWriter = true;
 287         notifyAll();
 288     }
 289 
 290     /**
 291      * Reads the next byte of data from this piped input stream. The
 292      * value byte is returned as an <code>int</code> in the range
 293      * <code>0</code> to <code>255</code>.
 294      * This method blocks until input data is available, the end of the
 295      * stream is detected, or an exception is thrown.
 296      *
 297      * @return     the next byte of data, or <code>-1</code> if the end of the
 298      *             stream is reached.
 299      * @exception  IOException  if the pipe is
 300      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 301      *           <a href=#BROKEN> <code>broken</code></a>, closed,
 302      *           or if an I/O error occurs.
 303      */
 304     public synchronized int read()  throws IOException {
 305         if (!connected) {
 306             throw new IOException("Pipe not connected");
 307         } else if (closedByReader) {
 308             throw new IOException("Pipe closed");
 309         } else if (writeSide != null && !writeSide.isAlive()
 310                    && !closedByWriter && (in < 0)) {
 311             throw new IOException("Write end dead");
 312         }
 313 
 314         readSide = Thread.currentThread();
 315         int trials = 2;
 316         while (in < 0) {
 317             if (closedByWriter) {
 318                 /* closed by writer, return EOF */
 319                 return -1;
 320             }
 321             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
 322                 throw new IOException("Pipe broken");
 323             }
 324             /* might be a writer waiting */
 325             notifyAll();
 326             try {
 327                 wait(1000);
 328             } catch (InterruptedException ex) {
 329                 throw new java.io.InterruptedIOException();
 330             }
 331         }
 332         int ret = buffer[out++] & 0xFF;
 333         if (out >= buffer.length) {
 334             out = 0;
 335         }
 336         if (in == out) {
 337             /* now empty */
 338             in = -1;
 339         }
 340 
 341         return ret;
 342     }
 343 
 344     /**
 345      * Reads up to <code>len</code> bytes of data from this piped input
 346      * stream into an array of bytes. Less than <code>len</code> bytes
 347      * will be read if the end of the data stream is reached or if
 348      * <code>len</code> exceeds the pipe's buffer size.
 349      * If <code>len </code> is zero, then no bytes are read and 0 is returned;
 350      * otherwise, the method blocks until at least 1 byte of input is
 351      * available, end of the stream has been detected, or an exception is
 352      * thrown.
 353      *
 354      * @param      b     the buffer into which the data is read.
 355      * @param      off   the start offset in the destination array <code>b</code>
 356      * @param      len   the maximum number of bytes read.
 357      * @return     the total number of bytes read into the buffer, or
 358      *             <code>-1</code> if there is no more data because the end of
 359      *             the stream has been reached.
 360      * @exception  NullPointerException If <code>b</code> is <code>null</code>.
 361      * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
 362      * <code>len</code> is negative, or <code>len</code> is greater than
 363      * <code>b.length - off</code>
 364      * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
 365      *           {@link #connect(java.io.PipedOutputStream) unconnected},
 366      *           closed, or if an I/O error occurs.
 367      */
 368     public synchronized int read(byte b[], int off, int len)  throws IOException {
 369         if (b == null) {
 370             throw new NullPointerException();
 371         } else if (off < 0 || len < 0 || len > b.length - off) {
 372             throw new IndexOutOfBoundsException();
 373         } else if (len == 0) {
 374             return 0;
 375         }
 376 
 377         /* possibly wait on the first character */
 378         int c = read();
 379         if (c < 0) {
 380             return -1;
 381         }
 382         b[off] = (byte) c;
 383         int rlen = 1;
 384         while ((in >= 0) && (len > 1)) {
 385 
 386             int available;
 387 
 388             if (in > out) {
 389                 available = Math.min((buffer.length - out), (in - out));
 390             } else {
 391                 available = buffer.length - out;
 392             }
 393 
 394             // A byte is read beforehand outside the loop
 395             if (available > (len - 1)) {
 396                 available = len - 1;
 397             }
 398             System.arraycopy(buffer, out, b, off + rlen, available);
 399             out += available;
 400             rlen += available;
 401             len -= available;
 402 
 403             if (out >= buffer.length) {
 404                 out = 0;
 405             }
 406             if (in == out) {
 407                 /* now empty */
 408                 in = -1;
 409             }
 410         }
 411         return rlen;
 412     }
 413 
 414     /**
 415      * Returns the number of bytes that can be read from this input
 416      * stream without blocking.
 417      *
 418      * @return the number of bytes that can be read from this input stream
 419      *         without blocking, or {@code 0} if this input stream has been
 420      *         closed by invoking its {@link #close()} method, or if the pipe
 421      *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
 422      *          <a href=#BROKEN> <code>broken</code></a>.
 423      *
 424      * @exception  IOException  if an I/O error occurs.
 425      * @since   JDK1.0.2
 426      */
 427     public synchronized int available() throws IOException {
 428         if(in < 0)
 429             return 0;
 430         else if(in == out)
 431             return buffer.length;
 432         else if (in > out)
 433             return in - out;
 434         else
 435             return in + buffer.length - out;
 436     }
 437 
 438     /**
 439      * Closes this piped input stream and releases any system resources
 440      * associated with the stream.
 441      *
 442      * @exception  IOException  if an I/O error occurs.
 443      */
 444     public void close()  throws IOException {
 445         closedByReader = true;
 446         synchronized (this) {
 447             in = -1;
 448         }
 449     }
 450 }