/*
 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Supplier;
import jdk.incubator.http.HttpRequest.BodyPublisher;
import jdk.incubator.http.internal.common.Utils;

/**
 * Holder for the package-private {@code BodyPublisher} implementations
 * defined in this file, plus the {@link StreamIterator} helper that the
 * stream-backed publishers use to turn an {@code InputStream} into a
 * sequence of {@code ByteBuffer}s.
 */
class RequestPublishers {

    /**
     * Publishes a region of a byte array. The region is copied into fresh
     * buffers of at most {@code bufSize} bytes each time
     * {@link #subscribe} is called.
     */
    static class ByteArrayPublisher implements HttpRequest.BodyPublisher {
        private volatile Flow.Publisher<ByteBuffer> delegate;
        private final int length;
        private final byte[] content;
        private final int offset;
        private final int bufSize;

        /** Publishes the whole of {@code content}. */
        ByteArrayPublisher(byte[] content) {
            this(content, 0, content.length);
        }

        /**
         * Publishes {@code length} bytes of {@code content} starting at
         * {@code offset}, chunked by {@code Utils.BUFSIZE}.
         */
        ByteArrayPublisher(byte[] content, int offset, int length) {
            this(content, offset, length, Utils.BUFSIZE);
        }

        /* bufSize exposed for testing purposes */
        ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
            this.content = content;
            this.offset = offset;
            this.length = length;
            this.bufSize = bufSize;
        }

        /**
         * Copies {@code length} bytes of {@code content}, starting at
         * {@code offset}, into a list of flipped (ready to read) buffers.
         * Each buffer holds at most {@code bufSize} bytes; a non-positive
         * {@code length} yields an empty list.
         */
        List<ByteBuffer> copy(byte[] content, int offset, int length) {
            List<ByteBuffer> bufs = new ArrayList<>();
            while (length > 0) {
                ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
                int max = b.capacity();
                int tocopy = Math.min(max, length);
                b.put(content, offset, tocopy);
                offset += tocopy;
                length -= tocopy;
                b.flip();
                bufs.add(b);
            }
            return bufs;
        }

