1 /*
   2  * Copyright (c) 1996, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.io;
  27 
  28 
  29 /**
  30  * Piped character-input streams.
  31  *
  32  * @author      Mark Reinhold
  33  * @since       1.1
  34  */
  35 
  36 public class PipedReader extends Reader {
  37     boolean closedByWriter = false;
  38     boolean closedByReader = false;
  39     boolean connected = false;
  40 
  41     /* REMIND: identification of the read and write sides needs to be
  42        more sophisticated.  Either using thread groups (but what about
  43        pipes within a thread?) or using finalization (but it may be a
  44        long time until the next GC). */
  45     Thread readSide;
  46     Thread writeSide;
  47 
  48    /**
  49     * The size of the pipe's circular input buffer.
  50     */
  51     private static final int DEFAULT_PIPE_SIZE = 1024;
  52 
  53     /**
  54      * The circular buffer into which incoming data is placed.
  55      */
  56     char buffer[];
  57 
  58     /**
  59      * The index of the position in the circular buffer at which the
  60      * next character of data will be stored when received from the connected
  61      * piped writer. <code>in&lt;0</code> implies the buffer is empty,
  62      * {@code in==out} implies the buffer is full
  63      */
  64     int in = -1;
  65 
  66     /**
  67      * The index of the position in the circular buffer at which the next
  68      * character of data will be read by this piped reader.
  69      */
  70     int out = 0;
  71 
  72     /**
  73      * Creates a {@code PipedReader} so
  74      * that it is connected to the piped writer
  75      * {@code src}. Data written to {@code src}
  76      * will then be available as input from this stream.
  77      *
  78      * @param      src   the stream to connect to.
  79      * @throws     IOException  if an I/O error occurs.
  80      */
  81     public PipedReader(PipedWriter src) throws IOException {
  82         this(src, DEFAULT_PIPE_SIZE);
  83     }
  84 
  85     /**
  86      * Creates a {@code PipedReader} so that it is connected
  87      * to the piped writer {@code src} and uses the specified
  88      * pipe size for the pipe's buffer. Data written to {@code src}
  89      * will then be  available as input from this stream.
  90 
  91      * @param      src       the stream to connect to.
  92      * @param      pipeSize  the size of the pipe's buffer.
  93      * @throws     IOException  if an I/O error occurs.
  94      * @throws     IllegalArgumentException if {@code pipeSize <= 0}.
  95      * @since      1.6
  96      */
  97     public PipedReader(PipedWriter src, int pipeSize) throws IOException {
  98         initPipe(pipeSize);
  99         connect(src);
 100     }
 101 
 102 
 103     /**
 104      * Creates a {@code PipedReader} so
 105      * that it is not yet {@linkplain #connect(java.io.PipedWriter)
 106      * connected}. It must be {@linkplain java.io.PipedWriter#connect(
 107      * java.io.PipedReader) connected} to a {@code PipedWriter}
 108      * before being used.
 109      */
 110     public PipedReader() {
 111         initPipe(DEFAULT_PIPE_SIZE);
 112     }
 113 
 114     /**
 115      * Creates a {@code PipedReader} so that it is not yet
 116      * {@link #connect(java.io.PipedWriter) connected} and uses
 117      * the specified pipe size for the pipe's buffer.
 118      * It must be  {@linkplain java.io.PipedWriter#connect(
 119      * java.io.PipedReader) connected} to a {@code PipedWriter}
 120      * before being used.
 121      *
 122      * @param   pipeSize the size of the pipe's buffer.
 123      * @throws  IllegalArgumentException if {@code pipeSize <= 0}.
 124      * @since   1.6
 125      */
 126     public PipedReader(int pipeSize) {
 127         initPipe(pipeSize);
 128     }
 129 
 130     private void initPipe(int pipeSize) {
 131         if (pipeSize <= 0) {
 132             throw new IllegalArgumentException("Pipe size <= 0");
 133         }
 134         buffer = new char[pipeSize];
 135     }
 136 
 137     /**
 138      * Causes this piped reader to be connected
 139      * to the piped  writer {@code src}.
 140      * If this object is already connected to some
 141      * other piped writer, an {@code IOException}
 142      * is thrown.
 143      * <p>
 144      * If {@code src} is an
 145      * unconnected piped writer and {@code snk}
 146      * is an unconnected piped reader, they
 147      * may be connected by either the call:
 148      *
 149      * <pre>{@code snk.connect(src)} </pre>
 150      * <p>
 151      * or the call:
 152      *
 153      * <pre>{@code src.connect(snk)} </pre>
 154      * <p>
 155      * The two calls have the same effect.
 156      *
 157      * @param      src   The piped writer to connect to.
 158      * @throws     IOException  if an I/O error occurs.
 159      */
 160     public void connect(PipedWriter src) throws IOException {
 161         src.connect(this);
 162     }
 163 
 164     /**
 165      * Receives a char of data. This method will block if no input is
 166      * available.
 167      */
 168     synchronized void receive(int c) throws IOException {
 169         if (!connected) {
 170             throw new IOException("Pipe not connected");
 171         } else if (closedByWriter || closedByReader) {
 172             throw new IOException("Pipe closed");
 173         } else if (readSide != null && !readSide.isAlive()) {
 174             throw new IOException("Read end dead");
 175         }
 176 
 177         writeSide = Thread.currentThread();
 178         while (in == out) {
 179             if ((readSide != null) && !readSide.isAlive()) {
 180                 throw new IOException("Pipe broken");
 181             }
 182             /* full: kick any waiting readers */
 183             notifyAll();
 184             try {
 185                 wait(1000);
 186             } catch (InterruptedException ex) {
 187                 throw new java.io.InterruptedIOException();
 188             }
 189         }
 190         if (in < 0) {
 191             in = 0;
 192             out = 0;
 193         }
 194         buffer[in++] = (char) c;
 195         if (in >= buffer.length) {
 196             in = 0;
 197         }
 198     }
 199 
 200     /**
 201      * Receives data into an array of characters.  This method will
 202      * block until some input is available.
 203      */
 204     synchronized void receive(char c[], int off, int len)  throws IOException {
 205         while (--len >= 0) {
 206             receive(c[off++]);
 207         }
 208     }
 209 
 210     /**
 211      * Notifies all waiting threads that the last character of data has been
 212      * received.
 213      */
 214     synchronized void receivedLast() {
 215         closedByWriter = true;
 216         notifyAll();
 217     }
 218 
 219     /**
 220      * Reads the next character of data from this piped stream.
 221      * If no character is available because the end of the stream
 222      * has been reached, the value {@code -1} is returned.
 223      * This method blocks until input data is available, the end of
 224      * the stream is detected, or an exception is thrown.
 225      *
 226      * @return  the next character of data, or {@code -1} if the end of the
 227      *          stream is reached.
 228      * @throws  IOException  if the pipe is
 229      *          <a href=PipedInputStream.html#BROKEN> {@code broken}</a>,
 230      *          {@link #connect(java.io.PipedWriter) unconnected}, closed,
 231      *          or an I/O error occurs.
 232      */
 233     public synchronized int read()  throws IOException {
 234         if (!connected) {
 235             throw new IOException("Pipe not connected");
 236         } else if (closedByReader) {
 237             throw new IOException("Pipe closed");
 238         } else if (writeSide != null && !writeSide.isAlive()
 239                    && !closedByWriter && (in < 0)) {
 240             throw new IOException("Write end dead");
 241         }
 242 
 243         readSide = Thread.currentThread();
 244         int trials = 2;
 245         while (in < 0) {
 246             if (closedByWriter) {
 247                 /* closed by writer, return EOF */
 248                 return -1;
 249             }
 250             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
 251                 throw new IOException("Pipe broken");
 252             }
 253             /* might be a writer waiting */
 254             notifyAll();
 255             try {
 256                 wait(1000);
 257             } catch (InterruptedException ex) {
 258                 throw new java.io.InterruptedIOException();
 259             }
 260         }
 261         int ret = buffer[out++];
 262         if (out >= buffer.length) {
 263             out = 0;
 264         }
 265         if (in == out) {
 266             /* now empty */
 267             in = -1;
 268         }
 269         return ret;
 270     }
 271 
 272     /**
 273      * Reads up to {@code len} characters of data from this piped
 274      * stream into an array of characters. Less than {@code len} characters
 275      * will be read if the end of the data stream is reached or if
 276      * {@code len} exceeds the pipe's buffer size. This method
 277      * blocks until at least one character of input is available.
 278      *
 279      * @param      cbuf     the buffer into which the data is read.
 280      * @param      off   the start offset of the data.
 281      * @param      len   the maximum number of characters read.
 282      * @return     the total number of characters read into the buffer, or
 283      *             {@code -1} if there is no more data because the end of
 284      *             the stream has been reached.
 285      * @throws     IOException  if the pipe is
 286      *             <a href=PipedInputStream.html#BROKEN> {@code broken}</a>,
 287      *             {@link #connect(java.io.PipedWriter) unconnected}, closed,
 288      *             or an I/O error occurs.
 289      * @throws     IndexOutOfBoundsException {@inheritDoc}
 290      */
 291     public synchronized int read(char cbuf[], int off, int len)  throws IOException {
 292         if (!connected) {
 293             throw new IOException("Pipe not connected");
 294         } else if (closedByReader) {
 295             throw new IOException("Pipe closed");
 296         } else if (writeSide != null && !writeSide.isAlive()
 297                    && !closedByWriter && (in < 0)) {
 298             throw new IOException("Write end dead");
 299         }
 300 
 301         if ((off < 0) || (off > cbuf.length) || (len < 0) ||
 302             ((off + len) > cbuf.length) || ((off + len) < 0)) {
 303             throw new IndexOutOfBoundsException();
 304         } else if (len == 0) {
 305             return 0;
 306         }
 307 
 308         /* possibly wait on the first character */
 309         int c = read();
 310         if (c < 0) {
 311             return -1;
 312         }
 313         cbuf[off] =  (char)c;
 314         int rlen = 1;
 315         while ((in >= 0) && (--len > 0)) {
 316             cbuf[off + rlen] = buffer[out++];
 317             rlen++;
 318             if (out >= buffer.length) {
 319                 out = 0;
 320             }
 321             if (in == out) {
 322                 /* now empty */
 323                 in = -1;
 324             }
 325         }
 326         return rlen;
 327     }
 328 
 329     /**
 330      * Tell whether this stream is ready to be read.  A piped character
 331      * stream is ready if the circular buffer is not empty.
 332      *
 333      * @throws     IOException  if the pipe is
 334      *             <a href=PipedInputStream.html#BROKEN> {@code broken}</a>,
 335      *             {@link #connect(java.io.PipedWriter) unconnected}, or closed.
 336      */
 337     public synchronized boolean ready() throws IOException {
 338         if (!connected) {
 339             throw new IOException("Pipe not connected");
 340         } else if (closedByReader) {
 341             throw new IOException("Pipe closed");
 342         } else if (writeSide != null && !writeSide.isAlive()
 343                    && !closedByWriter && (in < 0)) {
 344             throw new IOException("Write end dead");
 345         }
 346         if (in < 0) {
 347             return false;
 348         } else {
 349             return true;
 350         }
 351     }
 352 
 353     /**
 354      * Closes this piped stream and releases any system resources
 355      * associated with the stream.
 356      *
 357      * @throws     IOException  if an I/O error occurs.
 358      */
 359     public void close()  throws IOException {
 360         in = -1;
 361         closedByReader = true;
 362     }
 363 }