26 package jdk.incubator.http; /* NOTE(review): this text looks like a side-by-side diff dump rather than a plain source file: two copies of the same file follow one another, separated by a '|' character, each with its own embedded line numbers. In the first copy the numbers jump from 67 to 329, so the lines in between are not present here. */ 27 28 import java.io.File; 29 import java.io.FileInputStream; 30 import java.io.IOException; 31 import java.io.InputStream; 32 import java.io.UncheckedIOException; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.nio.file.Path; 36 import java.security.AccessControlContext; 37 import java.security.AccessController; 38 import java.security.PrivilegedAction; 39 import java.security.PrivilegedActionException; 40 import java.security.PrivilegedExceptionAction; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.Iterator; 44 import java.util.List; 45 import java.util.NoSuchElementException; 46 import java.util.concurrent.ConcurrentLinkedQueue; 47 import java.util.concurrent.Flow; 48 import java.util.function.Supplier; 49 import jdk.incubator.http.HttpRequest.BodyPublisher; 50 import jdk.incubator.http.internal.common.Utils; 51 52 class RequestPublishers { 53 54 static class ByteArrayPublisher implements HttpRequest.BodyPublisher { 55 private volatile Flow.Publisher<ByteBuffer> delegate; 56 private final int length; 57 private final byte[] content; 58 private final int offset; 59 private final int bufSize; 60 61 ByteArrayPublisher(byte[] content) { 62 this(content, 0, content.length); 63 } 64 65 ByteArrayPublisher(byte[] content, int offset, int length) { 66 this(content, offset, length, Utils.BUFSIZE); 67 } 329 @Override 330 public void subscribe(Flow.Subscriber<? 
super ByteBuffer> subscriber) { 331 PullPublisher<ByteBuffer> publisher; 332 InputStream is = streamSupplier.get(); 333 if (is == null) { 334 Throwable t = new IOException("streamSupplier returned null"); 335 publisher = new PullPublisher<>(null, t); 336 } else { 337 publisher = new PullPublisher<>(iterableOf(is), null); 338 } 339 publisher.subscribe(subscriber); 340 } 341 342 protected Iterable<ByteBuffer> iterableOf(InputStream is) { 343 return () -> new StreamIterator(is); 344 } 345 346 @Override 347 public long contentLength() { 348 return -1; 349 } 350 } 351 } | /* NOTE(review): second copy of the file starts here. Compared with the first copy it additionally imports java.util.Objects and java.util.concurrent.Flow.Publisher and ends with a PublisherAdapter class; it appears to be the later revision - confirm. */ 26 package jdk.incubator.http; 27 28 import java.io.File; 29 import java.io.FileInputStream; 30 import java.io.IOException; 31 import java.io.InputStream; 32 import java.io.UncheckedIOException; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.nio.file.Path; 36 import java.security.AccessControlContext; 37 import java.security.AccessController; 38 import java.security.PrivilegedAction; 39 import java.security.PrivilegedActionException; 40 import java.security.PrivilegedExceptionAction; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.Iterator; 44 import java.util.List; 45 import java.util.NoSuchElementException; 46 import java.util.Objects; 47 import java.util.concurrent.ConcurrentLinkedQueue; 48 import java.util.concurrent.Flow; 49 import java.util.concurrent.Flow.Publisher; 50 import java.util.function.Supplier; 51 import jdk.incubator.http.HttpRequest.BodyPublisher; 52 import jdk.incubator.http.internal.common.Utils; 53 54 class RequestPublishers { 55 56 static class ByteArrayPublisher implements HttpRequest.BodyPublisher { 57 private volatile Flow.Publisher<ByteBuffer> delegate; 58 private final int length; 59 private final byte[] content; 60 private final int offset; 61 private final int bufSize; 62 63 ByteArrayPublisher(byte[] content) { 64 this(content, 0, content.length); 65 } 66 67 ByteArrayPublisher(byte[] content, int 
offset, int length) { 68 this(content, offset, length, Utils.BUFSIZE); 69 } /* NOTE(review): embedded numbering jumps from 69 to 331 here; the rest of ByteArrayPublisher and the header of the class that owns the three methods below are not present in this text. */ /** Asks streamSupplier for an InputStream and subscribes the given subscriber to a freshly created PullPublisher. When the supplier returns null, the PullPublisher is built with a null first argument and an IOException("streamSupplier returned null"); otherwise it is built over iterableOf(is) with a null second argument. Presumably the non-null Throwable is reported to the subscriber as an error - confirm against PullPublisher. */ 331 @Override 332 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { 333 PullPublisher<ByteBuffer> publisher; 334 InputStream is = streamSupplier.get(); 335 if (is == null) { 336 Throwable t = new IOException("streamSupplier returned null"); 337 publisher = new PullPublisher<>(null, t); 338 } else { 339 publisher = new PullPublisher<>(iterableOf(is), null); 340 } 341 publisher.subscribe(subscriber); 342 } 343 /** Returns an Iterable whose iterator() creates a new StreamIterator over the given stream on each call. Declared protected, so subclasses can substitute a different iterable. */ 344 protected Iterable<ByteBuffer> iterableOf(InputStream is) { 345 return () -> new StreamIterator(is); 346 } 347 /** Always returns -1. NOTE(review): presumably -1 means the body length is not known in advance - confirm against the BodyPublisher contract. */ 348 @Override 349 public long contentLength() { 350 return -1; 351 } 352 } 353 /** BodyPublisher that wraps an existing Flow.Publisher of ByteBuffer (or a subtype): subscribe is forwarded unchanged to the wrapped publisher and contentLength reports the value given at construction. */ 354 static final class PublisherAdapter implements BodyPublisher { 355 356 private final Publisher<? extends ByteBuffer> publisher; 357 private final long contentLength; 358 /** Stores the publisher and the content length as given. Throws NullPointerException if publisher is null (via Objects.requireNonNull); contentLength is not validated here. */ 359 PublisherAdapter(Publisher<? extends ByteBuffer> publisher, 360 long contentLength) { 361 this.publisher = Objects.requireNonNull(publisher); 362 this.contentLength = contentLength; 363 } 364 /** Returns the content length supplied to the constructor. */ 365 @Override 366 public final long contentLength() { 367 return contentLength; 368 } 369 /** Forwards the subscriber directly to the wrapped publisher. */ 370 @Override 371 public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { 372 publisher.subscribe(subscriber); 373 } 374 } 375 } |