1 /* 2 * Copyright (c) 2000, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.nio.channels; 27 28 import java.io.FileInputStream; 29 import java.io.FileOutputStream; 30 import java.io.InputStream; 31 import java.io.OutputStream; 32 import java.io.Reader; 33 import java.io.Writer; 34 import java.io.IOException; 35 import java.nio.ByteBuffer; 36 import java.nio.charset.Charset; 37 import java.nio.charset.CharsetDecoder; 38 import java.nio.charset.CharsetEncoder; 39 import java.nio.charset.UnsupportedCharsetException; 40 import java.nio.channels.spi.AbstractInterruptibleChannel; 41 import java.util.Objects; 42 import java.util.concurrent.ExecutionException; 43 import sun.nio.ch.ChannelInputStream; 44 import sun.nio.cs.StreamDecoder; 45 import sun.nio.cs.StreamEncoder; 46 47 48 /** 49 * Utility methods for channels and streams. 50 * 51 * <p> This class defines static methods that support the interoperation of the 52 * stream classes of the {@link java.io} package with the channel classes 53 * of this package. </p> 54 * 55 * 56 * @author Mark Reinhold 57 * @author Mike McCloskey 58 * @author JSR-51 Expert Group 59 * @since 1.4 60 */ 61 62 public final class Channels { 63 64 private Channels() { throw new Error("no instances"); } 65 66 /** 67 * Write all remaining bytes in buffer to the given channel. 68 * If the channel is selectable then it must be configured blocking. 69 */ 70 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 71 throws IOException 72 { 73 while (bb.remaining() > 0) { 74 int n = ch.write(bb); 75 if (n <= 0) 76 throw new RuntimeException("no bytes written"); 77 } 78 } 79 80 /** 81 * Write all remaining bytes in buffer to the given channel. 82 * 83 * @throws IllegalBlockingModeException 84 * If the channel is selectable and configured non-blocking. 85 */ 86 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 87 throws IOException 88 { 89 if (ch instanceof SelectableChannel) { 90 SelectableChannel sc = (SelectableChannel) ch; 91 synchronized (sc.blockingLock()) { 92 if (!sc.isBlocking()) 93 throw new IllegalBlockingModeException(); 94 writeFullyImpl(ch, bb); 95 } 96 } else { 97 writeFullyImpl(ch, bb); 98 } 99 } 100 101 // -- Byte streams from channels -- 102 103 /** 104 * Constructs a stream that reads bytes from the given channel. 105 * 106 * <p> The {@code read} methods of the resulting stream will throw an 107 * {@link IllegalBlockingModeException} if invoked while the underlying 108 * channel is in non-blocking mode. The stream will not be buffered, and 109 * it will not support the {@link InputStream#mark mark} or {@link 110 * InputStream#reset reset} methods. The stream will be safe for access by 111 * multiple concurrent threads. Closing the stream will in turn cause the 112 * channel to be closed. </p> 113 * 114 * @param ch 115 * The channel from which bytes will be read 116 * 117 * @return A new input stream 118 */ 119 public static InputStream newInputStream(ReadableByteChannel ch) { 120 Objects.requireNonNull(ch, "ch"); 121 return new ChannelInputStream(ch); 122 } 123 124 /** 125 * Constructs a stream that writes bytes to the given channel. 126 * 127 * <p> The {@code write} methods of the resulting stream will throw an 128 * {@link IllegalBlockingModeException} if invoked while the underlying 129 * channel is in non-blocking mode. The stream will not be buffered. The 130 * stream will be safe for access by multiple concurrent threads. Closing 131 * the stream will in turn cause the channel to be closed. </p> 132 * 133 * @param ch 134 * The channel to which bytes will be written 135 * 136 * @return A new output stream 137 */ 138 public static OutputStream newOutputStream(WritableByteChannel ch) { 139 Objects.requireNonNull(ch, "ch"); 140 141 return new OutputStream() { 142 143 private ByteBuffer bb; 144 private byte[] bs; // Invoker's previous array 145 private byte[] b1; 146 147 @Override 148 public synchronized void write(int b) throws IOException { 149 if (b1 == null) 150 b1 = new byte[1]; 151 b1[0] = (byte) b; 152 this.write(b1); 153 } 154 155 @Override 156 public synchronized void write(byte[] bs, int off, int len) 157 throws IOException 158 { 159 if ((off < 0) || (off > bs.length) || (len < 0) || 160 ((off + len) > bs.length) || ((off + len) < 0)) { 161 throw new IndexOutOfBoundsException(); 162 } else if (len == 0) { 163 return; 164 } 165 ByteBuffer bb = ((this.bs == bs) 166 ? this.bb 167 : ByteBuffer.wrap(bs)); 168 bb.limit(Math.min(off + len, bb.capacity())); 169 bb.position(off); 170 this.bb = bb; 171 this.bs = bs; 172 Channels.writeFully(ch, bb); 173 } 174 175 @Override 176 public void close() throws IOException { 177 ch.close(); 178 } 179 180 }; 181 } 182 183 /** 184 * Constructs a stream that reads bytes from the given channel. 185 * 186 * <p> The stream will not be buffered, and it will not support the {@link 187 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 188 * stream will be safe for access by multiple concurrent threads. Closing 189 * the stream will in turn cause the channel to be closed. </p> 190 * 191 * @param ch 192 * The channel from which bytes will be read 193 * 194 * @return A new input stream 195 * 196 * @since 1.7 197 */ 198 public static InputStream newInputStream(AsynchronousByteChannel ch) { 199 Objects.requireNonNull(ch, "ch"); 200 return new InputStream() { 201 202 private ByteBuffer bb; 203 private byte[] bs; // Invoker's previous array 204 private byte[] b1; 205 206 @Override 207 public synchronized int read() throws IOException { 208 if (b1 == null) 209 b1 = new byte[1]; 210 int n = this.read(b1); 211 if (n == 1) 212 return b1[0] & 0xff; 213 return -1; 214 } 215 216 @Override 217 public synchronized int read(byte[] bs, int off, int len) 218 throws IOException 219 { 220 if ((off < 0) || (off > bs.length) || (len < 0) || 221 ((off + len) > bs.length) || ((off + len) < 0)) { 222 throw new IndexOutOfBoundsException(); 223 } else if (len == 0) { 224 return 0; 225 } 226 227 ByteBuffer bb = ((this.bs == bs) 228 ? this.bb 229 : ByteBuffer.wrap(bs)); 230 bb.position(off); 231 bb.limit(Math.min(off + len, bb.capacity())); 232 this.bb = bb; 233 this.bs = bs; 234 235 boolean interrupted = false; 236 try { 237 for (;;) { 238 try { 239 return ch.read(bb).get(); 240 } catch (ExecutionException ee) { 241 throw new IOException(ee.getCause()); 242 } catch (InterruptedException ie) { 243 interrupted = true; 244 } 245 } 246 } finally { 247 if (interrupted) 248 Thread.currentThread().interrupt(); 249 } 250 } 251 252 @Override 253 public void close() throws IOException { 254 ch.close(); 255 } 256 }; 257 } 258 259 /** 260 * Constructs a stream that writes bytes to the given channel. 261 * 262 * <p> The stream will not be buffered. The stream will be safe for access 263 * by multiple concurrent threads. Closing the stream will in turn cause 264 * the channel to be closed. </p> 265 * 266 * @param ch 267 * The channel to which bytes will be written 268 * 269 * @return A new output stream 270 * 271 * @since 1.7 272 */ 273 public static OutputStream newOutputStream(AsynchronousByteChannel ch) { 274 Objects.requireNonNull(ch, "ch"); 275 return new OutputStream() { 276 277 private ByteBuffer bb; 278 private byte[] bs; // Invoker's previous array 279 private byte[] b1; 280 281 @Override 282 public synchronized void write(int b) throws IOException { 283 if (b1 == null) 284 b1 = new byte[1]; 285 b1[0] = (byte) b; 286 this.write(b1); 287 } 288 289 @Override 290 public synchronized void write(byte[] bs, int off, int len) 291 throws IOException 292 { 293 if ((off < 0) || (off > bs.length) || (len < 0) || 294 ((off + len) > bs.length) || ((off + len) < 0)) { 295 throw new IndexOutOfBoundsException(); 296 } else if (len == 0) { 297 return; 298 } 299 ByteBuffer bb = ((this.bs == bs) 300 ? this.bb 301 : ByteBuffer.wrap(bs)); 302 bb.limit(Math.min(off + len, bb.capacity())); 303 bb.position(off); 304 this.bb = bb; 305 this.bs = bs; 306 307 boolean interrupted = false; 308 try { 309 while (bb.remaining() > 0) { 310 try { 311 ch.write(bb).get(); 312 } catch (ExecutionException ee) { 313 throw new IOException(ee.getCause()); 314 } catch (InterruptedException ie) { 315 interrupted = true; 316 } 317 } 318 } finally { 319 if (interrupted) 320 Thread.currentThread().interrupt(); 321 } 322 } 323 324 @Override 325 public void close() throws IOException { 326 ch.close(); 327 } 328 }; 329 } 330 331 332 // -- Channels from streams -- 333 334 /** 335 * Constructs a channel that reads bytes from the given stream. 336 * 337 * <p> The resulting channel will not be buffered; it will simply redirect 338 * its I/O operations to the given stream. Closing the channel will in 339 * turn cause the stream to be closed. </p> 340 * 341 * @param in 342 * The stream from which bytes are to be read 343 * 344 * @return A new readable byte channel 345 */ 346 public static ReadableByteChannel newChannel(InputStream in) { 347 Objects.requireNonNull(in, "in"); 348 349 if (in.getClass() == FileInputStream.class) { 350 return ((FileInputStream) in).getChannel(); 351 } 352 353 return new ReadableByteChannelImpl(in); 354 } 355 356 private static class ReadableByteChannelImpl 357 extends AbstractInterruptibleChannel // Not really interruptible 358 implements ReadableByteChannel 359 { 360 private final InputStream in; 361 private static final int TRANSFER_SIZE = 8192; 362 private byte[] buf = new byte[0]; 363 private final Object readLock = new Object(); 364 365 ReadableByteChannelImpl(InputStream in) { 366 this.in = in; 367 } 368 369 @Override 370 public int read(ByteBuffer dst) throws IOException { 371 if (!isOpen()) { 372 throw new ClosedChannelException(); 373 } 374 375 int len = dst.remaining(); 376 int totalRead = 0; 377 int bytesRead = 0; 378 synchronized (readLock) { 379 while (totalRead < len) { 380 int bytesToRead = Math.min((len - totalRead), 381 TRANSFER_SIZE); 382 if (buf.length < bytesToRead) 383 buf = new byte[bytesToRead]; 384 if ((totalRead > 0) && !(in.available() > 0)) 385 break; // block at most once 386 try { 387 begin(); 388 bytesRead = in.read(buf, 0, bytesToRead); 389 } finally { 390 end(bytesRead > 0); 391 } 392 if (bytesRead < 0) 393 break; 394 else 395 totalRead += bytesRead; 396 dst.put(buf, 0, bytesRead); 397 } 398 if ((bytesRead < 0) && (totalRead == 0)) 399 return -1; 400 401 return totalRead; 402 } 403 } 404 405 @Override 406 protected void implCloseChannel() throws IOException { 407 in.close(); 408 } 409 } 410 411 412 /** 413 * Constructs a channel that writes bytes to the given stream. 414 * 415 * <p> The resulting channel will not be buffered; it will simply redirect 416 * its I/O operations to the given stream. Closing the channel will in 417 * turn cause the stream to be closed. </p> 418 * 419 * @param out 420 * The stream to which bytes are to be written 421 * 422 * @return A new writable byte channel 423 */ 424 public static WritableByteChannel newChannel(OutputStream out) { 425 Objects.requireNonNull(out, "out"); 426 427 if (out.getClass() == FileOutputStream.class) { 428 return ((FileOutputStream) out).getChannel(); 429 } 430 431 return new WritableByteChannelImpl(out); 432 } 433 434 private static class WritableByteChannelImpl 435 extends AbstractInterruptibleChannel // Not really interruptible 436 implements WritableByteChannel 437 { 438 private final OutputStream out; 439 private static final int TRANSFER_SIZE = 8192; 440 private byte[] buf = new byte[0]; 441 private final Object writeLock = new Object(); 442 443 WritableByteChannelImpl(OutputStream out) { 444 this.out = out; 445 } 446 447 @Override 448 public int write(ByteBuffer src) throws IOException { 449 if (!isOpen()) { 450 throw new ClosedChannelException(); 451 } 452 453 int len = src.remaining(); 454 int totalWritten = 0; 455 synchronized (writeLock) { 456 while (totalWritten < len) { 457 int bytesToWrite = Math.min((len - totalWritten), 458 TRANSFER_SIZE); 459 if (buf.length < bytesToWrite) 460 buf = new byte[bytesToWrite]; 461 src.get(buf, 0, bytesToWrite); 462 try { 463 begin(); 464 out.write(buf, 0, bytesToWrite); 465 } finally { 466 end(bytesToWrite > 0); 467 } 468 totalWritten += bytesToWrite; 469 } 470 return totalWritten; 471 } 472 } 473 474 @Override 475 protected void implCloseChannel() throws IOException { 476 out.close(); 477 } 478 } 479 480 481 // -- Character streams from channels -- 482 483 /** 484 * Constructs a reader that decodes bytes from the given channel using the 485 * given decoder. 486 * 487 * <p> The resulting stream will contain an internal input buffer of at 488 * least {@code minBufferCap} bytes. The stream's {@code read} methods 489 * will, as needed, fill the buffer by reading bytes from the underlying 490 * channel; if the channel is in non-blocking mode when bytes are to be 491 * read then an {@link IllegalBlockingModeException} will be thrown. The 492 * resulting stream will not otherwise be buffered, and it will not support 493 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 494 * Closing the stream will in turn cause the channel to be closed. </p> 495 * 496 * @param ch 497 * The channel from which bytes will be read 498 * 499 * @param dec 500 * The charset decoder to be used 501 * 502 * @param minBufferCap 503 * The minimum capacity of the internal byte buffer, 504 * or {@code -1} if an implementation-dependent 505 * default capacity is to be used 506 * 507 * @return A new reader 508 */ 509 public static Reader newReader(ReadableByteChannel ch, 510 CharsetDecoder dec, 511 int minBufferCap) 512 { 513 Objects.requireNonNull(ch, "ch"); 514 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 515 } 516 517 /** 518 * Constructs a reader that decodes bytes from the given channel according 519 * to the named charset. 520 * 521 * <p> An invocation of this method of the form 522 * 523 * <pre> {@code 524 * Channels.newReader(ch, csname) 525 * } </pre> 526 * 527 * behaves in exactly the same way as the expression 528 * 529 * <pre> {@code 530 * Channels.newReader(ch, Charset.forName(csName)) 531 * } </pre> 532 * 533 * @param ch 534 * The channel from which bytes will be read 535 * 536 * @param csName 537 * The name of the charset to be used 538 * 539 * @return A new reader 540 * 541 * @throws UnsupportedCharsetException 542 * If no support for the named charset is available 543 * in this instance of the Java virtual machine 544 */ 545 public static Reader newReader(ReadableByteChannel ch, 546 String csName) 547 { 548 Objects.requireNonNull(csName, "csName"); 549 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 550 } 551 552 /** 553 * Constructs a reader that decodes bytes from the given channel according 554 * to the given charset. 555 * 556 * <p> An invocation of this method of the form 557 * 558 * <pre> {@code 559 * Channels.newReader(ch, charset) 560 * } </pre> 561 * 562 * behaves in exactly the same way as the expression 563 * 564 * <pre> {@code 565 * Channels.newReader(ch, Charset.forName(csName).newDecoder(), -1) 566 * } </pre> 567 * 568 * <p> The reader's default action for malformed-input and unmappable-character 569 * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report} 570 * them. When more control over the error handling is required, the constructor 571 * that takes a {@linkplain java.nio.charset.CharsetDecoder} should be used. 572 * 573 * @param ch The channel from which bytes will be read 574 * 575 * @param charset The charset to be used 576 * 577 * @return A new reader 578 */ 579 public static Reader newReader(ReadableByteChannel ch, Charset charset) { 580 Objects.requireNonNull(charset, "charset"); 581 return newReader(ch, charset.newDecoder(), -1); 582 } 583 584 /** 585 * Constructs a writer that encodes characters using the given encoder and 586 * writes the resulting bytes to the given channel. 587 * 588 * <p> The resulting stream will contain an internal output buffer of at 589 * least {@code minBufferCap} bytes. The stream's {@code write} methods 590 * will, as needed, flush the buffer by writing bytes to the underlying 591 * channel; if the channel is in non-blocking mode when bytes are to be 592 * written then an {@link IllegalBlockingModeException} will be thrown. 593 * The resulting stream will not otherwise be buffered. Closing the stream 594 * will in turn cause the channel to be closed. </p> 595 * 596 * @param ch 597 * The channel to which bytes will be written 598 * 599 * @param enc 600 * The charset encoder to be used 601 * 602 * @param minBufferCap 603 * The minimum capacity of the internal byte buffer, 604 * or {@code -1} if an implementation-dependent 605 * default capacity is to be used 606 * 607 * @return A new writer 608 */ 609 public static Writer newWriter(WritableByteChannel ch, 610 CharsetEncoder enc, 611 int minBufferCap) 612 { 613 Objects.requireNonNull(ch, "ch"); 614 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 615 } 616 617 /** 618 * Constructs a writer that encodes characters according to the named 619 * charset and writes the resulting bytes to the given channel. 620 * 621 * <p> An invocation of this method of the form 622 * 623 * <pre> {@code 624 * Channels.newWriter(ch, csname) 625 * } </pre> 626 * 627 * behaves in exactly the same way as the expression 628 * 629 * <pre> {@code 630 * Channels.newWriter(ch, Charset.forName(csName)) 631 * } </pre> 632 * 633 * @param ch 634 * The channel to which bytes will be written 635 * 636 * @param csName 637 * The name of the charset to be used 638 * 639 * @return A new writer 640 * 641 * @throws UnsupportedCharsetException 642 * If no support for the named charset is available 643 * in this instance of the Java virtual machine 644 */ 645 public static Writer newWriter(WritableByteChannel ch, 646 String csName) 647 { 648 Objects.requireNonNull(csName, "csName"); 649 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 650 } 651 652 /** 653 * Constructs a writer that encodes characters according to the given 654 * charset and writes the resulting bytes to the given channel. 655 * 656 * <p> An invocation of this method of the form 657 * 658 * <pre> {@code 659 * Channels.newWriter(ch, charset) 660 * } </pre> 661 * 662 * behaves in exactly the same way as the expression 663 * 664 * <pre> {@code 665 * Channels.newWriter(ch, Charset.forName(csName).newEncoder(), -1) 666 * } </pre> 667 * 668 * <p> The writer's default action for malformed-input and unmappable-character 669 * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report} 670 * them. When more control over the error handling is required, the constructor 671 * that takes a {@linkplain java.nio.charset.CharsetEncoder} should be used. 672 * 673 * @param ch 674 * The channel to which bytes will be written 675 * 676 * @param charset 677 * The charset to be used 678 * 679 * @return A new writer 680 */ 681 public static Writer newWriter(WritableByteChannel ch, Charset charset) { 682 Objects.requireNonNull(charset, "charset"); 683 return newWriter(ch, charset.newEncoder(), -1); 684 } 685 }