26 package jdk.incubator.http;
27
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.UncheckedIOException;
33 import java.nio.ByteBuffer;
34 import java.nio.charset.Charset;
35 import java.nio.file.Path;
36 import java.security.AccessControlContext;
37 import java.security.AccessController;
38 import java.security.PrivilegedAction;
39 import java.security.PrivilegedActionException;
40 import java.security.PrivilegedExceptionAction;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.List;
45 import java.util.NoSuchElementException;
46 import java.util.concurrent.ConcurrentLinkedQueue;
47 import java.util.concurrent.Flow;
48 import java.util.function.Supplier;
49 import jdk.incubator.http.HttpRequest.BodyPublisher;
50 import jdk.incubator.http.internal.common.Utils;
51
52 class RequestPublishers {
53
54 static class ByteArrayPublisher implements HttpRequest.BodyPublisher {
55 private volatile Flow.Publisher<ByteBuffer> delegate;
56 private final int length;
57 private final byte[] content;
58 private final int offset;
59 private final int bufSize;
60
61 ByteArrayPublisher(byte[] content) {
62 this(content, 0, content.length);
63 }
64
65 ByteArrayPublisher(byte[] content, int offset, int length) {
92 @Override
93 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
94 List<ByteBuffer> copy = copy(content, offset, length);
95 this.delegate = new PullPublisher<>(copy);
96 delegate.subscribe(subscriber);
97 }
98
99 @Override
100 public long contentLength() {
101 return length;
102 }
103 }
104
105 // This implementation has lots of room for improvement.
106 static class IterablePublisher implements HttpRequest.BodyPublisher {
107 private volatile Flow.Publisher<ByteBuffer> delegate;
108 private final Iterable<byte[]> content;
109 private volatile long contentLength;
110
111 IterablePublisher(Iterable<byte[]> content) {
112 this.content = content;
113 }
114
115 // The ByteBufferIterator will iterate over the byte[] arrays in
116 // the content one at the time.
117 //
118 class ByteBufferIterator implements Iterator<ByteBuffer> {
119 final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
120 final Iterator<byte[]> iterator = content.iterator();
121 @Override
122 public boolean hasNext() {
123 return !buffers.isEmpty() || iterator.hasNext();
124 }
125
126 @Override
127 public ByteBuffer next() {
128 ByteBuffer buffer = buffers.poll();
129 while (buffer == null) {
130 copy();
131 buffer = buffers.poll();
132 }
306 return haveNext;
307 }
308 return haveNext;
309 }
310
311 @Override
312 public synchronized ByteBuffer next() {
313 if (!hasNext()) {
314 throw new NoSuchElementException();
315 }
316 need2Read = true;
317 return nextBuffer;
318 }
319
320 }
321
322 static class InputStreamPublisher implements BodyPublisher {
323 private final Supplier<? extends InputStream> streamSupplier;
324
325 InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
326 this.streamSupplier = streamSupplier;
327 }
328
329 @Override
330 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
331 PullPublisher<ByteBuffer> publisher;
332 InputStream is = streamSupplier.get();
333 if (is == null) {
334 Throwable t = new IOException("streamSupplier returned null");
335 publisher = new PullPublisher<>(null, t);
336 } else {
337 publisher = new PullPublisher<>(iterableOf(is), null);
338 }
339 publisher.subscribe(subscriber);
340 }
341
342 protected Iterable<ByteBuffer> iterableOf(InputStream is) {
343 return () -> new StreamIterator(is);
344 }
345
346 @Override
|
26 package jdk.incubator.http;
27
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.UncheckedIOException;
33 import java.nio.ByteBuffer;
34 import java.nio.charset.Charset;
35 import java.nio.file.Path;
36 import java.security.AccessControlContext;
37 import java.security.AccessController;
38 import java.security.PrivilegedAction;
39 import java.security.PrivilegedActionException;
40 import java.security.PrivilegedExceptionAction;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.List;
45 import java.util.NoSuchElementException;
46 import java.util.Objects;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.Flow;
49 import java.util.function.Supplier;
50 import jdk.incubator.http.HttpRequest.BodyPublisher;
51 import jdk.incubator.http.internal.common.Utils;
52
53 class RequestPublishers {
54
55 static class ByteArrayPublisher implements HttpRequest.BodyPublisher {
56 private volatile Flow.Publisher<ByteBuffer> delegate;
57 private final int length;
58 private final byte[] content;
59 private final int offset;
60 private final int bufSize;
61
62 ByteArrayPublisher(byte[] content) {
63 this(content, 0, content.length);
64 }
65
66 ByteArrayPublisher(byte[] content, int offset, int length) {
93 @Override
94 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
95 List<ByteBuffer> copy = copy(content, offset, length);
96 this.delegate = new PullPublisher<>(copy);
97 delegate.subscribe(subscriber);
98 }
99
100 @Override
101 public long contentLength() {
102 return length;
103 }
104 }
105
106 // This implementation has lots of room for improvement.
107 static class IterablePublisher implements HttpRequest.BodyPublisher {
108 private volatile Flow.Publisher<ByteBuffer> delegate;
109 private final Iterable<byte[]> content;
110 private volatile long contentLength;
111
112 IterablePublisher(Iterable<byte[]> content) {
113 this.content = Objects.requireNonNull(content);
114 }
115
116 // The ByteBufferIterator will iterate over the byte[] arrays in
117 // the content one at the time.
118 //
119 class ByteBufferIterator implements Iterator<ByteBuffer> {
120 final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
121 final Iterator<byte[]> iterator = content.iterator();
122 @Override
123 public boolean hasNext() {
124 return !buffers.isEmpty() || iterator.hasNext();
125 }
126
127 @Override
128 public ByteBuffer next() {
129 ByteBuffer buffer = buffers.poll();
130 while (buffer == null) {
131 copy();
132 buffer = buffers.poll();
133 }
307 return haveNext;
308 }
309 return haveNext;
310 }
311
312 @Override
313 public synchronized ByteBuffer next() {
314 if (!hasNext()) {
315 throw new NoSuchElementException();
316 }
317 need2Read = true;
318 return nextBuffer;
319 }
320
321 }
322
323 static class InputStreamPublisher implements BodyPublisher {
324 private final Supplier<? extends InputStream> streamSupplier;
325
326 InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
327 this.streamSupplier = Objects.requireNonNull(streamSupplier);
328 }
329
330 @Override
331 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
332 PullPublisher<ByteBuffer> publisher;
333 InputStream is = streamSupplier.get();
334 if (is == null) {
335 Throwable t = new IOException("streamSupplier returned null");
336 publisher = new PullPublisher<>(null, t);
337 } else {
338 publisher = new PullPublisher<>(iterableOf(is), null);
339 }
340 publisher.subscribe(subscriber);
341 }
342
343 protected Iterable<ByteBuffer> iterableOf(InputStream is) {
344 return () -> new StreamIterator(is);
345 }
346
347 @Override
|