--- /dev/null 2015-04-26 06:51:08.003313989 -0700 +++ new/jdk/src/jdk.internal.le/share/classes/jdk/internal/jline/internal/NonBlockingInputStream.java 2015-06-25 08:33:18.294426807 -0700 @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2002-2012, the original author or authors. + * + * This software is distributable under the BSD license. See the terms of the + * BSD license in the documentation provided with this software. + * + * http://www.opensource.org/licenses/bsd-license.php + */ +package jdk.internal.jline.internal; + +import java.io.IOException; +import java.io.InputStream; + +/** + * This class wraps a regular input stream and allows it to appear as if it + * is non-blocking; that is, reads can be performed against it that timeout + * if no data is seen for a period of time. This effect is achieved by having + * a separate thread perform all non-blocking read requests and then + * waiting on the thread to complete. + * + *

VERY IMPORTANT NOTES + *

+ * @since 2.7 + * @author Scott C. Gray + */ +public class NonBlockingInputStream + extends InputStream + implements Runnable +{ + private InputStream in; // The actual input stream + private int ch = -2; // Recently read character + + private boolean threadIsReading = false; + private boolean isShutdown = false; + private IOException exception = null; + private boolean nonBlockingEnabled; + + /** + * Creates a NonBlockingInputStream out of a normal blocking + * stream. Note that this call also spawn a separate thread to perform the + * blocking I/O on behalf of the thread that is using this class. The + * {@link #shutdown()} method must be called in order to shut this thread down. + * @param in The input stream to wrap + * @param isNonBlockingEnabled If true, then the non-blocking methods + * {@link #read(long)} and {@link #peek(long)} will be available and, + * more importantly, the thread will be started to provide support for the + * feature. If false, then this class acts as a clean-passthru for the + * underlying I/O stream and provides very little overhead. + */ + public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) { + this.in = in; + this.nonBlockingEnabled = isNonBlockingEnabled; + + if (isNonBlockingEnabled) { + Thread t = new Thread(this); + t.setName("NonBlockingInputStreamThread"); + t.setDaemon(true); + t.start(); + } + } + + /** + * Shuts down the thread that is handling blocking I/O. Note that if the + * thread is currently blocked waiting for I/O it will not actually + * shut down until the I/O is received. Shutting down the I/O thread + * does not prevent this class from being used, but causes the + * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()} + * to return false. + */ + public synchronized void shutdown() { + if (!isShutdown && nonBlockingEnabled) { + isShutdown = true; + notify(); + } + } + + /** + * Non-blocking is considered enabled if the feature is enabled and the + * I/O thread has not been shut down. + * @return true if non-blocking mode is enabled. + */ + public boolean isNonBlockingEnabled() { + return nonBlockingEnabled && !isShutdown; + } + + @Override + public void close() throws IOException { + /* + * The underlying input stream is closed first. This means that if the + * I/O thread was blocked waiting on input, it will be woken for us. + */ + in.close(); + shutdown(); + } + + @Override + public int read() throws IOException { + if (nonBlockingEnabled) + return read(0L, false); + return in.read (); + } + + /** + * Peeks to see if there is a byte waiting in the input stream without + * actually consuming the byte. + * + * @param timeout The amount of time to wait, 0 == forever + * @return -1 on eof, -2 if the timeout expired with no available input + * or the character that was read (without consuming it). + * @throws IOException + */ + public int peek(long timeout) throws IOException { + if (!nonBlockingEnabled || isShutdown) { + throw new UnsupportedOperationException ("peek() " + + "cannot be called as non-blocking operation is disabled"); + } + return read(timeout, true); + } + + /** + * Attempts to read a character from the input stream for a specific + * period of time. + * @param timeout The amount of time to wait for the character + * @return The character read, -1 if EOF is reached, or -2 if the + * read timed out. + * @throws IOException + */ + public int read(long timeout) throws IOException { + if (!nonBlockingEnabled || isShutdown) { + throw new UnsupportedOperationException ("read() with timeout " + + "cannot be called as non-blocking operation is disabled"); + } + return read(timeout, false); + } + + /** + * Attempts to read a character from the input stream for a specific + * period of time. + * @param timeout The amount of time to wait for the character + * @return The character read, -1 if EOF is reached, or -2 if the + * read timed out. + * @throws IOException + */ + private synchronized int read(long timeout, boolean isPeek) throws IOException { + /* + * If the thread hit an IOException, we report it. + */ + if (exception != null) { + assert ch == -2; + IOException toBeThrown = exception; + if (!isPeek) + exception = null; + throw toBeThrown; + } + + /* + * If there was a pending character from the thread, then + * we send it. If the timeout is 0L or the thread was shut down + * then do a local read. + */ + if (ch >= -1) { + assert exception == null; + } + else if ((timeout == 0L || isShutdown) && !threadIsReading) { + ch = in.read(); + } + else { + /* + * If the thread isn't reading already, then ask it to do so. + */ + if (!threadIsReading) { + threadIsReading = true; + notify(); + } + + boolean isInfinite = (timeout <= 0L); + + /* + * So the thread is currently doing the reading for us. So + * now we play the waiting game. + */ + while (isInfinite || timeout > 0L) { + long start = System.currentTimeMillis (); + + try { + wait(timeout); + } + catch (InterruptedException e) { + /* IGNORED */ + } + + if (exception != null) { + assert ch == -2; + + IOException toBeThrown = exception; + if (!isPeek) + exception = null; + throw toBeThrown; + } + + if (ch >= -1) { + assert exception == null; + break; + } + + if (!isInfinite) { + timeout -= System.currentTimeMillis() - start; + } + } + } + + /* + * ch is the character that was just read. Either we set it because + * a local read was performed or the read thread set it (or failed to + * change it). We will return it's value, but if this was a peek + * operation, then we leave it in place. + */ + int ret = ch; + if (!isPeek) { + ch = -2; + } + return ret; + } + + /** + * This version of read() is very specific to jline's purposes, it + * will always always return a single byte at a time, rather than filling + * the entire buffer. + */ + @Override + public int read (byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int c; + if (nonBlockingEnabled) + c = this.read(0L); + else + c = in.read(); + + if (c == -1) { + return -1; + } + b[off] = (byte)c; + return 1; + } + + //@Override + public void run () { + Log.debug("NonBlockingInputStream start"); + boolean needToShutdown = false; + boolean needToRead = false; + + while (!needToShutdown) { + + /* + * Synchronize to grab variables accessed by both this thread + * and the accessing thread. + */ + synchronized (this) { + needToShutdown = this.isShutdown; + needToRead = this.threadIsReading; + + try { + /* + * Nothing to do? Then wait. + */ + if (!needToShutdown && !needToRead) { + wait(0); + } + } + catch (InterruptedException e) { + /* IGNORED */ + } + } + + /* + * We're not shutting down, but we need to read. This cannot + * happen while we are holding the lock (which we aren't now). + */ + if (!needToShutdown && needToRead) { + int charRead = -2; + IOException failure = null; + try { + charRead = in.read(); + } + catch (IOException e) { + failure = e; + } + + /* + * Re-grab the lock to update the state. + */ + synchronized (this) { + exception = failure; + ch = charRead; + threadIsReading = false; + notify(); + } + } + } + + Log.debug("NonBlockingInputStream shutdown"); + } +}