1 /*
   2  * Copyright (c) 2002-2012, the original author or authors.
   3  *
   4  * This software is distributable under the BSD license. See the terms of the
   5  * BSD license in the documentation provided with this software.
   6  *
   7  * http://www.opensource.org/licenses/bsd-license.php
   8  */
   9 package jdk.internal.jline.internal;
  10 
  11 import java.io.IOException;
  12 import java.io.InputStream;
  13 
  14 /**
  15  * This class wraps a regular input stream and allows it to appear as if it
  16  * is non-blocking; that is, reads can be performed against it that timeout
  17  * if no data is seen for a period of time.  This effect is achieved by having
  18  * a separate thread perform all non-blocking read requests and then
  19  * waiting on the thread to complete.
  20  *
  21  * <p>VERY IMPORTANT NOTES
  22  * <ul>
  23  *   <li> This class is not thread safe. It expects at most one reader.
  24  *   <li> The {@link #shutdown()} method must be called in order to shut down
  25  *          the thread that handles blocking I/O.
  26  * </ul>
  27  * @since 2.7
  28  * @author Scott C. Gray <scottgray1@gmail.com>
  29  */
  30 public class NonBlockingInputStream
  31     extends InputStream
  32     implements Runnable
  33 {
  34     private InputStream in;               // The actual input stream
  35     private int    ch   = -2;             // Recently read character
  36 
  37     private boolean     threadIsReading      = false;
  38     private boolean     isShutdown           = false;
  39     private IOException exception            = null;
  40     private boolean     nonBlockingEnabled;
  41 
  42     /**
  43      * Creates a <code>NonBlockingInputStream</code> out of a normal blocking
  44      * stream. Note that this call also spawn a separate thread to perform the
  45      * blocking I/O on behalf of the thread that is using this class. The
  46      * {@link #shutdown()} method must be called in order to shut this thread down.
  47      * @param in The input stream to wrap
  48      * @param isNonBlockingEnabled If true, then the non-blocking methods
  49      *   {@link #read(long)} and {@link #peek(long)} will be available and,
  50      *   more importantly, the thread will be started to provide support for the
  51      *   feature.  If false, then this class acts as a clean-passthru for the
  52      *   underlying I/O stream and provides very little overhead.
  53      */
  54     public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
  55         this.in                 = in;
  56         this.nonBlockingEnabled = isNonBlockingEnabled;
  57 
  58         if (isNonBlockingEnabled) {
  59             Thread t = new Thread(this);
  60             t.setName("NonBlockingInputStreamThread");
  61             t.setDaemon(true);
  62             t.start();
  63         }
  64     }
  65 
  66     /**
  67      * Shuts down the thread that is handling blocking I/O. Note that if the
  68      * thread is currently blocked waiting for I/O it will not actually
  69      * shut down until the I/O is received.  Shutting down the I/O thread
  70      * does not prevent this class from being used, but causes the
  71      * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()}
  72      * to return false.
  73      */
  74     public synchronized void shutdown() {
  75         if (!isShutdown && nonBlockingEnabled) {
  76             isShutdown = true;
  77             notify();
  78         }
  79     }
  80 
  81     /**
  82      * Non-blocking is considered enabled if the feature is enabled and the
  83      * I/O thread has not been shut down.
  84      * @return true if non-blocking mode is enabled.
  85      */
  86     public boolean isNonBlockingEnabled() {
  87         return nonBlockingEnabled && !isShutdown;
  88     }
  89 
  90     @Override
  91     public void close() throws IOException {
  92         /*
  93          * The underlying input stream is closed first. This means that if the
  94          * I/O thread was blocked waiting on input, it will be woken for us.
  95          */
  96         in.close();
  97         shutdown();
  98     }
  99 
 100     @Override
 101     public int read() throws IOException {
 102         if (nonBlockingEnabled)
 103             return read(0L, false);
 104         return in.read ();
 105     }
 106 
 107     /**
 108      * Peeks to see if there is a byte waiting in the input stream without
 109      * actually consuming the byte.
 110      *
 111      * @param timeout The amount of time to wait, 0 == forever
 112      * @return -1 on eof, -2 if the timeout expired with no available input
 113      *   or the character that was read (without consuming it).
 114      * @throws IOException
 115      */
 116     public int peek(long timeout) throws IOException {
 117         if (!nonBlockingEnabled || isShutdown) {
 118             throw new UnsupportedOperationException ("peek() "
 119                 + "cannot be called as non-blocking operation is disabled");
 120         }
 121         return read(timeout, true);
 122     }
 123 
 124     /**
 125      * Attempts to read a character from the input stream for a specific
 126      * period of time.
 127      * @param timeout The amount of time to wait for the character
 128      * @return The character read, -1 if EOF is reached, or -2 if the
 129      *   read timed out.
 130      * @throws IOException
 131      */
 132     public int read(long timeout) throws IOException {
 133         if (!nonBlockingEnabled || isShutdown) {
 134             throw new UnsupportedOperationException ("read() with timeout "
 135                 + "cannot be called as non-blocking operation is disabled");
 136         }
 137         return read(timeout, false);
 138     }
 139 
 140     /**
 141      * Attempts to read a character from the input stream for a specific
 142      * period of time.
 143      * @param timeout The amount of time to wait for the character
 144      * @return The character read, -1 if EOF is reached, or -2 if the
 145      *   read timed out.
 146      * @throws IOException
 147      */
 148     private synchronized int read(long timeout, boolean isPeek) throws IOException {
 149         /*
 150          * If the thread hit an IOException, we report it.
 151          */
 152         if (exception != null) {
 153             assert ch == -2;
 154             IOException toBeThrown = exception;
 155             if (!isPeek)
 156                 exception = null;
 157             throw toBeThrown;
 158         }
 159 
 160         /*
 161          * If there was a pending character from the thread, then
 162          * we send it. If the timeout is 0L or the thread was shut down
 163          * then do a local read.
 164          */
 165         if (ch >= -1) {
 166             assert exception == null;
 167         }
 168         else if ((timeout == 0L || isShutdown) && !threadIsReading) {
 169             ch = in.read();
 170         }
 171         else {
 172             /*
 173              * If the thread isn't reading already, then ask it to do so.
 174              */
 175             if (!threadIsReading) {
 176                 threadIsReading = true;
 177                 notify();
 178             }
 179 
 180             boolean isInfinite = (timeout <= 0L);
 181 
 182             /*
 183              * So the thread is currently doing the reading for us. So
 184              * now we play the waiting game.
 185              */
 186             while (isInfinite || timeout > 0L)  {
 187                 long start = System.currentTimeMillis ();
 188 
 189                 try {
 190                     wait(timeout);
 191                 }
 192                 catch (InterruptedException e) {
 193                     /* IGNORED */
 194                 }
 195 
 196                 if (exception != null) {
 197                     assert ch == -2;
 198 
 199                     IOException toBeThrown = exception;
 200                     if (!isPeek)
 201                         exception = null;
 202                     throw toBeThrown;
 203                 }
 204 
 205                 if (ch >= -1) {
 206                     assert exception == null;
 207                     break;
 208                 }
 209 
 210                 if (!isInfinite) {
 211                     timeout -= System.currentTimeMillis() - start;
 212                 }
 213             }
 214         }
 215 
 216         /*
 217          * ch is the character that was just read. Either we set it because
 218          * a local read was performed or the read thread set it (or failed to
 219          * change it).  We will return it's value, but if this was a peek
 220          * operation, then we leave it in place.
 221          */
 222         int ret = ch;
 223         if (!isPeek) {
 224             ch = -2;
 225         }
 226         return ret;
 227     }
 228 
 229     /**
 230      * This version of read() is very specific to jline's purposes, it
 231      * will always always return a single byte at a time, rather than filling
 232      * the entire buffer.
 233      */
 234     @Override
 235     public int read (byte[] b, int off, int len) throws IOException {
 236         if (b == null) {
 237             throw new NullPointerException();
 238         } else if (off < 0 || len < 0 || len > b.length - off) {
 239             throw new IndexOutOfBoundsException();
 240         } else if (len == 0) {
 241             return 0;
 242         }
 243 
 244         int c;
 245         if (nonBlockingEnabled)
 246             c = this.read(0L);
 247         else
 248             c = in.read();
 249 
 250         if (c == -1) {
 251             return -1;
 252         }
 253         b[off] = (byte)c;
 254         return 1;
 255     }
 256 
 257     //@Override
 258     public void run () {
 259         Log.debug("NonBlockingInputStream start");
 260         boolean needToShutdown = false;
 261         boolean needToRead = false;
 262 
 263         while (!needToShutdown) {
 264 
 265             /*
 266              * Synchronize to grab variables accessed by both this thread
 267              * and the accessing thread.
 268              */
 269             synchronized (this) {
 270                 needToShutdown = this.isShutdown;
 271                 needToRead     = this.threadIsReading;
 272 
 273                 try {
 274                     /*
 275                      * Nothing to do? Then wait.
 276                      */
 277                     if (!needToShutdown && !needToRead) {
 278                         wait(0);
 279                     }
 280                 }
 281                 catch (InterruptedException e) {
 282                     /* IGNORED */
 283                 }
 284             }
 285 
 286             /*
 287              * We're not shutting down, but we need to read. This cannot
 288              * happen while we are holding the lock (which we aren't now).
 289              */
 290             if (!needToShutdown && needToRead) {
 291                 int          charRead = -2;
 292                 IOException  failure = null;
 293                 try {
 294                     charRead = in.read();
 295                 }
 296                 catch (IOException e) {
 297                     failure = e;
 298                 }
 299 
 300                 /*
 301                  * Re-grab the lock to update the state.
 302                  */
 303                 synchronized (this) {
 304                     exception       = failure;
 305                     ch              = charRead;
 306                     threadIsReading = false;
 307                     notify();
 308                 }
 309             }
 310         }
 311 
 312         Log.debug("NonBlockingInputStream shutdown");
 313     }
 314 }