1 /*
   2  * Copyright 2000-2009 Sun Microsystems, Inc.  All Rights Reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Sun designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Sun in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  22  * CA 95054 USA or visit www.sun.com if you need additional information or
  23  * have any questions.
  24  */
  25 
  26 package java.nio.channels;
  27 
  28 import java.io.FileInputStream;
  29 import java.io.FileOutputStream;
  30 import java.io.InputStream;
  31 import java.io.OutputStream;
  32 import java.io.Reader;
  33 import java.io.Writer;
  34 import java.io.IOException;
  35 import java.nio.ByteBuffer;
  36 import java.nio.charset.Charset;
  37 import java.nio.charset.CharsetDecoder;
  38 import java.nio.charset.CharsetEncoder;
  39 import java.nio.charset.UnsupportedCharsetException;
  40 import java.nio.channels.spi.AbstractInterruptibleChannel;
  41 import java.util.concurrent.ExecutionException;
  42 import sun.nio.ch.ChannelInputStream;
  43 import sun.nio.cs.StreamDecoder;
  44 import sun.nio.cs.StreamEncoder;
  45 
  46 
  47 /**
  48  * Utility methods for channels and streams.
  49  *
  50  * <p> This class defines static methods that support the interoperation of the
  51  * stream classes of the <tt>{@link java.io}</tt> package with the channel
  52  * classes of this package.  </p>
  53  *
  54  *
  55  * @author Mark Reinhold
  56  * @author Mike McCloskey
  57  * @author JSR-51 Expert Group
  58  * @since 1.4
  59  */
  60 
  61 public final class Channels {
  62 
  63     private Channels() { }              // No instantiation
  64 
  65     private static void checkNotNull(Object o, String name) {
  66         if (o == null)
  67             throw new NullPointerException("\"" + name + "\" is null!");
  68     }
  69 
  70     /**
  71      * Write all remaining bytes in buffer to the given channel.
  72      * If the channel is selectable then it must be configured blocking.
  73      */
  74     private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
  75         throws IOException
  76     {
  77         while (bb.remaining() > 0) {
  78             int n = ch.write(bb);
  79             if (n <= 0)
  80                 throw new RuntimeException("no bytes written");
  81         }
  82     }
  83 
  84     /**
  85      * Write all remaining bytes in buffer to the given channel.
  86      *
  87      * @throws  IllegalBlockingException
  88      *          If the channel is selectable and configured non-blocking.
  89      */
  90     private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
  91         throws IOException
  92     {
  93         if (ch instanceof SelectableChannel) {
  94             SelectableChannel sc = (SelectableChannel)ch;
  95             synchronized (sc.blockingLock()) {
  96                 if (!sc.isBlocking())
  97                     throw new IllegalBlockingModeException();
  98                 writeFullyImpl(ch, bb);
  99             }
 100         } else {
 101             writeFullyImpl(ch, bb);
 102         }
 103     }
 104 
 105     // -- Byte streams from channels --
 106 
 107     /**
 108      * Constructs a stream that reads bytes from the given channel.
 109      *
 110      * <p> The <tt>read</tt> methods of the resulting stream will throw an
 111      * {@link IllegalBlockingModeException} if invoked while the underlying
 112      * channel is in non-blocking mode.  The stream will not be buffered, and
 113      * it will not support the {@link InputStream#mark mark} or {@link
 114      * InputStream#reset reset} methods.  The stream will be safe for access by
 115      * multiple concurrent threads.  Closing the stream will in turn cause the
 116      * channel to be closed.  </p>
 117      *
 118      * @param  ch
 119      *         The channel from which bytes will be read
 120      *
 121      * @return  A new input stream
 122      */
 123     public static InputStream newInputStream(ReadableByteChannel ch) {
 124         checkNotNull(ch, "ch");
 125         return new sun.nio.ch.ChannelInputStream(ch);
 126     }
 127 
 128     /**
 129      * Constructs a stream that writes bytes to the given channel.
 130      *
 131      * <p> The <tt>write</tt> methods of the resulting stream will throw an
 132      * {@link IllegalBlockingModeException} if invoked while the underlying
 133      * channel is in non-blocking mode.  The stream will not be buffered.  The
 134      * stream will be safe for access by multiple concurrent threads.  Closing
 135      * the stream will in turn cause the channel to be closed.  </p>
 136      *
 137      * @param  ch
 138      *         The channel to which bytes will be written
 139      *
 140      * @return  A new output stream
 141      */
 142     public static OutputStream newOutputStream(final WritableByteChannel ch) {
 143         checkNotNull(ch, "ch");
 144 
 145         return new OutputStream() {
 146 
 147                 private ByteBuffer bb = null;
 148                 private byte[] bs = null;       // Invoker's previous array
 149                 private byte[] b1 = null;
 150 
 151                 public synchronized void write(int b) throws IOException {
 152                    if (b1 == null)
 153                         b1 = new byte[1];
 154                     b1[0] = (byte)b;
 155                     this.write(b1);
 156                 }
 157 
 158                 public synchronized void write(byte[] bs, int off, int len)
 159                     throws IOException
 160                 {
 161                     if ((off < 0) || (off > bs.length) || (len < 0) ||
 162                         ((off + len) > bs.length) || ((off + len) < 0)) {
 163                         throw new IndexOutOfBoundsException();
 164                     } else if (len == 0) {
 165                         return;
 166                     }
 167                     ByteBuffer bb = ((this.bs == bs)
 168                                      ? this.bb
 169                                      : ByteBuffer.wrap(bs));
 170                     bb.limit(Math.min(off + len, bb.capacity()));
 171                     bb.position(off);
 172                     this.bb = bb;
 173                     this.bs = bs;
 174                     Channels.writeFully(ch, bb);
 175                 }
 176 
 177                 public void close() throws IOException {
 178                     ch.close();
 179                 }
 180 
 181             };
 182     }
 183 
 184     /**
 185      * {@note new}
 186      * Constructs a stream that reads bytes from the given channel.
 187      *
 188      * <p> The stream will not be buffered, and it will not support the {@link
 189      * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
 190      * stream will be safe for access by multiple concurrent threads.  Closing
 191      * the stream will in turn cause the channel to be closed.  </p>
 192      *
 193      * @param  ch
 194      *         The channel from which bytes will be read
 195      *
 196      * @return  A new input stream
 197      *
 198      * @since 1.7
 199      */
 200     public static InputStream newInputStream(final AsynchronousByteChannel ch) {
 201         checkNotNull(ch, "ch");
 202         return new InputStream() {
 203 
 204             private ByteBuffer bb = null;
 205             private byte[] bs = null;           // Invoker's previous array
 206             private byte[] b1 = null;
 207 
 208             @Override
 209             public synchronized int read() throws IOException {
 210                 if (b1 == null)
 211                     b1 = new byte[1];
 212                 int n = this.read(b1);
 213                 if (n == 1)
 214                     return b1[0] & 0xff;
 215                 return -1;
 216             }
 217 
 218             @Override
 219             public synchronized int read(byte[] bs, int off, int len)
 220                 throws IOException
 221             {
 222                 if ((off < 0) || (off > bs.length) || (len < 0) ||
 223                     ((off + len) > bs.length) || ((off + len) < 0)) {
 224                     throw new IndexOutOfBoundsException();
 225                 } else if (len == 0)
 226                     return 0;
 227 
 228                 ByteBuffer bb = ((this.bs == bs)
 229                                  ? this.bb
 230                                  : ByteBuffer.wrap(bs));
 231                 bb.position(off);
 232                 bb.limit(Math.min(off + len, bb.capacity()));
 233                 this.bb = bb;
 234                 this.bs = bs;
 235 
 236                 boolean interrupted = false;
 237                 try {
 238                     for (;;) {
 239                         try {
 240                             return ch.read(bb).get();
 241                         } catch (ExecutionException ee) {
 242                             throw new IOException(ee.getCause());
 243                         } catch (InterruptedException ie) {
 244                             interrupted = true;
 245                         }
 246                     }
 247                 } finally {
 248                     if (interrupted)
 249                         Thread.currentThread().interrupt();
 250                 }
 251             }
 252 
 253             @Override
 254             public void close() throws IOException {
 255                 ch.close();
 256             }
 257         };
 258     }
 259 
 260     /**
 261      * {@note new}
 262      * Constructs a stream that writes bytes to the given channel.
 263      *
 264      * <p> The stream will not be buffered. The stream will be safe for access
 265      * by multiple concurrent threads.  Closing the stream will in turn cause
 266      * the channel to be closed.  </p>
 267      *
 268      * @param  ch
 269      *         The channel to which bytes will be written
 270      *
 271      * @return  A new output stream
 272      *
 273      * @since 1.7
 274      */
 275     public static OutputStream newOutputStream(final AsynchronousByteChannel ch) {
 276         checkNotNull(ch, "ch");
 277         return new OutputStream() {
 278 
 279             private ByteBuffer bb = null;
 280             private byte[] bs = null;   // Invoker's previous array
 281             private byte[] b1 = null;
 282 
 283             @Override
 284             public synchronized void write(int b) throws IOException {
 285                if (b1 == null)
 286                     b1 = new byte[1];
 287                 b1[0] = (byte)b;
 288                 this.write(b1);
 289             }
 290 
 291             @Override
 292             public synchronized void write(byte[] bs, int off, int len)
 293                 throws IOException
 294             {
 295                 if ((off < 0) || (off > bs.length) || (len < 0) ||
 296                     ((off + len) > bs.length) || ((off + len) < 0)) {
 297                     throw new IndexOutOfBoundsException();
 298                 } else if (len == 0) {
 299                     return;
 300                 }
 301                 ByteBuffer bb = ((this.bs == bs)
 302                                  ? this.bb
 303                                  : ByteBuffer.wrap(bs));
 304                 bb.limit(Math.min(off + len, bb.capacity()));
 305                 bb.position(off);
 306                 this.bb = bb;
 307                 this.bs = bs;
 308 
 309                 boolean interrupted = false;
 310                 try {
 311                     while (bb.remaining() > 0) {
 312                         try {
 313                             ch.write(bb).get();
 314                         } catch (ExecutionException ee) {
 315                             throw new IOException(ee.getCause());
 316                         } catch (InterruptedException ie) {
 317                             interrupted = true;
 318                         }
 319                     }
 320                 } finally {
 321                     if (interrupted)
 322                         Thread.currentThread().interrupt();
 323                 }
 324             }
 325 
 326             @Override
 327             public void close() throws IOException {
 328                 ch.close();
 329             }
 330         };
 331     }
 332 
 333 
 334     // -- Channels from streams --
 335 
 336     /**
 337      * Constructs a channel that reads bytes from the given stream.
 338      *
 339      * <p> The resulting channel will not be buffered; it will simply redirect
 340      * its I/O operations to the given stream.  Closing the channel will in
 341      * turn cause the stream to be closed.  </p>
 342      *
 343      * @param  in
 344      *         The stream from which bytes are to be read
 345      *
 346      * @return  A new readable byte channel
 347      */
 348     public static ReadableByteChannel newChannel(final InputStream in) {
 349         checkNotNull(in, "in");
 350 
 351         if (in instanceof FileInputStream &&
 352             FileInputStream.class.equals(in.getClass())) {
 353             return ((FileInputStream)in).getChannel();
 354         }
 355 
 356         return new ReadableByteChannelImpl(in);
 357     }
 358 
 359     private static class ReadableByteChannelImpl
 360         extends AbstractInterruptibleChannel    // Not really interruptible
 361         implements ReadableByteChannel
 362     {
 363         InputStream in;
 364         private static final int TRANSFER_SIZE = 8192;
 365         private byte buf[] = new byte[0];
 366         private boolean open = true;
 367         private Object readLock = new Object();
 368 
 369         ReadableByteChannelImpl(InputStream in) {
 370             this.in = in;
 371         }
 372 
 373         public int read(ByteBuffer dst) throws IOException {
 374             int len = dst.remaining();
 375             int totalRead = 0;
 376             int bytesRead = 0;
 377             synchronized (readLock) {
 378                 while (totalRead < len) {
 379                     int bytesToRead = Math.min((len - totalRead),
 380                                                TRANSFER_SIZE);
 381                     if (buf.length < bytesToRead)
 382                         buf = new byte[bytesToRead];
 383                     if ((totalRead > 0) && !(in.available() > 0))
 384                         break; // block at most once
 385                     try {
 386                         begin();
 387                         bytesRead = in.read(buf, 0, bytesToRead);
 388                     } finally {
 389                         end(bytesRead > 0);
 390                     }
 391                     if (bytesRead < 0)
 392                         break;
 393                     else
 394                         totalRead += bytesRead;
 395                     dst.put(buf, 0, bytesRead);
 396                 }
 397                 if ((bytesRead < 0) && (totalRead == 0))
 398                     return -1;
 399 
 400                 return totalRead;
 401             }
 402         }
 403 
 404         protected void implCloseChannel() throws IOException {
 405             in.close();
 406             open = false;
 407         }
 408     }
 409 
 410 
 411     /**
 412      * Constructs a channel that writes bytes to the given stream.
 413      *
 414      * <p> The resulting channel will not be buffered; it will simply redirect
 415      * its I/O operations to the given stream.  Closing the channel will in
 416      * turn cause the stream to be closed.  </p>
 417      *
 418      * @param  out
 419      *         The stream to which bytes are to be written
 420      *
 421      * @return  A new writable byte channel
 422      */
 423     public static WritableByteChannel newChannel(final OutputStream out) {
 424         checkNotNull(out, "out");
 425 
 426         if (out instanceof FileOutputStream &&
 427             FileOutputStream.class.equals(out.getClass())) {
 428                 return ((FileOutputStream)out).getChannel();
 429         }
 430 
 431         return new WritableByteChannelImpl(out);
 432     }
 433 
 434     private static class WritableByteChannelImpl
 435         extends AbstractInterruptibleChannel    // Not really interruptible
 436         implements WritableByteChannel
 437     {
 438         OutputStream out;
 439         private static final int TRANSFER_SIZE = 8192;
 440         private byte buf[] = new byte[0];
 441         private boolean open = true;
 442         private Object writeLock = new Object();
 443 
 444         WritableByteChannelImpl(OutputStream out) {
 445             this.out = out;
 446         }
 447 
 448         public int write(ByteBuffer src) throws IOException {
 449             int len = src.remaining();
 450             int totalWritten = 0;
 451             synchronized (writeLock) {
 452                 while (totalWritten < len) {
 453                     int bytesToWrite = Math.min((len - totalWritten),
 454                                                 TRANSFER_SIZE);
 455                     if (buf.length < bytesToWrite)
 456                         buf = new byte[bytesToWrite];
 457                     src.get(buf, 0, bytesToWrite);
 458                     try {
 459                         begin();
 460                         out.write(buf, 0, bytesToWrite);
 461                     } finally {
 462                         end(bytesToWrite > 0);
 463                     }
 464                     totalWritten += bytesToWrite;
 465                 }
 466                 return totalWritten;
 467             }
 468         }
 469 
 470         protected void implCloseChannel() throws IOException {
 471             out.close();
 472             open = false;
 473         }
 474     }
 475 
 476 
 477     // -- Character streams from channels --
 478 
 479     /**
 480      * Constructs a reader that decodes bytes from the given channel using the
 481      * given decoder.
 482      *
 483      * <p> The resulting stream will contain an internal input buffer of at
 484      * least <tt>minBufferCap</tt> bytes.  The stream's <tt>read</tt> methods
 485      * will, as needed, fill the buffer by reading bytes from the underlying
 486      * channel; if the channel is in non-blocking mode when bytes are to be
 487      * read then an {@link IllegalBlockingModeException} will be thrown.  The
 488      * resulting stream will not otherwise be buffered, and it will not support
 489      * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
 490      * Closing the stream will in turn cause the channel to be closed.  </p>
 491      *
 492      * @param  ch
 493      *         The channel from which bytes will be read
 494      *
 495      * @param  dec
 496      *         The charset decoder to be used
 497      *
 498      * @param  minBufferCap
 499      *         The minimum capacity of the internal byte buffer,
 500      *         or <tt>-1</tt> if an implementation-dependent
 501      *         default capacity is to be used
 502      *
 503      * @return  A new reader
 504      */
 505     public static Reader newReader(ReadableByteChannel ch,
 506                                    CharsetDecoder dec,
 507                                    int minBufferCap)
 508     {
 509         checkNotNull(ch, "ch");
 510         return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
 511     }
 512 
 513     /**
 514      * Constructs a reader that decodes bytes from the given channel according
 515      * to the named charset.
 516      *
 517      * <p> An invocation of this method of the form
 518      *
 519      * <blockquote><pre>
 520      * Channels.newReader(ch, csname)</pre></blockquote>
 521      *
 522      * behaves in exactly the same way as the expression
 523      *
 524      * <blockquote><pre>
 525      * Channels.newReader(ch,
 526      *                    Charset.forName(csName)
 527      *                        .newDecoder(),
 528      *                    -1);</pre></blockquote>
 529      *
 530      * @param  ch
 531      *         The channel from which bytes will be read
 532      *
 533      * @param  csName
 534      *         The name of the charset to be used
 535      *
 536      * @return  A new reader
 537      *
 538      * @throws  UnsupportedCharsetException
 539      *          If no support for the named charset is available
 540      *          in this instance of the Java virtual machine
 541      */
 542     public static Reader newReader(ReadableByteChannel ch,
 543                                    String csName)
 544     {
 545         checkNotNull(csName, "csName");
 546         return newReader(ch, Charset.forName(csName).newDecoder(), -1);
 547     }
 548 
 549     /**
 550      * Constructs a writer that encodes characters using the given encoder and
 551      * writes the resulting bytes to the given channel.
 552      *
 553      * <p> The resulting stream will contain an internal output buffer of at
 554      * least <tt>minBufferCap</tt> bytes.  The stream's <tt>write</tt> methods
 555      * will, as needed, flush the buffer by writing bytes to the underlying
 556      * channel; if the channel is in non-blocking mode when bytes are to be
 557      * written then an {@link IllegalBlockingModeException} will be thrown.
 558      * The resulting stream will not otherwise be buffered.  Closing the stream
 559      * will in turn cause the channel to be closed.  </p>
 560      *
 561      * @param  ch
 562      *         The channel to which bytes will be written
 563      *
 564      * @param  enc
 565      *         The charset encoder to be used
 566      *
 567      * @param  minBufferCap
 568      *         The minimum capacity of the internal byte buffer,
 569      *         or <tt>-1</tt> if an implementation-dependent
 570      *         default capacity is to be used
 571      *
 572      * @return  A new writer
 573      */
 574     public static Writer newWriter(final WritableByteChannel ch,
 575                                    final CharsetEncoder enc,
 576                                    final int minBufferCap)
 577     {
 578         checkNotNull(ch, "ch");
 579         return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
 580     }
 581 
 582     /**
 583      * Constructs a writer that encodes characters according to the named
 584      * charset and writes the resulting bytes to the given channel.
 585      *
 586      * <p> An invocation of this method of the form
 587      *
 588      * <blockquote><pre>
 589      * Channels.newWriter(ch, csname)</pre></blockquote>
 590      *
 591      * behaves in exactly the same way as the expression
 592      *
 593      * <blockquote><pre>
 594      * Channels.newWriter(ch,
 595      *                    Charset.forName(csName)
 596      *                        .newEncoder(),
 597      *                    -1);</pre></blockquote>
 598      *
 599      * @param  ch
 600      *         The channel to which bytes will be written
 601      *
 602      * @param  csName
 603      *         The name of the charset to be used
 604      *
 605      * @return  A new writer
 606      *
 607      * @throws  UnsupportedCharsetException
 608      *          If no support for the named charset is available
 609      *          in this instance of the Java virtual machine
 610      */
 611     public static Writer newWriter(WritableByteChannel ch,
 612                                    String csName)
 613     {
 614         checkNotNull(csName, "csName");
 615         return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
 616     }
 617 }