1 /* 2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.nio.channels; 27 28 import java.io.FileInputStream; 29 import java.io.FileOutputStream; 30 import java.io.InputStream; 31 import java.io.OutputStream; 32 import java.io.Reader; 33 import java.io.Writer; 34 import java.io.IOException; 35 import java.nio.ByteBuffer; 36 import java.nio.charset.Charset; 37 import java.nio.charset.CharsetDecoder; 38 import java.nio.charset.CharsetEncoder; 39 import java.nio.charset.UnsupportedCharsetException; 40 import java.nio.channels.spi.AbstractInterruptibleChannel; 41 import java.util.concurrent.ExecutionException; 42 import sun.nio.ch.ChannelInputStream; 43 import sun.nio.cs.StreamDecoder; 44 import sun.nio.cs.StreamEncoder; 45 46 47 /** 48 * Utility methods for channels and streams. 49 * 50 * <p> This class defines static methods that support the interoperation of the 51 * stream classes of the <tt>{@link java.io}</tt> package with the channel 52 * classes of this package. </p> 53 * 54 * 55 * @author Mark Reinhold 56 * @author Mike McCloskey 57 * @author JSR-51 Expert Group 58 * @since 1.4 59 */ 60 61 public final class Channels { 62 63 private Channels() { } // No instantiation 64 65 private static void checkNotNull(Object o, String name) { 66 if (o == null) 67 throw new NullPointerException("\"" + name + "\" is null!"); 68 } 69 70 /** 71 * Write all remaining bytes in buffer to the given channel. 72 * If the channel is selectable then it must be configured blocking. 73 */ 74 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 75 throws IOException 76 { 77 while (bb.remaining() > 0) { 78 int n = ch.write(bb); 79 if (n <= 0) 80 throw new RuntimeException("no bytes written"); 81 } 82 } 83 84 /** 85 * Write all remaining bytes in buffer to the given channel. 86 * 87 * @throws IllegalBlockingException 88 * If the channel is selectable and configured non-blocking. 89 */ 90 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 91 throws IOException 92 { 93 if (ch instanceof SelectableChannel) { 94 SelectableChannel sc = (SelectableChannel)ch; 95 synchronized (sc.blockingLock()) { 96 if (!sc.isBlocking()) 97 throw new IllegalBlockingModeException(); 98 writeFullyImpl(ch, bb); 99 } 100 } else { 101 writeFullyImpl(ch, bb); 102 } 103 } 104 105 // -- Byte streams from channels -- 106 107 /** 108 * Constructs a stream that reads bytes from the given channel. 109 * 110 * <p> The <tt>read</tt> methods of the resulting stream will throw an 111 * {@link IllegalBlockingModeException} if invoked while the underlying 112 * channel is in non-blocking mode. The stream will not be buffered, and 113 * it will not support the {@link InputStream#mark mark} or {@link 114 * InputStream#reset reset} methods. The stream will be safe for access by 115 * multiple concurrent threads. Closing the stream will in turn cause the 116 * channel to be closed. </p> 117 * 118 * @param ch 119 * The channel from which bytes will be read 120 * 121 * @return A new input stream 122 */ 123 public static InputStream newInputStream(ReadableByteChannel ch) { 124 checkNotNull(ch, "ch"); 125 return new sun.nio.ch.ChannelInputStream(ch); 126 } 127 128 /** 129 * Constructs a stream that writes bytes to the given channel. 130 * 131 * <p> The <tt>write</tt> methods of the resulting stream will throw an 132 * {@link IllegalBlockingModeException} if invoked while the underlying 133 * channel is in non-blocking mode. The stream will not be buffered. The 134 * stream will be safe for access by multiple concurrent threads. Closing 135 * the stream will in turn cause the channel to be closed. </p> 136 * 137 * @param ch 138 * The channel to which bytes will be written 139 * 140 * @return A new output stream 141 */ 142 public static OutputStream newOutputStream(final WritableByteChannel ch) { 143 checkNotNull(ch, "ch"); 144 145 return new OutputStream() { 146 147 private ByteBuffer bb = null; 148 private byte[] bs = null; // Invoker's previous array 149 private byte[] b1 = null; 150 151 public synchronized void write(int b) throws IOException { 152 if (b1 == null) 153 b1 = new byte[1]; 154 b1[0] = (byte)b; 155 this.write(b1); 156 } 157 158 public synchronized void write(byte[] bs, int off, int len) 159 throws IOException 160 { 161 if ((off < 0) || (off > bs.length) || (len < 0) || 162 ((off + len) > bs.length) || ((off + len) < 0)) { 163 throw new IndexOutOfBoundsException(); 164 } else if (len == 0) { 165 return; 166 } 167 ByteBuffer bb = ((this.bs == bs) 168 ? this.bb 169 : ByteBuffer.wrap(bs)); 170 bb.limit(Math.min(off + len, bb.capacity())); 171 bb.position(off); 172 this.bb = bb; 173 this.bs = bs; 174 Channels.writeFully(ch, bb); 175 } 176 177 public void close() throws IOException { 178 ch.close(); 179 } 180 181 }; 182 } 183 184 /** 185 * Constructs a stream that reads bytes from the given channel. 186 * 187 * <p> The stream will not be buffered, and it will not support the {@link 188 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 189 * stream will be safe for access by multiple concurrent threads. Closing 190 * the stream will in turn cause the channel to be closed. </p> 191 * 192 * @param ch 193 * The channel from which bytes will be read 194 * 195 * @return A new input stream 196 * 197 * @since 1.7 198 */ 199 public static InputStream newInputStream(final AsynchronousByteChannel ch) { 200 checkNotNull(ch, "ch"); 201 return new InputStream() { 202 203 private ByteBuffer bb = null; 204 private byte[] bs = null; // Invoker's previous array 205 private byte[] b1 = null; 206 207 @Override 208 public synchronized int read() throws IOException { 209 if (b1 == null) 210 b1 = new byte[1]; 211 int n = this.read(b1); 212 if (n == 1) 213 return b1[0] & 0xff; 214 return -1; 215 } 216 217 @Override 218 public synchronized int read(byte[] bs, int off, int len) 219 throws IOException 220 { 221 if ((off < 0) || (off > bs.length) || (len < 0) || 222 ((off + len) > bs.length) || ((off + len) < 0)) { 223 throw new IndexOutOfBoundsException(); 224 } else if (len == 0) 225 return 0; 226 227 ByteBuffer bb = ((this.bs == bs) 228 ? this.bb 229 : ByteBuffer.wrap(bs)); 230 bb.position(off); 231 bb.limit(Math.min(off + len, bb.capacity())); 232 this.bb = bb; 233 this.bs = bs; 234 235 boolean interrupted = false; 236 try { 237 for (;;) { 238 try { 239 return ch.read(bb).get(); 240 } catch (ExecutionException ee) { 241 throw new IOException(ee.getCause()); 242 } catch (InterruptedException ie) { 243 interrupted = true; 244 } 245 } 246 } finally { 247 if (interrupted) 248 Thread.currentThread().interrupt(); 249 } 250 } 251 252 @Override 253 public void close() throws IOException { 254 ch.close(); 255 } 256 }; 257 } 258 259 /** 260 * Constructs a stream that writes bytes to the given channel. 261 * 262 * <p> The stream will not be buffered. The stream will be safe for access 263 * by multiple concurrent threads. Closing the stream will in turn cause 264 * the channel to be closed. </p> 265 * 266 * @param ch 267 * The channel to which bytes will be written 268 * 269 * @return A new output stream 270 * 271 * @since 1.7 272 */ 273 public static OutputStream newOutputStream(final AsynchronousByteChannel ch) { 274 checkNotNull(ch, "ch"); 275 return new OutputStream() { 276 277 private ByteBuffer bb = null; 278 private byte[] bs = null; // Invoker's previous array 279 private byte[] b1 = null; 280 281 @Override 282 public synchronized void write(int b) throws IOException { 283 if (b1 == null) 284 b1 = new byte[1]; 285 b1[0] = (byte)b; 286 this.write(b1); 287 } 288 289 @Override 290 public synchronized void write(byte[] bs, int off, int len) 291 throws IOException 292 { 293 if ((off < 0) || (off > bs.length) || (len < 0) || 294 ((off + len) > bs.length) || ((off + len) < 0)) { 295 throw new IndexOutOfBoundsException(); 296 } else if (len == 0) { 297 return; 298 } 299 ByteBuffer bb = ((this.bs == bs) 300 ? this.bb 301 : ByteBuffer.wrap(bs)); 302 bb.limit(Math.min(off + len, bb.capacity())); 303 bb.position(off); 304 this.bb = bb; 305 this.bs = bs; 306 307 boolean interrupted = false; 308 try { 309 while (bb.remaining() > 0) { 310 try { 311 ch.write(bb).get(); 312 } catch (ExecutionException ee) { 313 throw new IOException(ee.getCause()); 314 } catch (InterruptedException ie) { 315 interrupted = true; 316 } 317 } 318 } finally { 319 if (interrupted) 320 Thread.currentThread().interrupt(); 321 } 322 } 323 324 @Override 325 public void close() throws IOException { 326 ch.close(); 327 } 328 }; 329 } 330 331 332 // -- Channels from streams -- 333 334 /** 335 * Constructs a channel that reads bytes from the given stream. 336 * 337 * <p> The resulting channel will not be buffered; it will simply redirect 338 * its I/O operations to the given stream. Closing the channel will in 339 * turn cause the stream to be closed. </p> 340 * 341 * @param in 342 * The stream from which bytes are to be read 343 * 344 * @return A new readable byte channel 345 */ 346 public static ReadableByteChannel newChannel(final InputStream in) { 347 checkNotNull(in, "in"); 348 349 if (in instanceof FileInputStream && 350 FileInputStream.class.equals(in.getClass())) { 351 return ((FileInputStream)in).getChannel(); 352 } 353 354 return new ReadableByteChannelImpl(in); 355 } 356 357 private static class ReadableByteChannelImpl 358 extends AbstractInterruptibleChannel // Not really interruptible 359 implements ReadableByteChannel 360 { 361 InputStream in; 362 private static final int TRANSFER_SIZE = 8192; 363 private byte buf[] = new byte[0]; 364 private boolean open = true; 365 private Object readLock = new Object(); 366 367 ReadableByteChannelImpl(InputStream in) { 368 this.in = in; 369 } 370 371 public int read(ByteBuffer dst) throws IOException { 372 int len = dst.remaining(); 373 int totalRead = 0; 374 int bytesRead = 0; 375 synchronized (readLock) { 376 while (totalRead < len) { 377 int bytesToRead = Math.min((len - totalRead), 378 TRANSFER_SIZE); 379 if (buf.length < bytesToRead) 380 buf = new byte[bytesToRead]; 381 if ((totalRead > 0) && !(in.available() > 0)) 382 break; // block at most once 383 try { 384 begin(); 385 bytesRead = in.read(buf, 0, bytesToRead); 386 } finally { 387 end(bytesRead > 0); 388 } 389 if (bytesRead < 0) 390 break; 391 else 392 totalRead += bytesRead; 393 dst.put(buf, 0, bytesRead); 394 } 395 if ((bytesRead < 0) && (totalRead == 0)) 396 return -1; 397 398 return totalRead; 399 } 400 } 401 402 protected void implCloseChannel() throws IOException { 403 in.close(); 404 open = false; 405 } 406 } 407 408 409 /** 410 * Constructs a channel that writes bytes to the given stream. 411 * 412 * <p> The resulting channel will not be buffered; it will simply redirect 413 * its I/O operations to the given stream. Closing the channel will in 414 * turn cause the stream to be closed. </p> 415 * 416 * @param out 417 * The stream to which bytes are to be written 418 * 419 * @return A new writable byte channel 420 */ 421 public static WritableByteChannel newChannel(final OutputStream out) { 422 checkNotNull(out, "out"); 423 424 if (out instanceof FileOutputStream && 425 FileOutputStream.class.equals(out.getClass())) { 426 return ((FileOutputStream)out).getChannel(); 427 } 428 429 return new WritableByteChannelImpl(out); 430 } 431 432 private static class WritableByteChannelImpl 433 extends AbstractInterruptibleChannel // Not really interruptible 434 implements WritableByteChannel 435 { 436 OutputStream out; 437 private static final int TRANSFER_SIZE = 8192; 438 private byte buf[] = new byte[0]; 439 private boolean open = true; 440 private Object writeLock = new Object(); 441 442 WritableByteChannelImpl(OutputStream out) { 443 this.out = out; 444 } 445 446 public int write(ByteBuffer src) throws IOException { 447 int len = src.remaining(); 448 int totalWritten = 0; 449 synchronized (writeLock) { 450 while (totalWritten < len) { 451 int bytesToWrite = Math.min((len - totalWritten), 452 TRANSFER_SIZE); 453 if (buf.length < bytesToWrite) 454 buf = new byte[bytesToWrite]; 455 src.get(buf, 0, bytesToWrite); 456 try { 457 begin(); 458 out.write(buf, 0, bytesToWrite); 459 } finally { 460 end(bytesToWrite > 0); 461 } 462 totalWritten += bytesToWrite; 463 } 464 return totalWritten; 465 } 466 } 467 468 protected void implCloseChannel() throws IOException { 469 out.close(); 470 open = false; 471 } 472 } 473 474 475 // -- Character streams from channels -- 476 477 /** 478 * Constructs a reader that decodes bytes from the given channel using the 479 * given decoder. 480 * 481 * <p> The resulting stream will contain an internal input buffer of at 482 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 483 * will, as needed, fill the buffer by reading bytes from the underlying 484 * channel; if the channel is in non-blocking mode when bytes are to be 485 * read then an {@link IllegalBlockingModeException} will be thrown. The 486 * resulting stream will not otherwise be buffered, and it will not support 487 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 488 * Closing the stream will in turn cause the channel to be closed. </p> 489 * 490 * @param ch 491 * The channel from which bytes will be read 492 * 493 * @param dec 494 * The charset decoder to be used 495 * 496 * @param minBufferCap 497 * The minimum capacity of the internal byte buffer, 498 * or <tt>-1</tt> if an implementation-dependent 499 * default capacity is to be used 500 * 501 * @return A new reader 502 */ 503 public static Reader newReader(ReadableByteChannel ch, 504 CharsetDecoder dec, 505 int minBufferCap) 506 { 507 checkNotNull(ch, "ch"); 508 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 509 } 510 511 /** 512 * Constructs a reader that decodes bytes from the given channel according 513 * to the named charset. 514 * 515 * <p> An invocation of this method of the form 516 * 517 * <blockquote><pre> 518 * Channels.newReader(ch, csname)</pre></blockquote> 519 * 520 * behaves in exactly the same way as the expression 521 * 522 * <blockquote><pre> 523 * Channels.newReader(ch, 524 * Charset.forName(csName) 525 * .newDecoder(), 526 * -1);</pre></blockquote> 527 * 528 * @param ch 529 * The channel from which bytes will be read 530 * 531 * @param csName 532 * The name of the charset to be used 533 * 534 * @return A new reader 535 * 536 * @throws UnsupportedCharsetException 537 * If no support for the named charset is available 538 * in this instance of the Java virtual machine 539 */ 540 public static Reader newReader(ReadableByteChannel ch, 541 String csName) 542 { 543 checkNotNull(csName, "csName"); 544 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 545 } 546 547 /** 548 * Constructs a writer that encodes characters using the given encoder and 549 * writes the resulting bytes to the given channel. 550 * 551 * <p> The resulting stream will contain an internal output buffer of at 552 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 553 * will, as needed, flush the buffer by writing bytes to the underlying 554 * channel; if the channel is in non-blocking mode when bytes are to be 555 * written then an {@link IllegalBlockingModeException} will be thrown. 556 * The resulting stream will not otherwise be buffered. Closing the stream 557 * will in turn cause the channel to be closed. </p> 558 * 559 * @param ch 560 * The channel to which bytes will be written 561 * 562 * @param enc 563 * The charset encoder to be used 564 * 565 * @param minBufferCap 566 * The minimum capacity of the internal byte buffer, 567 * or <tt>-1</tt> if an implementation-dependent 568 * default capacity is to be used 569 * 570 * @return A new writer 571 */ 572 public static Writer newWriter(final WritableByteChannel ch, 573 final CharsetEncoder enc, 574 final int minBufferCap) 575 { 576 checkNotNull(ch, "ch"); 577 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 578 } 579 580 /** 581 * Constructs a writer that encodes characters according to the named 582 * charset and writes the resulting bytes to the given channel. 583 * 584 * <p> An invocation of this method of the form 585 * 586 * <blockquote><pre> 587 * Channels.newWriter(ch, csname)</pre></blockquote> 588 * 589 * behaves in exactly the same way as the expression 590 * 591 * <blockquote><pre> 592 * Channels.newWriter(ch, 593 * Charset.forName(csName) 594 * .newEncoder(), 595 * -1);</pre></blockquote> 596 * 597 * @param ch 598 * The channel to which bytes will be written 599 * 600 * @param csName 601 * The name of the charset to be used 602 * 603 * @return A new writer 604 * 605 * @throws UnsupportedCharsetException 606 * If no support for the named charset is available 607 * in this instance of the Java virtual machine 608 */ 609 public static Writer newWriter(WritableByteChannel ch, 610 String csName) 611 { 612 checkNotNull(csName, "csName"); 613 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 614 } 615 }