1 /* 2 * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.io; 27 28 /** 29 * A piped input stream should be connected 30 * to a piped output stream; the piped input 31 * stream then provides whatever data bytes 32 * are written to the piped output stream. 33 * Typically, data is read from a <code>PipedInputStream</code> 34 * object by one thread and data is written 35 * to the corresponding <code>PipedOutputStream</code> 36 * by some other thread. Attempting to use 37 * both objects from a single thread is not 38 * recommended, as it may deadlock the thread. 39 * The piped input stream contains a buffer, 40 * decoupling read operations from write operations, 41 * within limits. 42 * A pipe is said to be <a id="BROKEN"> <i>broken</i> </a> if a 43 * thread that was providing data bytes to the connected 44 * piped output stream is no longer alive. 45 * 46 * @author James Gosling 47 * @see java.io.PipedOutputStream 48 * @since 1.0 49 */ 50 public class PipedInputStream extends InputStream { 51 boolean closedByWriter; 52 volatile boolean closedByReader; 53 boolean connected; 54 55 /* REMIND: identification of the read and write sides needs to be 56 more sophisticated. Either using thread groups (but what about 57 pipes within a thread?) or using finalization (but it may be a 58 long time until the next GC). */ 59 Thread readSide; 60 Thread writeSide; 61 62 private static final int DEFAULT_PIPE_SIZE = 1024; 63 64 /** 65 * The default size of the pipe's circular input buffer. 66 * @since 1.1 67 */ 68 // This used to be a constant before the pipe size was allowed 69 // to change. This field will continue to be maintained 70 // for backward compatibility. 71 protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; 72 73 /** 74 * The circular buffer into which incoming data is placed. 75 * @since 1.1 76 */ 77 protected byte buffer[]; 78 79 /** 80 * The index of the position in the circular buffer at which the 81 * next byte of data will be stored when received from the connected 82 * piped output stream. <code>in<0</code> implies the buffer is empty, 83 * <code>in==out</code> implies the buffer is full 84 * @since 1.1 85 */ 86 protected int in = -1; 87 88 /** 89 * The index of the position in the circular buffer at which the next 90 * byte of data will be read by this piped input stream. 91 * @since 1.1 92 */ 93 protected int out = 0; 94 95 /** 96 * Creates a <code>PipedInputStream</code> so 97 * that it is connected to the piped output 98 * stream <code>src</code>. Data bytes written 99 * to <code>src</code> will then be available 100 * as input from this stream. 101 * 102 * @param src the stream to connect to. 103 * @exception IOException if an I/O error occurs. 104 */ 105 public PipedInputStream(PipedOutputStream src) throws IOException { 106 this(src, DEFAULT_PIPE_SIZE); 107 } 108 109 /** 110 * Creates a <code>PipedInputStream</code> so that it is 111 * connected to the piped output stream 112 * <code>src</code> and uses the specified pipe size for 113 * the pipe's buffer. 114 * Data bytes written to <code>src</code> will then 115 * be available as input from this stream. 116 * 117 * @param src the stream to connect to. 118 * @param pipeSize the size of the pipe's buffer. 119 * @exception IOException if an I/O error occurs. 120 * @exception IllegalArgumentException if {@code pipeSize <= 0}. 121 * @since 1.6 122 */ 123 public PipedInputStream(PipedOutputStream src, int pipeSize) 124 throws IOException { 125 initPipe(pipeSize); 126 connect(src); 127 } 128 129 /** 130 * Creates a <code>PipedInputStream</code> so 131 * that it is not yet {@linkplain #connect(java.io.PipedOutputStream) 132 * connected}. 133 * It must be {@linkplain java.io.PipedOutputStream#connect( 134 * java.io.PipedInputStream) connected} to a 135 * <code>PipedOutputStream</code> before being used. 136 */ 137 public PipedInputStream() { 138 initPipe(DEFAULT_PIPE_SIZE); 139 } 140 141 /** 142 * Creates a <code>PipedInputStream</code> so that it is not yet 143 * {@linkplain #connect(java.io.PipedOutputStream) connected} and 144 * uses the specified pipe size for the pipe's buffer. 145 * It must be {@linkplain java.io.PipedOutputStream#connect( 146 * java.io.PipedInputStream) 147 * connected} to a <code>PipedOutputStream</code> before being used. 148 * 149 * @param pipeSize the size of the pipe's buffer. 150 * @exception IllegalArgumentException if {@code pipeSize <= 0}. 151 * @since 1.6 152 */ 153 public PipedInputStream(int pipeSize) { 154 initPipe(pipeSize); 155 } 156 157 private void initPipe(int pipeSize) { 158 if (pipeSize <= 0) { 159 throw new IllegalArgumentException("Pipe Size <= 0"); 160 } 161 buffer = new byte[pipeSize]; 162 } 163 164 /** 165 * Causes this piped input stream to be connected 166 * to the piped output stream <code>src</code>. 167 * If this object is already connected to some 168 * other piped output stream, an <code>IOException</code> 169 * is thrown. 170 * <p> 171 * If <code>src</code> is an 172 * unconnected piped output stream and <code>snk</code> 173 * is an unconnected piped input stream, they 174 * may be connected by either the call: 175 * 176 * <pre><code>snk.connect(src)</code> </pre> 177 * <p> 178 * or the call: 179 * 180 * <pre><code>src.connect(snk)</code> </pre> 181 * <p> 182 * The two calls have the same effect. 183 * 184 * @param src The piped output stream to connect to. 185 * @exception IOException if an I/O error occurs. 186 */ 187 public void connect(PipedOutputStream src) throws IOException { 188 src.connect(this); 189 } 190 191 /** 192 * Receives a byte of data. This method will block if no input is 193 * available. 194 * @param b the byte being received 195 * @exception IOException If the pipe is <a href="#BROKEN"> <code>broken</code></a>, 196 * {@link #connect(java.io.PipedOutputStream) unconnected}, 197 * closed, or if an I/O error occurs. 198 * @since 1.1 199 */ 200 protected synchronized void receive(int b) throws IOException { 201 checkStateForReceive(); 202 writeSide = Thread.currentThread(); 203 if (in == out) 204 awaitSpace(); 205 if (in < 0) { 206 in = 0; 207 out = 0; 208 } 209 buffer[in++] = (byte)(b & 0xFF); 210 if (in >= buffer.length) { 211 in = 0; 212 } 213 } 214 215 /** 216 * Receives data into an array of bytes. This method will 217 * block until some input is available. 218 * @param b the buffer into which the data is received 219 * @param off the start offset of the data 220 * @param len the maximum number of bytes received 221 * @exception IOException If the pipe is <a href="#BROKEN"> broken</a>, 222 * {@link #connect(java.io.PipedOutputStream) unconnected}, 223 * closed,or if an I/O error occurs. 224 */ 225 synchronized void receive(byte b[], int off, int len) throws IOException { 226 checkStateForReceive(); 227 writeSide = Thread.currentThread(); 228 int bytesToTransfer = len; 229 while (bytesToTransfer > 0) { 230 if (in == out) 231 awaitSpace(); 232 int nextTransferAmount = 0; 233 if (out < in) { 234 nextTransferAmount = buffer.length - in; 235 } else if (in < out) { 236 if (in == -1) { 237 in = out = 0; 238 nextTransferAmount = buffer.length - in; 239 } else { 240 nextTransferAmount = out - in; 241 } 242 } 243 if (nextTransferAmount > bytesToTransfer) 244 nextTransferAmount = bytesToTransfer; 245 assert(nextTransferAmount > 0); 246 System.arraycopy(b, off, buffer, in, nextTransferAmount); 247 bytesToTransfer -= nextTransferAmount; 248 off += nextTransferAmount; 249 in += nextTransferAmount; 250 if (in >= buffer.length) { 251 in = 0; 252 } 253 } 254 } 255 256 private void checkStateForReceive() throws IOException { 257 if (!connected) { 258 throw new IOException("Pipe not connected"); 259 } else if (closedByWriter || closedByReader) { 260 throw new IOException("Pipe closed"); 261 } else if (readSide != null && !readSide.isAlive()) { 262 throw new IOException("Read end dead"); 263 } 264 } 265 266 private void awaitSpace() throws IOException { 267 while (in == out) { 268 checkStateForReceive(); 269 270 /* full: kick any waiting readers */ 271 notifyAll(); 272 try { 273 wait(1000); 274 } catch (InterruptedException ex) { 275 throw new java.io.InterruptedIOException(); 276 } 277 } 278 } 279 280 /** 281 * Notifies all waiting threads that the last byte of data has been 282 * received. 283 */ 284 synchronized void receivedLast() { 285 closedByWriter = true; 286 notifyAll(); 287 } 288 289 /** 290 * Reads the next byte of data from this piped input stream. The 291 * value byte is returned as an <code>int</code> in the range 292 * <code>0</code> to <code>255</code>. 293 * This method blocks until input data is available, the end of the 294 * stream is detected, or an exception is thrown. 295 * 296 * @return the next byte of data, or <code>-1</code> if the end of the 297 * stream is reached. 298 * @exception IOException if the pipe is 299 * {@link #connect(java.io.PipedOutputStream) unconnected}, 300 * <a href="#BROKEN"> <code>broken</code></a>, closed, 301 * or if an I/O error occurs. 302 */ 303 public synchronized int read() throws IOException { 304 if (!connected) { 305 throw new IOException("Pipe not connected"); 306 } else if (closedByReader) { 307 throw new IOException("Pipe closed"); 308 } else if (writeSide != null && !writeSide.isAlive() 309 && !closedByWriter && (in < 0)) { 310 throw new IOException("Write end dead"); 311 } 312 313 readSide = Thread.currentThread(); 314 int trials = 2; 315 while (in < 0) { 316 if (closedByWriter) { 317 /* closed by writer, return EOF */ 318 return -1; 319 } 320 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { 321 throw new IOException("Pipe broken"); 322 } 323 /* might be a writer waiting */ 324 notifyAll(); 325 try { 326 wait(1000); 327 } catch (InterruptedException ex) { 328 throw new java.io.InterruptedIOException(); 329 } 330 } 331 int ret = buffer[out++] & 0xFF; 332 if (out >= buffer.length) { 333 out = 0; 334 } 335 if (in == out) { 336 /* now empty */ 337 in = -1; 338 } 339 340 return ret; 341 } 342 343 /** 344 * Reads up to <code>len</code> bytes of data from this piped input 345 * stream into an array of bytes. Less than <code>len</code> bytes 346 * will be read if the end of the data stream is reached or if 347 * <code>len</code> exceeds the pipe's buffer size. 348 * If <code>len </code> is zero, then no bytes are read and 0 is returned; 349 * otherwise, the method blocks until at least 1 byte of input is 350 * available, end of the stream has been detected, or an exception is 351 * thrown. 352 * 353 * @param b the buffer into which the data is read. 354 * @param off the start offset in the destination array <code>b</code> 355 * @param len the maximum number of bytes read. 356 * @return the total number of bytes read into the buffer, or 357 * <code>-1</code> if there is no more data because the end of 358 * the stream has been reached. 359 * @exception NullPointerException If <code>b</code> is <code>null</code>. 360 * @exception IndexOutOfBoundsException If <code>off</code> is negative, 361 * <code>len</code> is negative, or <code>len</code> is greater than 362 * <code>b.length - off</code> 363 * @exception IOException if the pipe is <a href="#BROKEN"> <code>broken</code></a>, 364 * {@link #connect(java.io.PipedOutputStream) unconnected}, 365 * closed, or if an I/O error occurs. 366 */ 367 public synchronized int read(byte b[], int off, int len) throws IOException { 368 if (b == null) { 369 throw new NullPointerException(); 370 } else if (off < 0 || len < 0 || len > b.length - off) { 371 throw new IndexOutOfBoundsException(); 372 } else if (len == 0) { 373 return 0; 374 } 375 376 /* possibly wait on the first character */ 377 int c = read(); 378 if (c < 0) { 379 return -1; 380 } 381 b[off] = (byte) c; 382 int rlen = 1; 383 while ((in >= 0) && (len > 1)) { 384 385 int available; 386 387 if (in > out) { 388 available = Math.min((buffer.length - out), (in - out)); 389 } else { 390 available = buffer.length - out; 391 } 392 393 // A byte is read beforehand outside the loop 394 if (available > (len - 1)) { 395 available = len - 1; 396 } 397 System.arraycopy(buffer, out, b, off + rlen, available); 398 out += available; 399 rlen += available; 400 len -= available; 401 402 if (out >= buffer.length) { 403 out = 0; 404 } 405 if (in == out) { 406 /* now empty */ 407 in = -1; 408 } 409 } 410 return rlen; 411 } 412 413 /** 414 * Returns the number of bytes that can be read from this input 415 * stream without blocking. 416 * 417 * @return the number of bytes that can be read from this input stream 418 * without blocking, or {@code 0} if this input stream has been 419 * closed by invoking its {@link #close()} method, or if the pipe 420 * is {@link #connect(java.io.PipedOutputStream) unconnected}, or 421 * <a href="#BROKEN"> <code>broken</code></a>. 422 * 423 * @exception IOException if an I/O error occurs. 424 * @since 1.0.2 425 */ 426 public synchronized int available() throws IOException { 427 if(in < 0) 428 return 0; 429 else if(in == out) 430 return buffer.length; 431 else if (in > out) 432 return in - out; 433 else 434 return in + buffer.length - out; 435 } 436 437 /** 438 * Closes this piped input stream and releases any system resources 439 * associated with the stream. 440 * 441 * @exception IOException if an I/O error occurs. 442 */ 443 public void close() throws IOException { 444 closedByReader = true; 445 synchronized (this) { 446 in = -1; 447 } 448 } 449 }