1 /* 2 * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.io; 27 28 /** 29 * A piped input stream should be connected 30 * to a piped output stream; the piped input 31 * stream then provides whatever data bytes 32 * are written to the piped output stream. 33 * Typically, data is read from a <code>PipedInputStream</code> 34 * object by one thread and data is written 35 * to the corresponding <code>PipedOutputStream</code> 36 * by some other thread. Attempting to use 37 * both objects from a single thread is not 38 * recommended, as it may deadlock the thread. 39 * The piped input stream contains a buffer, 40 * decoupling read operations from write operations, 41 * within limits. 42 * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a 43 * thread that was providing data bytes to the connected 44 * piped output stream is no longer alive. 45 * 46 * @author James Gosling 47 * @see java.io.PipedOutputStream 48 * @since JDK1.0 49 */ 50 public class PipedInputStream extends InputStream { 51 boolean closedByWriter = false; 52 volatile boolean closedByReader = false; 53 boolean connected = false; 54 55 /* REMIND: identification of the read and write sides needs to be 56 more sophisticated. Either using thread groups (but what about 57 pipes within a thread?) or using finalization (but it may be a 58 long time until the next GC). */ 59 Thread readSide; 60 Thread writeSide; 61 62 private static final int DEFAULT_PIPE_SIZE = 1024; 63 64 /** 65 * The default size of the pipe's circular input buffer. 66 * @since JDK1.1 67 */ 68 // This used to be a constant before the pipe size was allowed 69 // to change. This field will continue to be maintained 70 // for backward compatibility. 71 protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; 72 73 /** 74 * The circular buffer into which incoming data is placed. 75 * @since JDK1.1 76 */ 77 protected byte buffer[]; 78 79 /** 80 * The index of the position in the circular buffer at which the 81 * next byte of data will be stored when received from the connected 82 * piped output stream. <code>in<0</code> implies the buffer is empty, 83 * <code>in==out</code> implies the buffer is full 84 * @since JDK1.1 85 */ 86 protected int in = -1; 87 88 /** 89 * The index of the position in the circular buffer at which the next 90 * byte of data will be read by this piped input stream. 91 * @since JDK1.1 92 */ 93 protected int out = 0; 94 95 /** 96 * Creates a <code>PipedInputStream</code> so 97 * that it is connected to the piped output 98 * stream <code>src</code>. Data bytes written 99 * to <code>src</code> will then be available 100 * as input from this stream. 101 * 102 * @param src the stream to connect to. 103 * @exception IOException if an I/O error occurs. 104 */ 105 public PipedInputStream(PipedOutputStream src) throws IOException { 106 this(src, DEFAULT_PIPE_SIZE); 107 } 108 109 /** 110 * Creates a <code>PipedInputStream</code> so that it is 111 * connected to the piped output stream 112 * <code>src</code> and uses the specified pipe size for 113 * the pipe's buffer. 114 * Data bytes written to <code>src</code> will then 115 * be available as input from this stream. 116 * 117 * @param src the stream to connect to. 118 * @param pipeSize the size of the pipe's buffer. 119 * @exception IOException if an I/O error occurs. 120 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>. 121 * @since 1.6 122 */ 123 public PipedInputStream(PipedOutputStream src, int pipeSize) 124 throws IOException { 125 initPipe(pipeSize); 126 connect(src); 127 } 128 129 /** 130 * Creates a <code>PipedInputStream</code> so 131 * that it is not yet {@linkplain #connect(java.io.PipedOutputStream) 132 * connected}. 133 * It must be {@linkplain java.io.PipedOutputStream#connect( 134 * java.io.PipedInputStream) connected} to a 135 * <code>PipedOutputStream</code> before being used. 136 */ 137 public PipedInputStream() { 138 initPipe(DEFAULT_PIPE_SIZE); 139 } 140 141 /** 142 * Creates a <code>PipedInputStream</code> so that it is not yet 143 * {@linkplain #connect(java.io.PipedOutputStream) connected} and 144 * uses the specified pipe size for the pipe's buffer. 145 * It must be {@linkplain java.io.PipedOutputStream#connect( 146 * java.io.PipedInputStream) 147 * connected} to a <code>PipedOutputStream</code> before being used. 148 * 149 * @param pipeSize the size of the pipe's buffer. 150 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>. 151 * @since 1.6 152 */ 153 public PipedInputStream(int pipeSize) { 154 initPipe(pipeSize); 155 } 156 157 private void initPipe(int pipeSize) { 158 if (pipeSize <= 0) { 159 throw new IllegalArgumentException("Pipe Size <= 0"); 160 } 161 buffer = new byte[pipeSize]; 162 } 163 164 /** 165 * Causes this piped input stream to be connected 166 * to the piped output stream <code>src</code>. 167 * If this object is already connected to some 168 * other piped output stream, an <code>IOException</code> 169 * is thrown. 170 * <p> 171 * If <code>src</code> is an 172 * unconnected piped output stream and <code>snk</code> 173 * is an unconnected piped input stream, they 174 * may be connected by either the call: 175 * <p> 176 * <pre><code>snk.connect(src)</code> </pre> 177 * <p> 178 * or the call: 179 * <p> 180 * <pre><code>src.connect(snk)</code> </pre> 181 * <p> 182 * The two 183 * calls have the same effect. 184 * 185 * @param src The piped output stream to connect to. 186 * @exception IOException if an I/O error occurs. 187 */ 188 public void connect(PipedOutputStream src) throws IOException { 189 src.connect(this); 190 } 191 192 /** 193 * Receives a byte of data. This method will block if no input is 194 * available. 195 * @param b the byte being received 196 * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>, 197 * {@link #connect(java.io.PipedOutputStream) unconnected}, 198 * closed, or if an I/O error occurs. 199 * @since JDK1.1 200 */ 201 protected synchronized void receive(int b) throws IOException { 202 checkStateForReceive(); 203 writeSide = Thread.currentThread(); 204 if (in == out) 205 awaitSpace(); 206 if (in < 0) { 207 in = 0; 208 out = 0; 209 } 210 buffer[in++] = (byte)(b & 0xFF); 211 if (in >= buffer.length) { 212 in = 0; 213 } 214 } 215 216 /** 217 * Receives data into an array of bytes. This method will 218 * block until some input is available. 219 * @param b the buffer into which the data is received 220 * @param off the start offset of the data 221 * @param len the maximum number of bytes received 222 * @exception IOException If the pipe is <a href=#BROKEN> broken</a>, 223 * {@link #connect(java.io.PipedOutputStream) unconnected}, 224 * closed,or if an I/O error occurs. 225 */ 226 synchronized void receive(byte b[], int off, int len) throws IOException { 227 checkStateForReceive(); 228 writeSide = Thread.currentThread(); 229 int bytesToTransfer = len; 230 while (bytesToTransfer > 0) { 231 if (in == out) 232 awaitSpace(); 233 int nextTransferAmount = 0; 234 if (out < in) { 235 nextTransferAmount = buffer.length - in; 236 } else if (in < out) { 237 if (in == -1) { 238 in = out = 0; 239 nextTransferAmount = buffer.length - in; 240 } else { 241 nextTransferAmount = out - in; 242 } 243 } 244 if (nextTransferAmount > bytesToTransfer) 245 nextTransferAmount = bytesToTransfer; 246 assert(nextTransferAmount > 0); 247 System.arraycopy(b, off, buffer, in, nextTransferAmount); 248 bytesToTransfer -= nextTransferAmount; 249 off += nextTransferAmount; 250 in += nextTransferAmount; 251 if (in >= buffer.length) { 252 in = 0; 253 } 254 } 255 } 256 257 private void checkStateForReceive() throws IOException { 258 if (!connected) { 259 throw new IOException("Pipe not connected"); 260 } else if (closedByWriter || closedByReader) { 261 throw new IOException("Pipe closed"); 262 } else if (readSide != null && !readSide.isAlive()) { 263 throw new IOException("Read end dead"); 264 } 265 } 266 267 private void awaitSpace() throws IOException { 268 while (in == out) { 269 checkStateForReceive(); 270 271 /* full: kick any waiting readers */ 272 notifyAll(); 273 try { 274 wait(1000); 275 } catch (InterruptedException ex) { 276 throw new java.io.InterruptedIOException(); 277 } 278 } 279 } 280 281 /** 282 * Notifies all waiting threads that the last byte of data has been 283 * received. 284 */ 285 synchronized void receivedLast() { 286 closedByWriter = true; 287 notifyAll(); 288 } 289 290 /** 291 * Reads the next byte of data from this piped input stream. The 292 * value byte is returned as an <code>int</code> in the range 293 * <code>0</code> to <code>255</code>. 294 * This method blocks until input data is available, the end of the 295 * stream is detected, or an exception is thrown. 296 * 297 * @return the next byte of data, or <code>-1</code> if the end of the 298 * stream is reached. 299 * @exception IOException if the pipe is 300 * {@link #connect(java.io.PipedOutputStream) unconnected}, 301 * <a href=#BROKEN> <code>broken</code></a>, closed, 302 * or if an I/O error occurs. 303 */ 304 public synchronized int read() throws IOException { 305 if (!connected) { 306 throw new IOException("Pipe not connected"); 307 } else if (closedByReader) { 308 throw new IOException("Pipe closed"); 309 } else if (writeSide != null && !writeSide.isAlive() 310 && !closedByWriter && (in < 0)) { 311 throw new IOException("Write end dead"); 312 } 313 314 readSide = Thread.currentThread(); 315 int trials = 2; 316 while (in < 0) { 317 if (closedByWriter) { 318 /* closed by writer, return EOF */ 319 return -1; 320 } 321 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { 322 throw new IOException("Pipe broken"); 323 } 324 /* might be a writer waiting */ 325 notifyAll(); 326 try { 327 wait(1000); 328 } catch (InterruptedException ex) { 329 throw new java.io.InterruptedIOException(); 330 } 331 } 332 int ret = buffer[out++] & 0xFF; 333 if (out >= buffer.length) { 334 out = 0; 335 } 336 if (in == out) { 337 /* now empty */ 338 in = -1; 339 } 340 341 return ret; 342 } 343 344 /** 345 * Reads up to <code>len</code> bytes of data from this piped input 346 * stream into an array of bytes. Less than <code>len</code> bytes 347 * will be read if the end of the data stream is reached or if 348 * <code>len</code> exceeds the pipe's buffer size. 349 * If <code>len </code> is zero, then no bytes are read and 0 is returned; 350 * otherwise, the method blocks until at least 1 byte of input is 351 * available, end of the stream has been detected, or an exception is 352 * thrown. 353 * 354 * @param b the buffer into which the data is read. 355 * @param off the start offset in the destination array <code>b</code> 356 * @param len the maximum number of bytes read. 357 * @return the total number of bytes read into the buffer, or 358 * <code>-1</code> if there is no more data because the end of 359 * the stream has been reached. 360 * @exception NullPointerException If <code>b</code> is <code>null</code>. 361 * @exception IndexOutOfBoundsException If <code>off</code> is negative, 362 * <code>len</code> is negative, or <code>len</code> is greater than 363 * <code>b.length - off</code> 364 * @exception IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>, 365 * {@link #connect(java.io.PipedOutputStream) unconnected}, 366 * closed, or if an I/O error occurs. 367 */ 368 public synchronized int read(byte b[], int off, int len) throws IOException { 369 if (b == null) { 370 throw new NullPointerException(); 371 } else if (off < 0 || len < 0 || len > b.length - off) { 372 throw new IndexOutOfBoundsException(); 373 } else if (len == 0) { 374 return 0; 375 } 376 377 /* possibly wait on the first character */ 378 int c = read(); 379 if (c < 0) { 380 return -1; 381 } 382 b[off] = (byte) c; 383 int rlen = 1; 384 while ((in >= 0) && (len > 1)) { 385 386 int available; 387 388 if (in > out) { 389 available = Math.min((buffer.length - out), (in - out)); 390 } else { 391 available = buffer.length - out; 392 } 393 394 // A byte is read beforehand outside the loop 395 if (available > (len - 1)) { 396 available = len - 1; 397 } 398 System.arraycopy(buffer, out, b, off + rlen, available); 399 out += available; 400 rlen += available; 401 len -= available; 402 403 if (out >= buffer.length) { 404 out = 0; 405 } 406 if (in == out) { 407 /* now empty */ 408 in = -1; 409 } 410 } 411 return rlen; 412 } 413 414 /** 415 * Returns the number of bytes that can be read from this input 416 * stream without blocking. 417 * 418 * @return the number of bytes that can be read from this input stream 419 * without blocking, or {@code 0} if this input stream has been 420 * closed by invoking its {@link #close()} method, or if the pipe 421 * is {@link #connect(java.io.PipedOutputStream) unconnected}, or 422 * <a href=#BROKEN> <code>broken</code></a>. 423 * 424 * @exception IOException if an I/O error occurs. 425 * @since JDK1.0.2 426 */ 427 public synchronized int available() throws IOException { 428 if(in < 0) 429 return 0; 430 else if(in == out) 431 return buffer.length; 432 else if (in > out) 433 return in - out; 434 else 435 return in + buffer.length - out; 436 } 437 438 /** 439 * Closes this piped input stream and releases any system resources 440 * associated with the stream. 441 * 442 * @exception IOException if an I/O error occurs. 443 */ 444 public void close() throws IOException { 445 closedByReader = true; 446 synchronized (this) { 447 in = -1; 448 } 449 } 450 }