55 private final Consumer<Optional<byte[]>> consumer;
56 private Flow.Subscription subscription;
57 private final CompletableFuture<Void> result = new MinimalFuture<>();
58
/**
 * Creates a processor that hands every received chunk to the given consumer.
 *
 * @param consumer receives each chunk wrapped in an {@code Optional}, and an
 *                 empty {@code Optional} once the body is complete
 */
ConsumerProcessor(Consumer<Optional<byte[]>> consumer) {
    this.consumer = consumer;
}
62
63 @Override
64 public CompletionStage<Void> getBody() {
65 return result;
66 }
67
68 @Override
69 public void onSubscribe(Flow.Subscription subscription) {
70 this.subscription = subscription;
71 subscription.request(1);
72 }
73
74 @Override
75 public void onNext(ByteBuffer item) {
76 byte[] buf = new byte[item.remaining()];
77 item.get(buf);
78 consumer.accept(Optional.of(buf));
79 subscription.request(1);
80 }
81
82 @Override
83 public void onError(Throwable throwable) {
84 result.completeExceptionally(throwable);
85 }
86
87 @Override
88 public void onComplete() {
89 consumer.accept(Optional.empty());
90 result.complete(null);
91 }
92
93 }
94
95 static class PathProcessor implements HttpResponse.BodyProcessor<Path> {
96
97 private final Path file;
98 private final CompletableFuture<Path> result = new MinimalFuture<>();
103
/**
 * Creates a processor that writes the body to the given file.
 *
 * @param file    the file handed to {@code FileChannel.open} on subscription
 * @param options the open options handed to {@code FileChannel.open}
 */
PathProcessor(Path file, OpenOption... options) {
    this.file = file;
    this.options = options;
}
108
109 @Override
110 public void onSubscribe(Flow.Subscription subscription) {
111 this.subscription = subscription;
112 try {
113 out = FileChannel.open(file, options);
114 } catch (IOException e) {
115 result.completeExceptionally(e);
116 subscription.cancel();
117 return;
118 }
119 subscription.request(1);
120 }
121
122 @Override
123 public void onNext(ByteBuffer item) {
124 try {
125 out.write(item);
126 } catch (IOException ex) {
127 Utils.close(out);
128 subscription.cancel();
129 result.completeExceptionally(ex);
130 }
131 subscription.request(1);
132 }
133
134 @Override
135 public void onError(Throwable e) {
136 result.completeExceptionally(e);
137 Utils.close(out);
138 }
139
140 @Override
141 public void onComplete() {
142 Utils.close(out);
143 result.complete(file);
144 }
145
155 private final List<ByteBuffer> received = new ArrayList<>();
156
157 private Flow.Subscription subscription;
158
/**
 * Creates a processor that stores the given finisher function.
 *
 * @param finisher function from a byte array to the result type
 *                 (presumably applied to the joined body; its use is outside this view)
 */
ByteArrayProcessor(Function<byte[],T> finisher) {
    this.finisher = finisher;
}
162
163 @Override
164 public void onSubscribe(Flow.Subscription subscription) {
165 if (this.subscription != null) {
166 subscription.cancel();
167 return;
168 }
169 this.subscription = subscription;
170 // We can handle whatever you've got
171 subscription.request(Long.MAX_VALUE);
172 }
173
174 @Override
175 public void onNext(ByteBuffer item) {
176 // incoming buffers are allocated by http client internally,
177 // and won't be used anywhere except this place.
178 // So it's free simply to store them for further processing.
179 if(item.hasRemaining()) {
180 received.add(item);
181 }
182 }
183
184 @Override
185 public void onError(Throwable throwable) {
186 received.clear();
187 result.completeExceptionally(throwable);
188 }
189
190 static private byte[] join(List<ByteBuffer> bytes) {
191 int size = Utils.remaining(bytes);
192 byte[] res = new byte[size];
193 int from = 0;
194 for (ByteBuffer b : bytes) {
195 int l = b.remaining();
196 b.get(res, from, l);
197 from += l;
198 }
199 return res;
200 }
287 /**
288 * Currently this consumes all of the data and ignores it
289 */
290 static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> {
291
292 Flow.Subscription subscription;
293 final CompletableFuture<T> cf = new MinimalFuture<>();
294 final Optional<T> result;
295
296 NullProcessor(Optional<T> result) {
297 this.result = result;
298 }
299
300 @Override
301 public void onSubscribe(Flow.Subscription subscription) {
302 this.subscription = subscription;
303 subscription.request(Long.MAX_VALUE);
304 }
305
306 @Override
307 public void onNext(ByteBuffer item) {
308 // TODO: check whether this should consume the buffer, as in:
309 item.position(item.limit());
310 }
311
312 @Override
313 public void onError(Throwable throwable) {
314 cf.completeExceptionally(throwable);
315 }
316
317 @Override
318 public void onComplete() {
319 if (result.isPresent()) {
320 cf.complete(result.get());
321 } else {
322 cf.complete(null);
323 }
324 }
325
326 @Override
327 public CompletionStage<T> getBody() {
328 return cf;
329 }
330 }
|
55 private final Consumer<Optional<byte[]>> consumer;
56 private Flow.Subscription subscription;
57 private final CompletableFuture<Void> result = new MinimalFuture<>();
58
/**
 * Creates a processor that forwards every received chunk to the given consumer.
 *
 * @param consumer receives each chunk wrapped in an {@code Optional}, and an
 *                 empty {@code Optional} once the body is complete
 */
ConsumerProcessor(Consumer<Optional<byte[]>> consumer) {
    this.consumer = consumer;
}
62
63 @Override
64 public CompletionStage<Void> getBody() {
65 return result;
66 }
67
68 @Override
69 public void onSubscribe(Flow.Subscription subscription) {
70 this.subscription = subscription;
71 subscription.request(1);
72 }
73
74 @Override
75 public void onNext(List<ByteBuffer> items) {
76 for (ByteBuffer item : items) {
77 byte[] buf = new byte[item.remaining()];
78 item.get(buf);
79 consumer.accept(Optional.of(buf));
80 }
81 subscription.request(1);
82 }
83
84 @Override
85 public void onError(Throwable throwable) {
86 result.completeExceptionally(throwable);
87 }
88
89 @Override
90 public void onComplete() {
91 consumer.accept(Optional.empty());
92 result.complete(null);
93 }
94
95 }
96
97 static class PathProcessor implements HttpResponse.BodyProcessor<Path> {
98
99 private final Path file;
100 private final CompletableFuture<Path> result = new MinimalFuture<>();
105
/**
 * Creates a processor that writes the body to the given file.
 *
 * @param file    the file handed to {@code FileChannel.open} on subscription
 * @param options the open options handed to {@code FileChannel.open}
 */
PathProcessor(Path file, OpenOption... options) {
    this.file = file;
    this.options = options;
}
110
111 @Override
112 public void onSubscribe(Flow.Subscription subscription) {
113 this.subscription = subscription;
114 try {
115 out = FileChannel.open(file, options);
116 } catch (IOException e) {
117 result.completeExceptionally(e);
118 subscription.cancel();
119 return;
120 }
121 subscription.request(1);
122 }
123
124 @Override
125 public void onNext(List<ByteBuffer> items) {
126 try {
127 for (ByteBuffer item : items) {
128 out.write(item);
129 }
130 } catch (IOException ex) {
131 Utils.close(out);
132 subscription.cancel();
133 result.completeExceptionally(ex);
134 }
135 subscription.request(1);
136 }
137
138 @Override
139 public void onError(Throwable e) {
140 result.completeExceptionally(e);
141 Utils.close(out);
142 }
143
144 @Override
145 public void onComplete() {
146 Utils.close(out);
147 result.complete(file);
148 }
149
159 private final List<ByteBuffer> received = new ArrayList<>();
160
161 private Flow.Subscription subscription;
162
/**
 * Creates a processor that stores the given finisher function.
 *
 * @param finisher function from a byte array to the result type
 *                 (presumably applied to the joined body; its use is outside this view)
 */
ByteArrayProcessor(Function<byte[],T> finisher) {
    this.finisher = finisher;
}
166
167 @Override
168 public void onSubscribe(Flow.Subscription subscription) {
169 if (this.subscription != null) {
170 subscription.cancel();
171 return;
172 }
173 this.subscription = subscription;
174 // We can handle whatever you've got
175 subscription.request(Long.MAX_VALUE);
176 }
177
178 @Override
179 public void onNext(List<ByteBuffer> items) {
180 // incoming buffers are allocated by http client internally,
181 // and won't be used anywhere except this place.
182 // So it's free simply to store them for further processing.
183 if(Utils.remaining(items) > 0) {
184 received.addAll(items);
185 }
186 }
187
188 @Override
189 public void onError(Throwable throwable) {
190 received.clear();
191 result.completeExceptionally(throwable);
192 }
193
194 static private byte[] join(List<ByteBuffer> bytes) {
195 int size = Utils.remaining(bytes);
196 byte[] res = new byte[size];
197 int from = 0;
198 for (ByteBuffer b : bytes) {
199 int l = b.remaining();
200 b.get(res, from, l);
201 from += l;
202 }
203 return res;
204 }
291 /**
292 * Currently this consumes all of the data and ignores it
293 */
294 static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> {
295
296 Flow.Subscription subscription;
297 final CompletableFuture<T> cf = new MinimalFuture<>();
298 final Optional<T> result;
299
300 NullProcessor(Optional<T> result) {
301 this.result = result;
302 }
303
304 @Override
305 public void onSubscribe(Flow.Subscription subscription) {
306 this.subscription = subscription;
307 subscription.request(Long.MAX_VALUE);
308 }
309
310 @Override
311 public void onNext(List<ByteBuffer> items) {
312 // TODO: check whether this should consume the buffer, as in:
313 for (ByteBuffer item : items) {
314 item.position(item.limit());
315 }
316 }
317
318 @Override
319 public void onError(Throwable throwable) {
320 cf.completeExceptionally(throwable);
321 }
322
323 @Override
324 public void onComplete() {
325 if (result.isPresent()) {
326 cf.complete(result.get());
327 } else {
328 cf.complete(null);
329 }
330 }
331
332 @Override
333 public CompletionStage<T> getBody() {
334 return cf;
335 }
336 }
|