        /**
         * Takes a fresh copy of the configured region and hands it to a new
         * {@code PullPublisher}, to which {@code subscriber} is subscribed.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            List<ByteBuffer> copy = copy(content, offset, length);
            this.delegate = new PullPublisher<>(copy);
            delegate.subscribe(subscriber);
        }

        /** Returns the {@code length} given at construction. */
        @Override
        public long contentLength() {
            return length;
        }
    }

    // This implementation has lots of room for improvement.
    /**
     * Publishes the byte arrays produced by an {@code Iterable}, one array
     * after the other.
     */
    static class IterablePublisher implements HttpRequest.BodyPublisher {
        private volatile Flow.Publisher<ByteBuffer> delegate;
        private final Iterable<byte[]> content;
        // 0 doubles as "not computed yet", see contentLength()
        private volatile long contentLength;

        IterablePublisher(Iterable<byte[]> content) {
            this.content = content;
        }

        // The ByteBufferIterator will iterate over the byte[] arrays in
        // the content one at the time.
        //
        class ByteBufferIterator implements Iterator<ByteBuffer> {
            // Buffers already filled from the current byte[] but not yet
            // handed out by next().
            final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
            final Iterator<byte[]> iterator = content.iterator();

            @Override
            public boolean hasNext() {
                return !buffers.isEmpty() || iterator.hasNext();
            }

            /**
             * Returns the next queued buffer, pulling further byte arrays
             * from the underlying iterator until one is available. When the
             * underlying iterator is exhausted, the exception thrown by its
             * {@code next()} propagates out of this method.
             */
            @Override
            public ByteBuffer next() {
                ByteBuffer buffer = buffers.poll();
                while (buffer == null) {
                    copy();
                    buffer = buffers.poll();
                }
                return buffer;
            }

            // NOTE(review): presumably returns an empty buffer positioned at
            // 0 and ready to be filled - confirm against Utils.getBuffer().
            ByteBuffer getBuffer() {
                return Utils.getBuffer();
            }

            /**
             * Takes the next byte array from the underlying iterator and
             * queues its content, split into chunks no larger than the
             * capacity of the buffers returned by {@link #getBuffer()}.
             */
            void copy() {
                byte[] bytes = iterator.next();
                int length = bytes.length;
                if (length == 0 && iterator.hasNext()) {
                    // avoid inserting empty buffers, except
                    // if that's the last.
                    return;
                }
                int offset = 0;
                // do/while so that a trailing empty array still queues
                // one (empty) buffer.
                do {
                    ByteBuffer b = getBuffer();
                    int max = b.capacity();

                    int tocopy = Math.min(max, length);
                    b.put(bytes, offset, tocopy);
                    offset += tocopy;
                    length -= tocopy;
                    b.flip();
                    buffers.add(b);
                } while (length > 0);
            }
        }

        /** Returns a new iterator over the content, starting from the beginning. */
        public Iterator<ByteBuffer> iterator() {
            return new ByteBufferIterator();
        }

        /**
         * Subscribes {@code subscriber} to a new {@code PullPublisher} built
         * over {@link #iterator()}.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            Iterable<ByteBuffer> iterable = this::iterator;
            this.delegate = new PullPublisher<>(iterable);
            delegate.subscribe(subscriber);
        }

        /**
         * Sums the lengths of all arrays in {@code bytes}.
         *
         * @throws ArithmeticException if the total overflows a {@code long}
         */
        static long computeLength(Iterable<byte[]> bytes) {
            long len = 0;
            for (byte[] b : bytes) {
                len = Math.addExact(len, (long)b.length);
            }
            return len;
        }

        /**
         * Returns the total number of bytes in the content. A non-zero
         * total is computed once and then remembered; because 0 is also
         * the "not computed" marker, a total of 0 is recomputed on every
         * call.
         */
        @Override
        public long contentLength() {
            if (contentLength == 0) {
                synchronized(this) {
                    if (contentLength == 0) {
                        contentLength = computeLength(content);
                    }
                }
            }
            return contentLength;
        }
    }

    /** Publishes the bytes of a string encoded with the given charset. */
    static class StringPublisher extends ByteArrayPublisher {
        public StringPublisher(String content, Charset charset) {
            super(content.getBytes(charset));
        }
    }

    /** Publisher for a request without a body; its content length is 0. */
    static class EmptyPublisher implements HttpRequest.BodyPublisher {
        // Empty item list, and null for the Throwable argument.
        private final Flow.Publisher<ByteBuffer> delegate =
                new PullPublisher<ByteBuffer>(Collections.emptyList(), null);

        @Override
        public long contentLength() {
            return 0;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            delegate.subscribe(subscriber);
        }
    }

    /**
     * Publishes the content of a file. The file is opened, and its length
     * queried, inside {@code doPrivileged} using the
     * {@code AccessControlContext} supplied through
     * {@link #setAccessControlContext}.
     */
    static class FilePublisher implements BodyPublisher {
        private final File file;
        private volatile AccessControlContext acc;

        FilePublisher(Path name) {
            file = name.toFile();
        }

        void setAccessControlContext(AccessControlContext acc) {
            this.acc = acc;
        }

        /**
         * Opens the file and subscribes {@code subscriber} to a
         * {@code PullPublisher} reading from it through a
         * {@link StreamIterator}.
         *
         * @throws InternalError if a security manager is installed but no
         *         access control context has been set
         * @throws UncheckedIOException if the file cannot be opened
         */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            if (System.getSecurityManager() != null && acc == null)
                throw new InternalError(
                        "Unexpected null acc when security manager has been installed");

            InputStream is;
            try {
                PrivilegedExceptionAction<FileInputStream> pa =
                        () -> new FileInputStream(file);
                is = AccessController.doPrivileged(pa, acc);
            } catch (PrivilegedActionException pae) {
                // The action only throws the checked exception of the
                // FileInputStream constructor, which is an IOException.
                throw new UncheckedIOException((IOException)pae.getCause());
            }
            PullPublisher<ByteBuffer> publisher =
                    new PullPublisher<>(() -> new StreamIterator(is));
            publisher.subscribe(subscriber);
        }

        /** Returns {@code File.length()} of the file, queried under {@code acc}. */
        @Override
        public long contentLength() {
            assert System.getSecurityManager() != null ? acc != null: true;
            PrivilegedAction<Long> pa = () -> file.length();
            return AccessController.doPrivileged(pa, acc);
        }
    }

    /**
     * Reads one buffer ahead all the time, blocking in hasNext()
     *
     * <p>The stream is closed when end-of-stream is reached or when a read
     * fails. Either event is terminal: from then on {@link #hasNext()}
     * returns {@code false} without touching the stream or the buffer
     * supplier again.
     */
    static class StreamIterator implements Iterator<ByteBuffer> {
        final InputStream is;
        final Supplier<? extends ByteBuffer> bufSupplier;
        volatile ByteBuffer nextBuffer;
        volatile boolean need2Read = true;
        volatile boolean haveNext;
        // Set once the stream reported end-of-stream or a read failed.
        private volatile boolean eof;

        /** Reads {@code is} into buffers obtained from {@code Utils.getBuffer()}. */
        StreamIterator(InputStream is) {
            this(is, Utils::getBuffer);
        }

        /**
         * Reads {@code is} into buffers obtained from {@code bufSupplier}.
         * The supplied buffers must be array-backed, since the stream is
         * read directly into {@code ByteBuffer.array()}.
         */
        StreamIterator(InputStream is, Supplier<? extends ByteBuffer> bufSupplier) {
            this.is = is;
            this.bufSupplier = bufSupplier;
        }

        /**
         * Fills a fresh buffer from the stream and leaves it in
         * {@code nextBuffer}, ready to be read.
         *
         * @return the number of bytes read, or -1 when no more data will
         *         be delivered (end-of-stream, or a failed read)
         */
        private int read() {
            if (eof) {
                // The stream is already closed; do not allocate another
                // buffer or read from a closed stream.
                return -1;
            }
            nextBuffer = bufSupplier.get();
            nextBuffer.clear();
            byte[] buf = nextBuffer.array();
            int offset = nextBuffer.arrayOffset();
            int cap = nextBuffer.capacity();
            try {
                int n = is.read(buf, offset, cap);
                if (n == -1) {
                    eof = true;
                    is.close();
                    return -1;
                }
                //flip
                nextBuffer.limit(n);
                nextBuffer.position(0);
                return n;
            } catch (IOException ex) {
                // A failed read is reported to the caller as end of data,
                // because hasNext() has no way to signal an error. Unlike
                // at EOF nothing else would close the stream, so release
                // it here rather than leaking it.
                // NOTE(review): the failure is therefore indistinguishable
                // from a normal end-of-stream for the subscriber.
                eof = true;
                closeQuietly();
                return -1;
            }
        }

        /** Closes the stream, ignoring any failure to do so. */
        private void closeQuietly() {
            try {
                is.close();
            } catch (IOException ignored) {
                // Nothing useful can be done: reading has already ended.
            }
        }

        /**
         * Returns whether another buffer is available, reading from the
         * stream (and possibly blocking) when no buffer has been read
         * ahead yet.
         */
        @Override
        public synchronized boolean hasNext() {
            if (need2Read) {
                haveNext = read() != -1;
                if (haveNext) {
                    need2Read = false;
                }
                return haveNext;
            }
            return haveNext;
        }

        /**
         * Returns the buffer that was read ahead.
         *
         * @throws NoSuchElementException if no more data is available
         */
        @Override
        public synchronized ByteBuffer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            need2Read = true;
            return nextBuffer;
        }

    }

    /**
     * Publishes the content of an {@code InputStream} obtained from a
     * supplier each time {@link #subscribe} is called. The content length
     * is reported as -1.
     */
    static class InputStreamPublisher implements BodyPublisher {
        private final Supplier<? extends InputStream> streamSupplier;

        InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
            this.streamSupplier = streamSupplier;
        }

        /**
         * Asks the supplier for a stream and subscribes {@code subscriber}
         * to a {@code PullPublisher} reading from it. If the supplier
         * returns {@code null}, the {@code PullPublisher} is instead
         * created with an {@code IOException} and no content.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            PullPublisher<ByteBuffer> publisher;
            InputStream is = streamSupplier.get();
            if (is == null) {
                Throwable t = new IOException("streamSupplier returned null");
                publisher = new PullPublisher<>(null, t);
            } else {
                publisher = new PullPublisher<>(iterableOf(is), null);
            }
            publisher.subscribe(subscriber);
        }

        /** Wraps {@code is} in an iterable that creates {@link StreamIterator}s over it. */
        protected Iterable<ByteBuffer> iterableOf(InputStream is) {
            return () -> new StreamIterator(is);
        }

        @Override
        public long contentLength() {
            return -1;
        }
    }

    /**
     * Adapts an arbitrary {@code Flow.Publisher} of byte buffers, together
     * with a caller-supplied content length, to {@code BodyPublisher}.
     */
    static final class PublisherAdapter implements BodyPublisher {

        private final Publisher<? extends ByteBuffer> publisher;
        private final long contentLength;

        /**
         * @throws NullPointerException if {@code publisher} is {@code null}
         */
        PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
                         long contentLength) {
            this.publisher = Objects.requireNonNull(publisher);
            this.contentLength = contentLength;
        }

        @Override
        public final long contentLength() {
            return contentLength;
        }

        @Override
        public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            publisher.subscribe(subscriber);
        }
    }
}