1 /* 2 * Copyright 2000-2009 Sun Microsystems, Inc. All Rights Reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Sun designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Sun in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 22 * CA 95054 USA or visit www.sun.com if you need additional information or 23 * have any questions. 24 */ 25 26 package java.nio.channels; 27 28 import java.io.FileInputStream; 29 import java.io.FileOutputStream; 30 import java.io.InputStream; 31 import java.io.OutputStream; 32 import java.io.Reader; 33 import java.io.Writer; 34 import java.io.IOException; 35 import java.nio.ByteBuffer; 36 import java.nio.charset.Charset; 37 import java.nio.charset.CharsetDecoder; 38 import java.nio.charset.CharsetEncoder; 39 import java.nio.charset.UnsupportedCharsetException; 40 import java.nio.channels.spi.AbstractInterruptibleChannel; 41 import java.util.concurrent.ExecutionException; 42 import sun.nio.ch.ChannelInputStream; 43 import sun.nio.cs.StreamDecoder; 44 import sun.nio.cs.StreamEncoder; 45 46 47 /** 48 * Utility methods for channels and streams. 49 * 50 * <p> This class defines static methods that support the interoperation of the 51 * stream classes of the <tt>{@link java.io}</tt> package with the channel 52 * classes of this package. </p> 53 * 54 * 55 * @author Mark Reinhold 56 * @author Mike McCloskey 57 * @author JSR-51 Expert Group 58 * @since 1.4 59 */ 60 61 public final class Channels { 62 63 private Channels() { } // No instantiation 64 65 private static void checkNotNull(Object o, String name) { 66 if (o == null) 67 throw new NullPointerException("\"" + name + "\" is null!"); 68 } 69 70 /** 71 * Write all remaining bytes in buffer to the given channel. 72 * If the channel is selectable then it must be configured blocking. 73 */ 74 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 75 throws IOException 76 { 77 while (bb.remaining() > 0) { 78 int n = ch.write(bb); 79 if (n <= 0) 80 throw new RuntimeException("no bytes written"); 81 } 82 } 83 84 /** 85 * Write all remaining bytes in buffer to the given channel. 86 * 87 * @throws IllegalBlockingException 88 * If the channel is selectable and configured non-blocking. 89 */ 90 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 91 throws IOException 92 { 93 if (ch instanceof SelectableChannel) { 94 SelectableChannel sc = (SelectableChannel)ch; 95 synchronized (sc.blockingLock()) { 96 if (!sc.isBlocking()) 97 throw new IllegalBlockingModeException(); 98 writeFullyImpl(ch, bb); 99 } 100 } else { 101 writeFullyImpl(ch, bb); 102 } 103 } 104 105 // -- Byte streams from channels -- 106 107 /** 108 * Constructs a stream that reads bytes from the given channel. 109 * 110 * <p> The <tt>read</tt> methods of the resulting stream will throw an 111 * {@link IllegalBlockingModeException} if invoked while the underlying 112 * channel is in non-blocking mode. The stream will not be buffered, and 113 * it will not support the {@link InputStream#mark mark} or {@link 114 * InputStream#reset reset} methods. The stream will be safe for access by 115 * multiple concurrent threads. Closing the stream will in turn cause the 116 * channel to be closed. </p> 117 * 118 * @param ch 119 * The channel from which bytes will be read 120 * 121 * @return A new input stream 122 */ 123 public static InputStream newInputStream(ReadableByteChannel ch) { 124 checkNotNull(ch, "ch"); 125 return new sun.nio.ch.ChannelInputStream(ch); 126 } 127 128 /** 129 * Constructs a stream that writes bytes to the given channel. 130 * 131 * <p> The <tt>write</tt> methods of the resulting stream will throw an 132 * {@link IllegalBlockingModeException} if invoked while the underlying 133 * channel is in non-blocking mode. The stream will not be buffered. The 134 * stream will be safe for access by multiple concurrent threads. Closing 135 * the stream will in turn cause the channel to be closed. </p> 136 * 137 * @param ch 138 * The channel to which bytes will be written 139 * 140 * @return A new output stream 141 */ 142 public static OutputStream newOutputStream(final WritableByteChannel ch) { 143 checkNotNull(ch, "ch"); 144 145 return new OutputStream() { 146 147 private ByteBuffer bb = null; 148 private byte[] bs = null; // Invoker's previous array 149 private byte[] b1 = null; 150 151 public synchronized void write(int b) throws IOException { 152 if (b1 == null) 153 b1 = new byte[1]; 154 b1[0] = (byte)b; 155 this.write(b1); 156 } 157 158 public synchronized void write(byte[] bs, int off, int len) 159 throws IOException 160 { 161 if ((off < 0) || (off > bs.length) || (len < 0) || 162 ((off + len) > bs.length) || ((off + len) < 0)) { 163 throw new IndexOutOfBoundsException(); 164 } else if (len == 0) { 165 return; 166 } 167 ByteBuffer bb = ((this.bs == bs) 168 ? this.bb 169 : ByteBuffer.wrap(bs)); 170 bb.limit(Math.min(off + len, bb.capacity())); 171 bb.position(off); 172 this.bb = bb; 173 this.bs = bs; 174 Channels.writeFully(ch, bb); 175 } 176 177 public void close() throws IOException { 178 ch.close(); 179 } 180 181 }; 182 } 183 184 /** 185 * {@note new} 186 * Constructs a stream that reads bytes from the given channel. 187 * 188 * <p> The stream will not be buffered, and it will not support the {@link 189 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 190 * stream will be safe for access by multiple concurrent threads. Closing 191 * the stream will in turn cause the channel to be closed. </p> 192 * 193 * @param ch 194 * The channel from which bytes will be read 195 * 196 * @return A new input stream 197 * 198 * @since 1.7 199 */ 200 public static InputStream newInputStream(final AsynchronousByteChannel ch) { 201 checkNotNull(ch, "ch"); 202 return new InputStream() { 203 204 private ByteBuffer bb = null; 205 private byte[] bs = null; // Invoker's previous array 206 private byte[] b1 = null; 207 208 @Override 209 public synchronized int read() throws IOException { 210 if (b1 == null) 211 b1 = new byte[1]; 212 int n = this.read(b1); 213 if (n == 1) 214 return b1[0] & 0xff; 215 return -1; 216 } 217 218 @Override 219 public synchronized int read(byte[] bs, int off, int len) 220 throws IOException 221 { 222 if ((off < 0) || (off > bs.length) || (len < 0) || 223 ((off + len) > bs.length) || ((off + len) < 0)) { 224 throw new IndexOutOfBoundsException(); 225 } else if (len == 0) 226 return 0; 227 228 ByteBuffer bb = ((this.bs == bs) 229 ? this.bb 230 : ByteBuffer.wrap(bs)); 231 bb.position(off); 232 bb.limit(Math.min(off + len, bb.capacity())); 233 this.bb = bb; 234 this.bs = bs; 235 236 boolean interrupted = false; 237 try { 238 for (;;) { 239 try { 240 return ch.read(bb).get(); 241 } catch (ExecutionException ee) { 242 throw new IOException(ee.getCause()); 243 } catch (InterruptedException ie) { 244 interrupted = true; 245 } 246 } 247 } finally { 248 if (interrupted) 249 Thread.currentThread().interrupt(); 250 } 251 } 252 253 @Override 254 public void close() throws IOException { 255 ch.close(); 256 } 257 }; 258 } 259 260 /** 261 * {@note new} 262 * Constructs a stream that writes bytes to the given channel. 263 * 264 * <p> The stream will not be buffered. The stream will be safe for access 265 * by multiple concurrent threads. Closing the stream will in turn cause 266 * the channel to be closed. </p> 267 * 268 * @param ch 269 * The channel to which bytes will be written 270 * 271 * @return A new output stream 272 * 273 * @since 1.7 274 */ 275 public static OutputStream newOutputStream(final AsynchronousByteChannel ch) { 276 checkNotNull(ch, "ch"); 277 return new OutputStream() { 278 279 private ByteBuffer bb = null; 280 private byte[] bs = null; // Invoker's previous array 281 private byte[] b1 = null; 282 283 @Override 284 public synchronized void write(int b) throws IOException { 285 if (b1 == null) 286 b1 = new byte[1]; 287 b1[0] = (byte)b; 288 this.write(b1); 289 } 290 291 @Override 292 public synchronized void write(byte[] bs, int off, int len) 293 throws IOException 294 { 295 if ((off < 0) || (off > bs.length) || (len < 0) || 296 ((off + len) > bs.length) || ((off + len) < 0)) { 297 throw new IndexOutOfBoundsException(); 298 } else if (len == 0) { 299 return; 300 } 301 ByteBuffer bb = ((this.bs == bs) 302 ? this.bb 303 : ByteBuffer.wrap(bs)); 304 bb.limit(Math.min(off + len, bb.capacity())); 305 bb.position(off); 306 this.bb = bb; 307 this.bs = bs; 308 309 boolean interrupted = false; 310 try { 311 while (bb.remaining() > 0) { 312 try { 313 ch.write(bb).get(); 314 } catch (ExecutionException ee) { 315 throw new IOException(ee.getCause()); 316 } catch (InterruptedException ie) { 317 interrupted = true; 318 } 319 } 320 } finally { 321 if (interrupted) 322 Thread.currentThread().interrupt(); 323 } 324 } 325 326 @Override 327 public void close() throws IOException { 328 ch.close(); 329 } 330 }; 331 } 332 333 334 // -- Channels from streams -- 335 336 /** 337 * Constructs a channel that reads bytes from the given stream. 338 * 339 * <p> The resulting channel will not be buffered; it will simply redirect 340 * its I/O operations to the given stream. Closing the channel will in 341 * turn cause the stream to be closed. </p> 342 * 343 * @param in 344 * The stream from which bytes are to be read 345 * 346 * @return A new readable byte channel 347 */ 348 public static ReadableByteChannel newChannel(final InputStream in) { 349 checkNotNull(in, "in"); 350 351 if (in instanceof FileInputStream && 352 FileInputStream.class.equals(in.getClass())) { 353 return ((FileInputStream)in).getChannel(); 354 } 355 356 return new ReadableByteChannelImpl(in); 357 } 358 359 private static class ReadableByteChannelImpl 360 extends AbstractInterruptibleChannel // Not really interruptible 361 implements ReadableByteChannel 362 { 363 InputStream in; 364 private static final int TRANSFER_SIZE = 8192; 365 private byte buf[] = new byte[0]; 366 private boolean open = true; 367 private Object readLock = new Object(); 368 369 ReadableByteChannelImpl(InputStream in) { 370 this.in = in; 371 } 372 373 public int read(ByteBuffer dst) throws IOException { 374 int len = dst.remaining(); 375 int totalRead = 0; 376 int bytesRead = 0; 377 synchronized (readLock) { 378 while (totalRead < len) { 379 int bytesToRead = Math.min((len - totalRead), 380 TRANSFER_SIZE); 381 if (buf.length < bytesToRead) 382 buf = new byte[bytesToRead]; 383 if ((totalRead > 0) && !(in.available() > 0)) 384 break; // block at most once 385 try { 386 begin(); 387 bytesRead = in.read(buf, 0, bytesToRead); 388 } finally { 389 end(bytesRead > 0); 390 } 391 if (bytesRead < 0) 392 break; 393 else 394 totalRead += bytesRead; 395 dst.put(buf, 0, bytesRead); 396 } 397 if ((bytesRead < 0) && (totalRead == 0)) 398 return -1; 399 400 return totalRead; 401 } 402 } 403 404 protected void implCloseChannel() throws IOException { 405 in.close(); 406 open = false; 407 } 408 } 409 410 411 /** 412 * Constructs a channel that writes bytes to the given stream. 413 * 414 * <p> The resulting channel will not be buffered; it will simply redirect 415 * its I/O operations to the given stream. Closing the channel will in 416 * turn cause the stream to be closed. </p> 417 * 418 * @param out 419 * The stream to which bytes are to be written 420 * 421 * @return A new writable byte channel 422 */ 423 public static WritableByteChannel newChannel(final OutputStream out) { 424 checkNotNull(out, "out"); 425 426 if (out instanceof FileOutputStream && 427 FileOutputStream.class.equals(out.getClass())) { 428 return ((FileOutputStream)out).getChannel(); 429 } 430 431 return new WritableByteChannelImpl(out); 432 } 433 434 private static class WritableByteChannelImpl 435 extends AbstractInterruptibleChannel // Not really interruptible 436 implements WritableByteChannel 437 { 438 OutputStream out; 439 private static final int TRANSFER_SIZE = 8192; 440 private byte buf[] = new byte[0]; 441 private boolean open = true; 442 private Object writeLock = new Object(); 443 444 WritableByteChannelImpl(OutputStream out) { 445 this.out = out; 446 } 447 448 public int write(ByteBuffer src) throws IOException { 449 int len = src.remaining(); 450 int totalWritten = 0; 451 synchronized (writeLock) { 452 while (totalWritten < len) { 453 int bytesToWrite = Math.min((len - totalWritten), 454 TRANSFER_SIZE); 455 if (buf.length < bytesToWrite) 456 buf = new byte[bytesToWrite]; 457 src.get(buf, 0, bytesToWrite); 458 try { 459 begin(); 460 out.write(buf, 0, bytesToWrite); 461 } finally { 462 end(bytesToWrite > 0); 463 } 464 totalWritten += bytesToWrite; 465 } 466 return totalWritten; 467 } 468 } 469 470 protected void implCloseChannel() throws IOException { 471 out.close(); 472 open = false; 473 } 474 } 475 476 477 // -- Character streams from channels -- 478 479 /** 480 * Constructs a reader that decodes bytes from the given channel using the 481 * given decoder. 482 * 483 * <p> The resulting stream will contain an internal input buffer of at 484 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 485 * will, as needed, fill the buffer by reading bytes from the underlying 486 * channel; if the channel is in non-blocking mode when bytes are to be 487 * read then an {@link IllegalBlockingModeException} will be thrown. The 488 * resulting stream will not otherwise be buffered, and it will not support 489 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 490 * Closing the stream will in turn cause the channel to be closed. </p> 491 * 492 * @param ch 493 * The channel from which bytes will be read 494 * 495 * @param dec 496 * The charset decoder to be used 497 * 498 * @param minBufferCap 499 * The minimum capacity of the internal byte buffer, 500 * or <tt>-1</tt> if an implementation-dependent 501 * default capacity is to be used 502 * 503 * @return A new reader 504 */ 505 public static Reader newReader(ReadableByteChannel ch, 506 CharsetDecoder dec, 507 int minBufferCap) 508 { 509 checkNotNull(ch, "ch"); 510 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 511 } 512 513 /** 514 * Constructs a reader that decodes bytes from the given channel according 515 * to the named charset. 516 * 517 * <p> An invocation of this method of the form 518 * 519 * <blockquote><pre> 520 * Channels.newReader(ch, csname)</pre></blockquote> 521 * 522 * behaves in exactly the same way as the expression 523 * 524 * <blockquote><pre> 525 * Channels.newReader(ch, 526 * Charset.forName(csName) 527 * .newDecoder(), 528 * -1);</pre></blockquote> 529 * 530 * @param ch 531 * The channel from which bytes will be read 532 * 533 * @param csName 534 * The name of the charset to be used 535 * 536 * @return A new reader 537 * 538 * @throws UnsupportedCharsetException 539 * If no support for the named charset is available 540 * in this instance of the Java virtual machine 541 */ 542 public static Reader newReader(ReadableByteChannel ch, 543 String csName) 544 { 545 checkNotNull(csName, "csName"); 546 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 547 } 548 549 /** 550 * Constructs a writer that encodes characters using the given encoder and 551 * writes the resulting bytes to the given channel. 552 * 553 * <p> The resulting stream will contain an internal output buffer of at 554 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 555 * will, as needed, flush the buffer by writing bytes to the underlying 556 * channel; if the channel is in non-blocking mode when bytes are to be 557 * written then an {@link IllegalBlockingModeException} will be thrown. 558 * The resulting stream will not otherwise be buffered. Closing the stream 559 * will in turn cause the channel to be closed. </p> 560 * 561 * @param ch 562 * The channel to which bytes will be written 563 * 564 * @param enc 565 * The charset encoder to be used 566 * 567 * @param minBufferCap 568 * The minimum capacity of the internal byte buffer, 569 * or <tt>-1</tt> if an implementation-dependent 570 * default capacity is to be used 571 * 572 * @return A new writer 573 */ 574 public static Writer newWriter(final WritableByteChannel ch, 575 final CharsetEncoder enc, 576 final int minBufferCap) 577 { 578 checkNotNull(ch, "ch"); 579 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 580 } 581 582 /** 583 * Constructs a writer that encodes characters according to the named 584 * charset and writes the resulting bytes to the given channel. 585 * 586 * <p> An invocation of this method of the form 587 * 588 * <blockquote><pre> 589 * Channels.newWriter(ch, csname)</pre></blockquote> 590 * 591 * behaves in exactly the same way as the expression 592 * 593 * <blockquote><pre> 594 * Channels.newWriter(ch, 595 * Charset.forName(csName) 596 * .newEncoder(), 597 * -1);</pre></blockquote> 598 * 599 * @param ch 600 * The channel to which bytes will be written 601 * 602 * @param csName 603 * The name of the charset to be used 604 * 605 * @return A new writer 606 * 607 * @throws UnsupportedCharsetException 608 * If no support for the named charset is available 609 * in this instance of the Java virtual machine 610 */ 611 public static Writer newWriter(WritableByteChannel ch, 612 String csName) 613 { 614 checkNotNull(csName, "csName"); 615 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 616 } 617 }