/*
 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
*/

package jdk.incubator.http;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import jdk.incubator.http.HttpRequest.BodyPublisher;
import jdk.incubator.http.internal.common.Utils;

/**
 * Container for the {@link HttpRequest.BodyPublisher} implementations defined
 * in this file. Each publisher turns its source (byte array, iterable of byte
 * arrays, string, file or input stream) into {@link ByteBuffer}s and hands
 * them to a {@code PullPublisher}.
 */
class RequestPublishers {

    /**
     * Publishes a region of a byte array. The region is copied into fresh
     * buffers of at most {@code bufSize} bytes on every call to
     * {@link #subscribe}.
     */
    static class ByteArrayPublisher implements HttpRequest.BodyPublisher {
        private volatile Flow.Publisher<ByteBuffer> delegate;
        private final int length;
        private final byte[] content;
        private final int offset;
        private final int bufSize;

        /** Publishes the whole of {@code content}. */
        ByteArrayPublisher(byte[] content) {
            this(content, 0, content.length);
        }

        /**
         * Publishes {@code length} bytes of {@code content} starting at
         * {@code offset}, chunked by {@code Utils.BUFSIZE}.
         */
        ByteArrayPublisher(byte[] content, int offset, int length) {
            this(content, offset, length, Utils.BUFSIZE);
        }

        /* bufSize exposed for testing purposes */
        ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
            this.content = content;
            this.offset = offset;
            this.length = length;
            this.bufSize = bufSize;
        }

        /**
         * Copies {@code length} bytes of {@code content}, starting at
         * {@code offset}, into a list of flipped heap buffers, each holding
         * at most {@code bufSize} bytes.
         *
         * @return the buffers in order; empty when {@code length <= 0}
         * @throws IllegalArgumentException if there is data to copy but
         *         {@code bufSize} is not positive
         */
        List<ByteBuffer> copy(byte[] content, int offset, int length) {
            if (length > 0 && bufSize <= 0) {
                // A zero chunk size would never make progress and the loop
                // below would allocate empty buffers forever.
                throw new IllegalArgumentException(
                        "bufSize must be positive: " + bufSize);
            }
            List<ByteBuffer> bufs = new ArrayList<>();
            while (length > 0) {
                int tocopy = Math.min(bufSize, length);
                ByteBuffer b = ByteBuffer.allocate(tocopy);
                b.put(content, offset, tocopy);
                offset += tocopy;
                length -= tocopy;
                b.flip();
                bufs.add(b);
            }
            return bufs;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            List<ByteBuffer> copy = copy(content, offset, length);
            this.delegate = new PullPublisher<>(copy);
            delegate.subscribe(subscriber);
        }

