/*
 * Copyright (c) 2002-2012, the original author or authors.
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * http://www.opensource.org/licenses/bsd-license.php
 */
package jdk.internal.jline.internal;

import java.io.IOException;
import java.io.InputStream;

/**
 * This class wraps a regular input stream and allows it to appear as if it
 * is non-blocking; that is, reads can be performed against it that timeout
 * if no data is seen for a period of time.  This effect is achieved by having
 * a separate thread perform all non-blocking read requests and then
 * waiting on the thread to complete.
 *
 * <p>VERY IMPORTANT NOTES
 * <ul>
 *   <li> This class is not thread safe. It expects at most one reader.
 *   <li> The {@link #shutdown()} method must be called in order to shut down
 *          the thread that handles blocking I/O.
 * </ul>
 * @since 2.7
 * @author Scott C. Gray <scottgray1@gmail.com>
 */
public class NonBlockingInputStream
    extends InputStream
    implements Runnable
{
    private final InputStream in;      // The actual input stream
    private int ch = -2;               // Recently read character; -2 means "nothing pending"

    private boolean threadIsReading = false;    // A read request is owned by the I/O thread
    // Written under the lock, but also read without it by the public guard
    // checks below, hence volatile.
    private volatile boolean isShutdown = false;
    private IOException exception = null;       // Failure recorded by the I/O thread
    private final boolean nonBlockingEnabled;

    /**
     * Creates a <code>NonBlockingInputStream</code> out of a normal blocking
     * stream. Note that this call also spawns a separate thread to perform the
     * blocking I/O on behalf of the thread that is using this class. The
     * {@link #shutdown()} method must be called in order to shut this thread down.
     * @param in The input stream to wrap
     * @param isNonBlockingEnabled If true, then the non-blocking methods
     *   {@link #read(long)} and {@link #peek(long)} will be available and,
     *   more importantly, the thread will be started to provide support for the
     *   feature.  If false, then this class acts as a clean-passthru for the
     *   underlying I/O stream and provides very little overhead.
     */
    public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
        this.in = in;
        this.nonBlockingEnabled = isNonBlockingEnabled;

        if (isNonBlockingEnabled) {
            Thread t = new Thread(this);
            t.setName("NonBlockingInputStreamThread");
            t.setDaemon(true);
            t.start();
        }
    }

    /**
     * Shuts down the thread that is handling blocking I/O. Note that if the
     * thread is currently blocked waiting for I/O it will not actually
     * shut down until the I/O is received.  Shutting down the I/O thread
     * does not prevent this class from being used, but causes the
     * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()}
     * to return false.
     */
    public synchronized void shutdown() {
        if (!isShutdown && nonBlockingEnabled) {
            isShutdown = true;
            notify();
        }
    }

    /**
     * Non-blocking is considered enabled if the feature is enabled and the
     * I/O thread has not been shut down.
     * @return true if non-blocking mode is enabled.
     */
    public boolean isNonBlockingEnabled() {
        return nonBlockingEnabled && !isShutdown;
    }

    /**
     * Closes the underlying stream and then shuts down the I/O thread.
     * @throws IOException if closing the underlying stream fails; in that
     *   case {@link #shutdown()} is not invoked.
     */
    @Override
    public void close() throws IOException {
        /*
         * The underlying input stream is closed first. This means that if the
         * I/O thread was blocked waiting on input, it will be woken for us.
         */
        in.close();
        shutdown();
    }

    /**
     * Reads a single byte, blocking until one is available. This keeps
     * working after {@link #shutdown()} has been called.
     * @return The byte read, or -1 if EOF is reached.
     * @throws IOException if the underlying read fails
     */
    @Override
    public int read() throws IOException {
        if (nonBlockingEnabled)
            return read(0L, false);
        return in.read ();
    }

    /**
     * Peeks to see if there is a byte waiting in the input stream without
     * actually consuming the byte.
     *
     * @param timeout The amount of time to wait, 0 == forever
     * @return -1 on eof, -2 if the timeout expired with no available input
     *   or the character that was read (without consuming it).
     * @throws IOException if the underlying read fails
     * @throws UnsupportedOperationException if non-blocking operation is
     *   disabled or the I/O thread has been shut down
     */
    public int peek(long timeout) throws IOException {
        if (!nonBlockingEnabled || isShutdown) {
            throw new UnsupportedOperationException ("peek() "
                + "cannot be called as non-blocking operation is disabled");
        }
        return read(timeout, true);
    }

    /**
     * Attempts to read a character from the input stream for a specific
     * period of time.
     * @param timeout The amount of time to wait for the character, in
     *   milliseconds; a value of 0 or less waits until a character arrives
     * @return The character read, -1 if EOF is reached, or -2 if the
     *   read timed out.
     * @throws IOException if the underlying read fails
     * @throws UnsupportedOperationException if non-blocking operation is
     *   disabled or the I/O thread has been shut down
     */
    public int read(long timeout) throws IOException {
        if (!nonBlockingEnabled || isShutdown) {
            throw new UnsupportedOperationException ("read() with timeout "
                + "cannot be called as non-blocking operation is disabled");
        }
        return read(timeout, false);
    }

    /**
     * Rethrows the failure recorded by the I/O thread, if there is one.
     * Must be called with this object's monitor held.
     * @param isPeek If true the failure is left in place so that it is
     *   reported again by the next call; otherwise it is cleared.
     * @throws IOException the recorded failure
     */
    private void throwPendingException(boolean isPeek) throws IOException {
        if (exception != null) {
            assert ch == -2;
            IOException toBeThrown = exception;
            if (!isPeek)
                exception = null;
            throw toBeThrown;
        }
    }

    /**
     * Attempts to read a character from the input stream for a specific
     * period of time, optionally leaving the character in place.
     * @param timeout The amount of time to wait for the character, in
     *   milliseconds; a value of 0 or less waits until a character arrives
     * @param isPeek If true, the character (or pending exception) is not
     *   consumed and will be seen again by the next call
     * @return The character read, -1 if EOF is reached, or -2 if the
     *   read timed out.
     * @throws IOException if the underlying read fails
     */
    private synchronized int read(long timeout, boolean isPeek) throws IOException {
        /*
         * If the thread hit an IOException, we report it.
         */
        throwPendingException(isPeek);

        /*
         * If there was a pending character from the thread, then
         * we send it. If the timeout is 0L or the thread was shut down
         * then do a local read.
         */
        if (ch >= -1) {
            assert exception == null;
        }
        else if ((timeout == 0L || isShutdown) && !threadIsReading) {
            ch = in.read();
        }
        else {
            /*
             * If the thread isn't reading already, then ask it to do so.
             */
            if (!threadIsReading) {
                threadIsReading = true;
                notify();
            }

            boolean isInfinite = (timeout <= 0L);

            /*
             * So the thread is currently doing the reading for us. So
             * now we play the waiting game.
             */
            while (isInfinite || timeout > 0L) {
                long start = System.currentTimeMillis ();

                try {
                    /*
                     * Object.wait(long) rejects negative values, so an
                     * infinite wait is always requested as 0.
                     */
                    wait(isInfinite ? 0L : timeout);
                }
                catch (InterruptedException ignored) {
                    /* IGNORED: the loop re-checks the state below */
                }

                throwPendingException(isPeek);

                if (ch >= -1) {
                    assert exception == null;
                    break;
                }

                /*
                 * The I/O thread was shut down before it serviced our
                 * request (it clears threadIsReading on exit), so nobody
                 * will ever deliver a character: read locally instead.
                 */
                if (isShutdown && !threadIsReading) {
                    ch = in.read();
                    break;
                }

                if (!isInfinite) {
                    timeout -= System.currentTimeMillis() - start;
                }
            }
        }

        /*
         * ch is the character that was just read. Either we set it because
         * a local read was performed or the read thread set it (or failed to
         * change it).  We will return its value, but if this was a peek
         * operation, then we leave it in place.
         */
        int ret = ch;
        if (!isPeek) {
            ch = -2;
        }
        return ret;
    }

    /**
     * This version of read() is very specific to jline's purposes, it
     * will always return a single byte at a time, rather than filling
     * the entire buffer. Like {@link #read()}, it keeps working after
     * {@link #shutdown()} has been called.
     * @param b The buffer to store the byte into
     * @param off The offset in <code>b</code> at which the byte is stored
     * @param len The maximum number of bytes the caller accepts
     * @return 1 if a byte was stored, 0 if <code>len</code> is 0, or -1 if
     *   EOF is reached.
     * @throws IOException if the underlying read fails
     */
    @Override
    public int read (byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /*
         * Use the internal read rather than read(long): the latter refuses
         * to run once the I/O thread has been shut down, whereas a plain
         * blocking read must remain usable.
         */
        int c;
        if (nonBlockingEnabled)
            c = read(0L, false);
        else
            c = in.read();

        if (c == -1) {
            return -1;
        }
        b[off] = (byte)c;
        return 1;
    }

    /**
     * Body of the I/O thread: waits until a read is requested, performs the
     * blocking read outside of the lock, publishes the result (or the
     * IOException) and notifies the waiting reader. Exits once shutdown has
     * been requested.
     */
    @Override
    public void run () {
        Log.debug("NonBlockingInputStream start");
        boolean needToShutdown = false;
        boolean needToRead = false;

        while (!needToShutdown) {

            /*
             * Synchronize to grab variables accessed by both this thread
             * and the accessing thread.
             */
            synchronized (this) {
                needToShutdown = this.isShutdown;
                needToRead     = this.threadIsReading;

                try {
                    /*
                     * Nothing to do? Then wait.
                     */
                    if (!needToShutdown && !needToRead) {
                        wait(0);
                    }
                }
                catch (InterruptedException ignored) {
                    /* IGNORED: the state is re-read on the next iteration */
                }
            }

            /*
             * We're not shutting down, but we need to read. This cannot
             * happen while we are holding the lock (which we aren't now).
             */
            if (!needToShutdown && needToRead) {
                int          charRead = -2;
                IOException  failure = null;
                try {
                    charRead = in.read();
                }
                catch (IOException e) {
                    failure = e;
                }

                /*
                 * Re-grab the lock to update the state.
                 */
                synchronized (this) {
                    exception       = failure;
                    ch              = charRead;
                    threadIsReading = false;
                    notify();
                }
            }
        }

        /*
         * A read may have been requested just before shutdown and never
         * serviced. Drop the request and wake the reader so that it can fall
         * back to a local read instead of waiting forever.
         */
        synchronized (this) {
            threadIsReading = false;
            notifyAll();
        }

        Log.debug("NonBlockingInputStream shutdown");
    }
}