1 /*
   2  * Copyright (c) 2000, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.nio.channels;
  27 
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.UnsupportedCharsetException;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import sun.nio.ch.ChannelInputStream;
import sun.nio.cs.StreamDecoder;
import sun.nio.cs.StreamEncoder;
  46 
  47 
  48 /**
  49  * Utility methods for channels and streams.
  50  *
  51  * <p> This class defines static methods that support the interoperation of the
  52  * stream classes of the {@link java.io} package with the channel classes
  53  * of this package.  </p>
  54  *
  55  *
  56  * @author Mark Reinhold
  57  * @author Mike McCloskey
  58  * @author JSR-51 Expert Group
  59  * @since 1.4
  60  */
  61 
  62 public final class Channels {
  63 
  64     private Channels() { throw new Error("no instances"); }
  65 
  66     /**
  67      * Write all remaining bytes in buffer to the given channel.
  68      * If the channel is selectable then it must be configured blocking.
  69      */
  70     private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
  71         throws IOException
  72     {
  73         while (bb.remaining() > 0) {
  74             int n = ch.write(bb);
  75             if (n <= 0)
  76                 throw new RuntimeException("no bytes written");
  77         }
  78     }
  79 
  80     /**
  81      * Write all remaining bytes in buffer to the given channel.
  82      *
  83      * @throws  IllegalBlockingModeException
  84      *          If the channel is selectable and configured non-blocking.
  85      */
  86     private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
  87         throws IOException
  88     {
  89         if (ch instanceof SelectableChannel) {
  90             SelectableChannel sc = (SelectableChannel) ch;
  91             synchronized (sc.blockingLock()) {
  92                 if (!sc.isBlocking())
  93                     throw new IllegalBlockingModeException();
  94                 writeFullyImpl(ch, bb);
  95             }
  96         } else {
  97             writeFullyImpl(ch, bb);
  98         }
  99     }
 100 
 101     // -- Byte streams from channels --
 102 
 103     /**
 104      * Constructs a stream that reads bytes from the given channel.
 105      *
 106      * <p> The {@code read} methods of the resulting stream will throw an
 107      * {@link IllegalBlockingModeException} if invoked while the underlying
 108      * channel is in non-blocking mode.  The stream will not be buffered, and
 109      * it will not support the {@link InputStream#mark mark} or {@link
 110      * InputStream#reset reset} methods.  The stream will be safe for access by
 111      * multiple concurrent threads.  Closing the stream will in turn cause the
 112      * channel to be closed.  </p>
 113      *
 114      * @param  ch
 115      *         The channel from which bytes will be read
 116      *
 117      * @return  A new input stream
 118      */
 119     public static InputStream newInputStream(ReadableByteChannel ch) {
 120         Objects.requireNonNull(ch, "ch");
 121         return new ChannelInputStream(ch);
 122     }
 123 
 124     /**
 125      * Constructs a stream that writes bytes to the given channel.
 126      *
 127      * <p> The {@code write} methods of the resulting stream will throw an
 128      * {@link IllegalBlockingModeException} if invoked while the underlying
 129      * channel is in non-blocking mode.  The stream will not be buffered.  The
 130      * stream will be safe for access by multiple concurrent threads.  Closing
 131      * the stream will in turn cause the channel to be closed.  </p>
 132      *
 133      * @param  ch
 134      *         The channel to which bytes will be written
 135      *
 136      * @return  A new output stream
 137      */
 138     public static OutputStream newOutputStream(WritableByteChannel ch) {
 139         Objects.requireNonNull(ch, "ch");
 140 
 141         return new OutputStream() {
 142 
 143             private ByteBuffer bb;
 144             private byte[] bs;       // Invoker's previous array
 145             private byte[] b1;
 146 
 147             @Override
 148             public synchronized void write(int b) throws IOException {
 149                 if (b1 == null)
 150                     b1 = new byte[1];
 151                 b1[0] = (byte) b;
 152                 this.write(b1);
 153             }
 154 
 155             @Override
 156             public synchronized void write(byte[] bs, int off, int len)
 157                     throws IOException
 158             {
 159                 if ((off < 0) || (off > bs.length) || (len < 0) ||
 160                     ((off + len) > bs.length) || ((off + len) < 0)) {
 161                     throw new IndexOutOfBoundsException();
 162                 } else if (len == 0) {
 163                     return;
 164                 }
 165                 ByteBuffer bb = ((this.bs == bs)
 166                                  ? this.bb
 167                                  : ByteBuffer.wrap(bs));
 168                 bb.limit(Math.min(off + len, bb.capacity()));
 169                 bb.position(off);
 170                 this.bb = bb;
 171                 this.bs = bs;
 172                 Channels.writeFully(ch, bb);
 173             }
 174 
 175             @Override
 176             public void close() throws IOException {
 177                 ch.close();
 178             }
 179 
 180         };
 181     }
 182 
 183     /**
 184      * Constructs a stream that reads bytes from the given channel.
 185      *
 186      * <p> The stream will not be buffered, and it will not support the {@link
 187      * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
 188      * stream will be safe for access by multiple concurrent threads.  Closing
 189      * the stream will in turn cause the channel to be closed.  </p>
 190      *
 191      * @param  ch
 192      *         The channel from which bytes will be read
 193      *
 194      * @return  A new input stream
 195      *
 196      * @since 1.7
 197      */
 198     public static InputStream newInputStream(AsynchronousByteChannel ch) {
 199         Objects.requireNonNull(ch, "ch");
 200         return new InputStream() {
 201 
 202             private ByteBuffer bb;
 203             private byte[] bs;           // Invoker's previous array
 204             private byte[] b1;
 205 
 206             @Override
 207             public synchronized int read() throws IOException {
 208                 if (b1 == null)
 209                     b1 = new byte[1];
 210                 int n = this.read(b1);
 211                 if (n == 1)
 212                     return b1[0] & 0xff;
 213                 return -1;
 214             }
 215 
 216             @Override
 217             public synchronized int read(byte[] bs, int off, int len)
 218                     throws IOException
 219             {
 220                 if ((off < 0) || (off > bs.length) || (len < 0) ||
 221                     ((off + len) > bs.length) || ((off + len) < 0)) {
 222                     throw new IndexOutOfBoundsException();
 223                 } else if (len == 0) {
 224                     return 0;
 225                 }
 226 
 227                 ByteBuffer bb = ((this.bs == bs)
 228                                  ? this.bb
 229                                  : ByteBuffer.wrap(bs));
 230                 bb.position(off);
 231                 bb.limit(Math.min(off + len, bb.capacity()));
 232                 this.bb = bb;
 233                 this.bs = bs;
 234 
 235                 boolean interrupted = false;
 236                 try {
 237                     for (;;) {
 238                         try {
 239                             return ch.read(bb).get();
 240                         } catch (ExecutionException ee) {
 241                             throw new IOException(ee.getCause());
 242                         } catch (InterruptedException ie) {
 243                             interrupted = true;
 244                         }
 245                     }
 246                 } finally {
 247                     if (interrupted)
 248                         Thread.currentThread().interrupt();
 249                 }
 250             }
 251 
 252             @Override
 253             public void close() throws IOException {
 254                 ch.close();
 255             }
 256         };
 257     }
 258 
 259     /**
 260      * Constructs a stream that writes bytes to the given channel.
 261      *
 262      * <p> The stream will not be buffered. The stream will be safe for access
 263      * by multiple concurrent threads.  Closing the stream will in turn cause
 264      * the channel to be closed.  </p>
 265      *
 266      * @param  ch
 267      *         The channel to which bytes will be written
 268      *
 269      * @return  A new output stream
 270      *
 271      * @since 1.7
 272      */
 273     public static OutputStream newOutputStream(AsynchronousByteChannel ch) {
 274         Objects.requireNonNull(ch, "ch");
 275         return new OutputStream() {
 276 
 277             private ByteBuffer bb;
 278             private byte[] bs;   // Invoker's previous array
 279             private byte[] b1;
 280 
 281             @Override
 282             public synchronized void write(int b) throws IOException {
 283                 if (b1 == null)
 284                     b1 = new byte[1];
 285                 b1[0] = (byte) b;
 286                 this.write(b1);
 287             }
 288 
 289             @Override
 290             public synchronized void write(byte[] bs, int off, int len)
 291                     throws IOException
 292             {
 293                 if ((off < 0) || (off > bs.length) || (len < 0) ||
 294                     ((off + len) > bs.length) || ((off + len) < 0)) {
 295                     throw new IndexOutOfBoundsException();
 296                 } else if (len == 0) {
 297                     return;
 298                 }
 299                 ByteBuffer bb = ((this.bs == bs)
 300                                  ? this.bb
 301                                  : ByteBuffer.wrap(bs));
 302                 bb.limit(Math.min(off + len, bb.capacity()));
 303                 bb.position(off);
 304                 this.bb = bb;
 305                 this.bs = bs;
 306 
 307                 boolean interrupted = false;
 308                 try {
 309                     while (bb.remaining() > 0) {
 310                         try {
 311                             ch.write(bb).get();
 312                         } catch (ExecutionException ee) {
 313                             throw new IOException(ee.getCause());
 314                         } catch (InterruptedException ie) {
 315                             interrupted = true;
 316                         }
 317                     }
 318                 } finally {
 319                     if (interrupted)
 320                         Thread.currentThread().interrupt();
 321                 }
 322             }
 323 
 324             @Override
 325             public void close() throws IOException {
 326                 ch.close();
 327             }
 328         };
 329     }
 330 
 331 
 332     // -- Channels from streams --
 333 
 334     /**
 335      * Constructs a channel that reads bytes from the given stream.
 336      *
 337      * <p> The resulting channel will not be buffered; it will simply redirect
 338      * its I/O operations to the given stream.  Closing the channel will in
 339      * turn cause the stream to be closed.  </p>
 340      *
 341      * @param  in
 342      *         The stream from which bytes are to be read
 343      *
 344      * @return  A new readable byte channel
 345      */
 346     public static ReadableByteChannel newChannel(InputStream in) {
 347         Objects.requireNonNull(in, "in");
 348 
 349         if (in.getClass() == FileInputStream.class) {
 350             return ((FileInputStream) in).getChannel();
 351         }
 352 
 353         return new ReadableByteChannelImpl(in);
 354     }
 355 
 356     private static class ReadableByteChannelImpl
 357         extends AbstractInterruptibleChannel    // Not really interruptible
 358         implements ReadableByteChannel
 359     {
 360         private final InputStream in;
 361         private static final int TRANSFER_SIZE = 8192;
 362         private byte[] buf = new byte[0];
 363         private final Object readLock = new Object();
 364 
 365         ReadableByteChannelImpl(InputStream in) {
 366             this.in = in;
 367         }
 368 
 369         @Override
 370         public int read(ByteBuffer dst) throws IOException {
 371             if (!isOpen()) {
 372                 throw new ClosedChannelException();
 373             }
 374 
 375             int len = dst.remaining();
 376             int totalRead = 0;
 377             int bytesRead = 0;
 378             synchronized (readLock) {
 379                 while (totalRead < len) {
 380                     int bytesToRead = Math.min((len - totalRead),
 381                                                TRANSFER_SIZE);
 382                     if (buf.length < bytesToRead)
 383                         buf = new byte[bytesToRead];
 384                     if ((totalRead > 0) && !(in.available() > 0))
 385                         break; // block at most once
 386                     try {
 387                         begin();
 388                         bytesRead = in.read(buf, 0, bytesToRead);
 389                     } finally {
 390                         end(bytesRead > 0);
 391                     }
 392                     if (bytesRead < 0)
 393                         break;
 394                     else
 395                         totalRead += bytesRead;
 396                     dst.put(buf, 0, bytesRead);
 397                 }
 398                 if ((bytesRead < 0) && (totalRead == 0))
 399                     return -1;
 400 
 401                 return totalRead;
 402             }
 403         }
 404 
 405         @Override
 406         protected void implCloseChannel() throws IOException {
 407             in.close();
 408         }
 409     }
 410 
 411 
 412     /**
 413      * Constructs a channel that writes bytes to the given stream.
 414      *
 415      * <p> The resulting channel will not be buffered; it will simply redirect
 416      * its I/O operations to the given stream.  Closing the channel will in
 417      * turn cause the stream to be closed.  </p>
 418      *
 419      * @param  out
 420      *         The stream to which bytes are to be written
 421      *
 422      * @return  A new writable byte channel
 423      */
 424     public static WritableByteChannel newChannel(OutputStream out) {
 425         Objects.requireNonNull(out, "out");
 426 
 427         if (out.getClass() == FileOutputStream.class) {
 428             return ((FileOutputStream) out).getChannel();
 429         }
 430 
 431         return new WritableByteChannelImpl(out);
 432     }
 433 
 434     private static class WritableByteChannelImpl
 435         extends AbstractInterruptibleChannel    // Not really interruptible
 436         implements WritableByteChannel
 437     {
 438         private final OutputStream out;
 439         private static final int TRANSFER_SIZE = 8192;
 440         private byte[] buf = new byte[0];
 441         private final Object writeLock = new Object();
 442 
 443         WritableByteChannelImpl(OutputStream out) {
 444             this.out = out;
 445         }
 446 
 447         @Override
 448         public int write(ByteBuffer src) throws IOException {
 449             if (!isOpen()) {
 450                 throw new ClosedChannelException();
 451             }
 452 
 453             int len = src.remaining();
 454             int totalWritten = 0;
 455             synchronized (writeLock) {
 456                 while (totalWritten < len) {
 457                     int bytesToWrite = Math.min((len - totalWritten),
 458                                                 TRANSFER_SIZE);
 459                     if (buf.length < bytesToWrite)
 460                         buf = new byte[bytesToWrite];
 461                     src.get(buf, 0, bytesToWrite);
 462                     try {
 463                         begin();
 464                         out.write(buf, 0, bytesToWrite);
 465                     } finally {
 466                         end(bytesToWrite > 0);
 467                     }
 468                     totalWritten += bytesToWrite;
 469                 }
 470                 return totalWritten;
 471             }
 472         }
 473 
 474         @Override
 475         protected void implCloseChannel() throws IOException {
 476             out.close();
 477         }
 478     }
 479 
 480 
 481     // -- Character streams from channels --
 482 
 483     /**
 484      * Constructs a reader that decodes bytes from the given channel using the
 485      * given decoder.
 486      *
 487      * <p> The resulting stream will contain an internal input buffer of at
 488      * least {@code minBufferCap} bytes.  The stream's {@code read} methods
 489      * will, as needed, fill the buffer by reading bytes from the underlying
 490      * channel; if the channel is in non-blocking mode when bytes are to be
 491      * read then an {@link IllegalBlockingModeException} will be thrown.  The
 492      * resulting stream will not otherwise be buffered, and it will not support
 493      * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
 494      * Closing the stream will in turn cause the channel to be closed.  </p>
 495      *
 496      * @param  ch
 497      *         The channel from which bytes will be read
 498      *
 499      * @param  dec
 500      *         The charset decoder to be used
 501      *
 502      * @param  minBufferCap
 503      *         The minimum capacity of the internal byte buffer,
 504      *         or {@code -1} if an implementation-dependent
 505      *         default capacity is to be used
 506      *
 507      * @return  A new reader
 508      */
 509     public static Reader newReader(ReadableByteChannel ch,
 510                                    CharsetDecoder dec,
 511                                    int minBufferCap)
 512     {
 513         Objects.requireNonNull(ch, "ch");
 514         return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
 515     }
 516 
 517     /**
 518      * Constructs a reader that decodes bytes from the given channel according
 519      * to the named charset.
 520      *
 521      * <p> An invocation of this method of the form
 522      *
 523      * <pre> {@code
 524      *     Channels.newReader(ch, csname)
 525      * } </pre>
 526      *
 527      * behaves in exactly the same way as the expression
 528      *
 529      * <pre> {@code
 530      *     Channels.newReader(ch, Charset.forName(csName))
 531      * } </pre>
 532      *
 533      * @param  ch
 534      *         The channel from which bytes will be read
 535      *
 536      * @param  csName
 537      *         The name of the charset to be used
 538      *
 539      * @return  A new reader
 540      *
 541      * @throws  UnsupportedCharsetException
 542      *          If no support for the named charset is available
 543      *          in this instance of the Java virtual machine
 544      */
 545     public static Reader newReader(ReadableByteChannel ch,
 546                                    String csName)
 547     {
 548         Objects.requireNonNull(csName, "csName");
 549         return newReader(ch, Charset.forName(csName).newDecoder(), -1);
 550     }
 551 
 552     /**
 553      * Constructs a reader that decodes bytes from the given channel according
 554      * to the given charset.
 555      *
 556      * <p> An invocation of this method of the form
 557      *
 558      * <pre> {@code
 559      *     Channels.newReader(ch, charset)
 560      * } </pre>
 561      *
 562      * behaves in exactly the same way as the expression
 563      *
 564      * <pre> {@code
 565      *     Channels.newReader(ch, Charset.forName(csName).newDecoder(), -1)
 566      * } </pre>
 567      *
 568      * <p> The reader's default action for malformed-input and unmappable-character
 569      * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report}
 570      * them. When more control over the error handling is required, the constructor
 571      * that takes a {@linkplain java.nio.charset.CharsetDecoder} should be used.
 572      *
 573      * @param  ch The channel from which bytes will be read
 574      *
 575      * @param  charset The charset to be used
 576      *
 577      * @return  A new reader
 578      */
 579     public static Reader newReader(ReadableByteChannel ch, Charset charset) {
 580         Objects.requireNonNull(charset, "charset");
 581         return newReader(ch, charset.newDecoder(), -1);
 582     }
 583 
 584     /**
 585      * Constructs a writer that encodes characters using the given encoder and
 586      * writes the resulting bytes to the given channel.
 587      *
 588      * <p> The resulting stream will contain an internal output buffer of at
 589      * least {@code minBufferCap} bytes.  The stream's {@code write} methods
 590      * will, as needed, flush the buffer by writing bytes to the underlying
 591      * channel; if the channel is in non-blocking mode when bytes are to be
 592      * written then an {@link IllegalBlockingModeException} will be thrown.
 593      * The resulting stream will not otherwise be buffered.  Closing the stream
 594      * will in turn cause the channel to be closed.  </p>
 595      *
 596      * @param  ch
 597      *         The channel to which bytes will be written
 598      *
 599      * @param  enc
 600      *         The charset encoder to be used
 601      *
 602      * @param  minBufferCap
 603      *         The minimum capacity of the internal byte buffer,
 604      *         or {@code -1} if an implementation-dependent
 605      *         default capacity is to be used
 606      *
 607      * @return  A new writer
 608      */
 609     public static Writer newWriter(WritableByteChannel ch,
 610                                    CharsetEncoder enc,
 611                                    int minBufferCap)
 612     {
 613         Objects.requireNonNull(ch, "ch");
 614         return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
 615     }
 616 
 617     /**
 618      * Constructs a writer that encodes characters according to the named
 619      * charset and writes the resulting bytes to the given channel.
 620      *
 621      * <p> An invocation of this method of the form
 622      *
 623      * <pre> {@code
 624      *     Channels.newWriter(ch, csname)
 625      * } </pre>
 626      *
 627      * behaves in exactly the same way as the expression
 628      *
 629      * <pre> {@code
 630      *     Channels.newWriter(ch, Charset.forName(csName))
 631      * } </pre>
 632      *
 633      * @param  ch
 634      *         The channel to which bytes will be written
 635      *
 636      * @param  csName
 637      *         The name of the charset to be used
 638      *
 639      * @return  A new writer
 640      *
 641      * @throws  UnsupportedCharsetException
 642      *          If no support for the named charset is available
 643      *          in this instance of the Java virtual machine
 644      */
 645     public static Writer newWriter(WritableByteChannel ch,
 646                                    String csName)
 647     {
 648         Objects.requireNonNull(csName, "csName");
 649         return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
 650     }
 651 
 652     /**
 653      * Constructs a writer that encodes characters according to the given
 654      * charset and writes the resulting bytes to the given channel.
 655      *
 656      * <p> An invocation of this method of the form
 657      *
 658      * <pre> {@code
 659      *     Channels.newWriter(ch, charset)
 660      * } </pre>
 661      *
 662      * behaves in exactly the same way as the expression
 663      *
 664      * <pre> {@code
 665      *     Channels.newWriter(ch, Charset.forName(csName).newEncoder(), -1)
 666      * } </pre>
 667      *
 668      * <p> The writer's default action for malformed-input and unmappable-character
 669      * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report}
 670      * them. When more control over the error handling is required, the constructor
 671      * that takes a {@linkplain java.nio.charset.CharsetEncoder} should be used.
 672      *
 673      * @param  ch
 674      *         The channel to which bytes will be written
 675      *
 676      * @param  charset
 677      *         The charset to be used
 678      *
 679      * @return  A new writer
 680      */
 681     public static Writer newWriter(WritableByteChannel ch, Charset charset) {
 682         Objects.requireNonNull(charset, "charset");
 683         return newWriter(ch, charset.newEncoder(), -1);
 684     }
 685 }