        @Override
        public long contentLength() {
            return length;
        }
    }

    // This implementation has lots of room for improvement.
    /**
     * Publishes the concatenation of the byte arrays produced by an
     * {@code Iterable<byte[]>}.
     */
    static class IterablePublisher implements HttpRequest.BodyPublisher {
        private volatile Flow.Publisher<ByteBuffer> delegate;
        private final Iterable<byte[]> content;
        // 0 doubles as "not computed yet"; see contentLength().
        private volatile long contentLength;

        IterablePublisher(Iterable<byte[]> content) {
            this.content = content;
        }

        // The ByteBufferIterator will iterate over the byte[] arrays in
        // the content one at the time.
        //
        class ByteBufferIterator implements Iterator<ByteBuffer> {
            // Buffers already filled from the current array, not yet handed out.
            final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
            final Iterator<byte[]> iterator = content.iterator();

            @Override
            public boolean hasNext() {
                return !buffers.isEmpty() || iterator.hasNext();
            }

            @Override
            public ByteBuffer next() {
                ByteBuffer buffer = buffers.poll();
                while (buffer == null) {
                    // copy() pulls the next array; iterator.next() throws
                    // NoSuchElementException once the source is exhausted.
                    copy();
                    buffer = buffers.poll();
                }
                return buffer;
            }

            ByteBuffer getBuffer() {
                return Utils.getBuffer();
            }

            /**
             * Takes the next array from the source and queues its bytes as
             * one or more flipped buffers.
             */
            void copy() {
                byte[] bytes = iterator.next();
                int length = bytes.length;
                if (length == 0 && iterator.hasNext()) {
                    // avoid inserting empty buffers, except
                    // if that's the last.
                    return;
                }
                int offset = 0;
                do {
                    ByteBuffer b = getBuffer();
                    int max = b.capacity();

                    int tocopy = Math.min(max, length);
                    b.put(bytes, offset, tocopy);
                    offset += tocopy;
                    length -= tocopy;
                    b.flip();
                    buffers.add(b);
                } while (length > 0);
            }
        }

        public Iterator<ByteBuffer> iterator() {
            return new ByteBufferIterator();
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            Iterable<ByteBuffer> iterable = this::iterator;
            this.delegate = new PullPublisher<>(iterable);
            delegate.subscribe(subscriber);
        }

        /**
         * Sums the lengths of all arrays in {@code bytes}.
         *
         * @throws ArithmeticException if the total overflows a {@code long}
         */
        static long computeLength(Iterable<byte[]> bytes) {
            long len = 0;
            for (byte[] b : bytes) {
                len = Math.addExact(len, (long)b.length);
            }
            return len;
        }

        /**
         * Returns the total number of bytes in the content. A non-zero
         * result is cached; a zero result is recomputed on each call because
         * zero is also the "not yet computed" marker.
         */
        @Override
        public long contentLength() {
            if (contentLength == 0) {
                synchronized(this) {
                    if (contentLength == 0) {
                        contentLength = computeLength(content);
                    }
                }
            }
            return contentLength;
        }
    }

    /** Publishes the bytes of a string encoded with the given charset. */
    static class StringPublisher extends ByteArrayPublisher {
        public StringPublisher(String content, Charset charset) {
            super(content.getBytes(charset));
        }
    }

    /** Publishes no data; its content length is zero. */
    static class EmptyPublisher implements HttpRequest.BodyPublisher {
        private final Flow.Publisher<ByteBuffer> delegate =
                new PullPublisher<ByteBuffer>(Collections.emptyList(), null);

        @Override
        public long contentLength() {
            return 0;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            delegate.subscribe(subscriber);
        }
    }

    /**
     * Publishes the content of a file. File access is performed inside
     * {@code AccessController.doPrivileged} using the context supplied via
     * {@link #setAccessControlContext}.
     */
    static class FilePublisher implements BodyPublisher {
        private final File file;
        private volatile AccessControlContext acc;

        FilePublisher(Path name) {
            file = name.toFile();
        }

        void setAccessControlContext(AccessControlContext acc) {
            this.acc = acc;
        }

        /**
         * Opens the file and publishes its content.
         *
         * @throws InternalError if a security manager is installed but no
         *         access control context has been set
         * @throws UncheckedIOException if the file cannot be opened
         */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            if (System.getSecurityManager() != null && acc == null)
                throw new InternalError(
                        "Unexpected null acc when security manager has been installed");

            InputStream is;
            try {
                PrivilegedExceptionAction<FileInputStream> pa =
                        () -> new FileInputStream(file);
                is = AccessController.doPrivileged(pa, acc);
            } catch (PrivilegedActionException pae) {
                throw new UncheckedIOException((IOException)pae.getCause());
            }
            // The stream is closed by StreamIterator once it reaches end of
            // stream or fails to read.
            PullPublisher<ByteBuffer> publisher =
                    new PullPublisher<>(() -> new StreamIterator(is));
            publisher.subscribe(subscriber);
        }

        /** Returns {@code File.length()} of the file, read in a privileged block. */
        @Override
        public long contentLength() {
            assert System.getSecurityManager() != null ? acc != null: true;
            PrivilegedAction<Long> pa = () -> file.length();
            return AccessController.doPrivileged(pa, acc);
        }
    }

    /**
     * Reads one buffer ahead all the time, blocking in hasNext()
     *
     * <p>The stream is closed as soon as it reports end of stream or a read
     * fails; after that no further reads are attempted and
     * {@link #hasNext()} keeps returning {@code false}.
     */
    static class StreamIterator implements Iterator<ByteBuffer> {
        final InputStream is;
        final Supplier<? extends ByteBuffer> bufSupplier;
        volatile ByteBuffer nextBuffer;
        volatile boolean need2Read = true;
        volatile boolean haveNext;
        // Set once the stream is exhausted or has failed. Only touched from
        // read(), which runs under the monitor held by hasNext().
        private boolean finished;

        StreamIterator(InputStream is) {
            this(is, Utils::getBuffer);
        }

        StreamIterator(InputStream is, Supplier<? extends ByteBuffer> bufSupplier) {
            this.is = is;
            this.bufSupplier = bufSupplier;
        }

        /**
         * Fills a fresh buffer from the stream.
         *
         * @return the number of bytes read, or -1 when the stream is
         *         exhausted or a read failed
         */
        private int read() {
            if (finished) {
                // Never touch the stream again once it has been closed, and
                // do not take another buffer from the supplier for nothing.
                return -1;
            }
            nextBuffer = bufSupplier.get();
            nextBuffer.clear();
            byte[] buf = nextBuffer.array();
            int offset = nextBuffer.arrayOffset();
            int cap = nextBuffer.capacity();
            try {
                int n = is.read(buf, offset, cap);
                if (n != -1) {
                    //flip
                    nextBuffer.limit(n);
                    nextBuffer.position(0);
                    return n;
                }
            } catch (IOException ex) {
                // NOTE(review): a read failure is reported as end of data,
                // as before, so the body may be silently truncated. How
                // PullPublisher treats exceptions thrown by the iterator is
                // not visible from here, so the failure is not propagated.
            }
            finished = true;
            closeQuietly();
            return -1;
        }

        /** Closes the stream, ignoring any failure to do so. */
        private void closeQuietly() {
            try {
                is.close();
            } catch (IOException ignored) {
                // Nothing useful can be done: the iterator is already done.
            }
        }

        @Override
        public synchronized boolean hasNext() {
            if (need2Read) {
                haveNext = read() != -1;
                if (haveNext) {
                    need2Read = false;
                }
            }
            return haveNext;
        }

        @Override
        public synchronized ByteBuffer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            need2Read = true;
            return nextBuffer;
        }

    }

    /**
     * Publishes the content of an input stream obtained from a supplier on
     * each subscription. The content length is unknown (-1).
     */
    static class InputStreamPublisher implements BodyPublisher {
        private final Supplier<? extends InputStream> streamSupplier;

        InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
            this.streamSupplier = streamSupplier;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            PullPublisher<ByteBuffer> publisher;
            InputStream is = streamSupplier.get();
            if (is == null) {
                // Hand the failure to PullPublisher instead of throwing here.
                Throwable t = new IOException("streamSupplier returned null");
                publisher = new PullPublisher<>(null, t);
            } else {
                publisher = new PullPublisher<>(iterableOf(is), null);
            }
            publisher.subscribe(subscriber);
        }

        protected Iterable<ByteBuffer> iterableOf(InputStream is) {
            return () -> new StreamIterator(is);
        }

        @Override
        public long contentLength() {
            return -1;
        }
    }
}