1 /*
   2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.File;
  29 import java.io.FileInputStream;
  30 import java.io.IOException;
  31 import java.io.InputStream;
  32 import java.io.UncheckedIOException;
  33 import java.nio.ByteBuffer;
  34 import java.nio.charset.Charset;
  35 import java.nio.file.Path;
  36 import java.security.AccessControlContext;
  37 import java.security.AccessController;
  38 import java.security.PrivilegedAction;
  39 import java.security.PrivilegedActionException;
  40 import java.security.PrivilegedExceptionAction;
  41 import java.util.ArrayList;
  42 import java.util.Collections;
  43 import java.util.Iterator;
  44 import java.util.List;
  45 import java.util.NoSuchElementException;
  46 import java.util.Objects;
  47 import java.util.concurrent.ConcurrentLinkedQueue;
  48 import java.util.concurrent.Flow;
  49 import java.util.concurrent.Flow.Publisher;
  50 import java.util.function.Supplier;
  51 import jdk.incubator.http.HttpRequest.BodyPublisher;
  52 import jdk.incubator.http.internal.common.Utils;
  53 
  54 class RequestPublishers {
  55 
  56     static class ByteArrayPublisher implements HttpRequest.BodyPublisher {
  57         private volatile Flow.Publisher<ByteBuffer> delegate;
  58         private final int length;
  59         private final byte[] content;
  60         private final int offset;
  61         private final int bufSize;
  62 
  63         ByteArrayPublisher(byte[] content) {
  64             this(content, 0, content.length);
  65         }
  66 
  67         ByteArrayPublisher(byte[] content, int offset, int length) {
  68             this(content, offset, length, Utils.BUFSIZE);
  69         }
  70 
  71         /* bufSize exposed for testing purposes */
  72         ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
  73             this.content = content;
  74             this.offset = offset;
  75             this.length = length;
  76             this.bufSize = bufSize;
  77         }
  78 
  79         List<ByteBuffer> copy(byte[] content, int offset, int length) {
  80             List<ByteBuffer> bufs = new ArrayList<>();
  81             while (length > 0) {
  82                 ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
  83                 int max = b.capacity();
  84                 int tocopy = Math.min(max, length);
  85                 b.put(content, offset, tocopy);
  86                 offset += tocopy;
  87                 length -= tocopy;
  88                 b.flip();
  89                 bufs.add(b);
  90             }
  91             return bufs;
  92         }
  93 
  94         @Override
  95         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
  96             List<ByteBuffer> copy = copy(content, offset, length);
  97             this.delegate = new PullPublisher<>(copy);
  98             delegate.subscribe(subscriber);
  99         }
 100 
 101         @Override
 102         public long contentLength() {
 103             return length;
 104         }
 105     }
 106 
 107     // This implementation has lots of room for improvement.
 108     static class IterablePublisher implements HttpRequest.BodyPublisher {
 109         private volatile Flow.Publisher<ByteBuffer> delegate;
 110         private final Iterable<byte[]> content;
 111         private volatile long contentLength;
 112 
 113         IterablePublisher(Iterable<byte[]> content) {
 114             this.content = content;
 115         }
 116 
 117         // The ByteBufferIterator will iterate over the byte[] arrays in
 118         // the content one at the time.
 119         //
 120         class ByteBufferIterator implements Iterator<ByteBuffer> {
 121             final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
 122             final Iterator<byte[]> iterator = content.iterator();
 123             @Override
 124             public boolean hasNext() {
 125                 return !buffers.isEmpty() || iterator.hasNext();
 126             }
 127 
 128             @Override
 129             public ByteBuffer next() {
 130                 ByteBuffer buffer = buffers.poll();
 131                 while (buffer == null) {
 132                     copy();
 133                     buffer = buffers.poll();
 134                 }
 135                 return buffer;
 136             }
 137 
 138             ByteBuffer getBuffer() {
 139                 return Utils.getBuffer();
 140             }
 141 
 142             void copy() {
 143                 byte[] bytes = iterator.next();
 144                 int length = bytes.length;
 145                 if (length == 0 && iterator.hasNext()) {
 146                     // avoid inserting empty buffers, except
 147                     // if that's the last.
 148                     return;
 149                 }
 150                 int offset = 0;
 151                 do {
 152                     ByteBuffer b = getBuffer();
 153                     int max = b.capacity();
 154 
 155                     int tocopy = Math.min(max, length);
 156                     b.put(bytes, offset, tocopy);
 157                     offset += tocopy;
 158                     length -= tocopy;
 159                     b.flip();
 160                     buffers.add(b);
 161                 } while (length > 0);
 162             }
 163         }
 164 
 165         public Iterator<ByteBuffer> iterator() {
 166             return new ByteBufferIterator();
 167         }
 168 
 169         @Override
 170         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 171             Iterable<ByteBuffer> iterable = this::iterator;
 172             this.delegate = new PullPublisher<>(iterable);
 173             delegate.subscribe(subscriber);
 174         }
 175 
 176         static long computeLength(Iterable<byte[]> bytes) {
 177             long len = 0;
 178             for (byte[] b : bytes) {
 179                 len = Math.addExact(len, (long)b.length);
 180             }
 181             return len;
 182         }
 183 
 184         @Override
 185         public long contentLength() {
 186             if (contentLength == 0) {
 187                 synchronized(this) {
 188                     if (contentLength == 0) {
 189                         contentLength = computeLength(content);
 190                     }
 191                 }
 192             }
 193             return contentLength;
 194         }
 195     }
 196 
 197     static class StringPublisher extends ByteArrayPublisher {
 198         public StringPublisher(String content, Charset charset) {
 199             super(content.getBytes(charset));
 200         }
 201     }
 202 
 203     static class EmptyPublisher implements HttpRequest.BodyPublisher {
 204         private final Flow.Publisher<ByteBuffer> delegate =
 205                 new PullPublisher<ByteBuffer>(Collections.emptyList(), null);
 206 
 207         @Override
 208         public long contentLength() {
 209             return 0;
 210         }
 211 
 212         @Override
 213         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 214             delegate.subscribe(subscriber);
 215         }
 216     }
 217 
 218     static class FilePublisher implements BodyPublisher  {
 219         private final File file;
 220         private volatile AccessControlContext acc;
 221 
 222         FilePublisher(Path name) {
 223             file = name.toFile();
 224         }
 225 
 226         void setAccessControlContext(AccessControlContext acc) {
 227             this.acc = acc;
 228         }
 229 
 230         @Override
 231         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 232             if (System.getSecurityManager() != null && acc == null)
 233                 throw new InternalError(
 234                         "Unexpected null acc when security manager has been installed");
 235 
 236             InputStream is;
 237             try {
 238                 PrivilegedExceptionAction<FileInputStream> pa =
 239                         () -> new FileInputStream(file);
 240                 is = AccessController.doPrivileged(pa, acc);
 241             } catch (PrivilegedActionException pae) {
 242                 throw new UncheckedIOException((IOException)pae.getCause());
 243             }
 244             PullPublisher<ByteBuffer> publisher =
 245                     new PullPublisher<>(() -> new StreamIterator(is));
 246             publisher.subscribe(subscriber);
 247         }
 248 
 249         @Override
 250         public long contentLength() {
 251             assert System.getSecurityManager() != null ? acc != null: true;
 252             PrivilegedAction<Long> pa = () -> file.length();
 253             return AccessController.doPrivileged(pa, acc);
 254         }
 255     }
 256 
 257     /**
 258      * Reads one buffer ahead all the time, blocking in hasNext()
 259      */
 260     static class StreamIterator implements Iterator<ByteBuffer> {
 261         final InputStream is;
 262         final Supplier<? extends ByteBuffer> bufSupplier;
 263         volatile ByteBuffer nextBuffer;
 264         volatile boolean need2Read = true;
 265         volatile boolean haveNext;
 266 
 267         StreamIterator(InputStream is) {
 268             this(is, Utils::getBuffer);
 269         }
 270 
 271         StreamIterator(InputStream is, Supplier<? extends ByteBuffer> bufSupplier) {
 272             this.is = is;
 273             this.bufSupplier = bufSupplier;
 274         }
 275 
 276 //        Throwable error() {
 277 //            return error;
 278 //        }
 279 
 280         private int read() {
 281             nextBuffer = bufSupplier.get();
 282             nextBuffer.clear();
 283             byte[] buf = nextBuffer.array();
 284             int offset = nextBuffer.arrayOffset();
 285             int cap = nextBuffer.capacity();
 286             try {
 287                 int n = is.read(buf, offset, cap);
 288                 if (n == -1) {
 289                     is.close();
 290                     return -1;
 291                 }
 292                 //flip
 293                 nextBuffer.limit(n);
 294                 nextBuffer.position(0);
 295                 return n;
 296             } catch (IOException ex) {
 297                 return -1;
 298             }
 299         }
 300 
 301         @Override
 302         public synchronized boolean hasNext() {
 303             if (need2Read) {
 304                 haveNext = read() != -1;
 305                 if (haveNext) {
 306                     need2Read = false;
 307                 }
 308                 return haveNext;
 309             }
 310             return haveNext;
 311         }
 312 
 313         @Override
 314         public synchronized ByteBuffer next() {
 315             if (!hasNext()) {
 316                 throw new NoSuchElementException();
 317             }
 318             need2Read = true;
 319             return nextBuffer;
 320         }
 321 
 322     }
 323 
 324     static class InputStreamPublisher implements BodyPublisher {
 325         private final Supplier<? extends InputStream> streamSupplier;
 326 
 327         InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
 328             this.streamSupplier = streamSupplier;
 329         }
 330 
 331         @Override
 332         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 333             PullPublisher<ByteBuffer> publisher;
 334             InputStream is = streamSupplier.get();
 335             if (is == null) {
 336                 Throwable t = new IOException("streamSupplier returned null");
 337                 publisher = new PullPublisher<>(null, t);
 338             } else  {
 339                 publisher = new PullPublisher<>(iterableOf(is), null);
 340             }
 341             publisher.subscribe(subscriber);
 342         }
 343 
 344         protected Iterable<ByteBuffer> iterableOf(InputStream is) {
 345             return () -> new StreamIterator(is);
 346         }
 347 
 348         @Override
 349         public long contentLength() {
 350             return -1;
 351         }
 352     }
 353 
 354     static final class PublisherAdapter implements BodyPublisher {
 355 
 356         private final Publisher<? extends ByteBuffer> publisher;
 357         private final long contentLength;
 358 
 359         PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
 360                          long contentLength) {
 361             this.publisher = Objects.requireNonNull(publisher);
 362             this.contentLength = contentLength;
 363         }
 364 
 365         @Override
 366         public final long contentLength() {
 367             return contentLength;
 368         }
 369 
 370         @Override
 371         public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 372             publisher.subscribe(subscriber);
 373         }
 374     }
 375 }