1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.io.UncheckedIOException; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.Files; 34 import java.nio.file.OpenOption; 35 import java.nio.file.Path; 36 import java.util.ArrayList; 37 import java.util.Collections; 38 import java.util.HashMap; 39 import java.util.List; 40 import java.util.Map; 41 import java.util.Optional; 42 import java.util.concurrent.CompletableFuture; 43 import java.util.concurrent.CompletionStage; 44 import java.util.concurrent.ConcurrentHashMap; 45 import java.util.concurrent.Flow; 46 import java.util.function.Consumer; 47 import java.util.function.Function; 48 import jdk.incubator.http.internal.common.MinimalFuture; 49 import jdk.incubator.http.internal.common.Utils; 50 import jdk.incubator.http.internal.common.Log; 51 52 class ResponseProcessors { 53 54 static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> { 55 private final Consumer<Optional<byte[]>> consumer; 56 private Flow.Subscription subscription; 57 private final CompletableFuture<Void> result = new MinimalFuture<>(); 58 59 ConsumerProcessor(Consumer<Optional<byte[]>> consumer) { 60 this.consumer = consumer; 61 } 62 63 @Override 64 public CompletionStage<Void> getBody() { 65 return result; 66 } 67 68 @Override 69 public void onSubscribe(Flow.Subscription subscription) { 70 this.subscription = subscription; 71 subscription.request(1); 72 } 73 74 @Override 75 public void onNext(List<ByteBuffer> items) { 76 for (ByteBuffer item : items) { 77 byte[] buf = new byte[item.remaining()]; 78 item.get(buf); 79 consumer.accept(Optional.of(buf)); 80 } 81 subscription.request(1); 82 } 83 84 @Override 85 public void onError(Throwable throwable) { 86 result.completeExceptionally(throwable); 87 } 88 89 @Override 90 public void onComplete() { 91 consumer.accept(Optional.empty()); 92 result.complete(null); 93 } 94 95 } 96 97 static class PathProcessor implements HttpResponse.BodyProcessor<Path> { 98 99 private final Path file; 100 private final CompletableFuture<Path> result = new MinimalFuture<>(); 101 102 private Flow.Subscription subscription; 103 private FileChannel out; 104 private final OpenOption[] options; 105 106 PathProcessor(Path file, OpenOption... options) { 107 this.file = file; 108 this.options = options; 109 } 110 111 @Override 112 public void onSubscribe(Flow.Subscription subscription) { 113 this.subscription = subscription; 114 try { 115 out = FileChannel.open(file, options); 116 } catch (IOException e) { 117 result.completeExceptionally(e); 118 subscription.cancel(); 119 return; 120 } 121 subscription.request(1); 122 } 123 124 @Override 125 public void onNext(List<ByteBuffer> items) { 126 try { 127 for (ByteBuffer item : items) { 128 out.write(item); 129 } 130 } catch (IOException ex) { 131 Utils.close(out); 132 subscription.cancel(); 133 result.completeExceptionally(ex); 134 } 135 subscription.request(1); 136 } 137 138 @Override 139 public void onError(Throwable e) { 140 result.completeExceptionally(e); 141 Utils.close(out); 142 } 143 144 @Override 145 public void onComplete() { 146 Utils.close(out); 147 result.complete(file); 148 } 149 150 @Override 151 public CompletionStage<Path> getBody() { 152 return result; 153 } 154 } 155 156 static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> { 157 private final Function<byte[], T> finisher; 158 private final CompletableFuture<T> result = new MinimalFuture<>(); 159 private final List<ByteBuffer> received = new ArrayList<>(); 160 161 private Flow.Subscription subscription; 162 163 ByteArrayProcessor(Function<byte[],T> finisher) { 164 this.finisher = finisher; 165 } 166 167 @Override 168 public void onSubscribe(Flow.Subscription subscription) { 169 if (this.subscription != null) { 170 subscription.cancel(); 171 return; 172 } 173 this.subscription = subscription; 174 // We can handle whatever you've got 175 subscription.request(Long.MAX_VALUE); 176 } 177 178 @Override 179 public void onNext(List<ByteBuffer> items) { 180 // incoming buffers are allocated by http client internally, 181 // and won't be used anywhere except this place. 182 // So it's free simply to store them for further processing. 183 if(Utils.remaining(items) > 0) { 184 received.addAll(items); 185 } 186 } 187 188 @Override 189 public void onError(Throwable throwable) { 190 received.clear(); 191 result.completeExceptionally(throwable); 192 } 193 194 static private byte[] join(List<ByteBuffer> bytes) { 195 int size = Utils.remaining(bytes); 196 byte[] res = new byte[size]; 197 int from = 0; 198 for (ByteBuffer b : bytes) { 199 int l = b.remaining(); 200 b.get(res, from, l); 201 from += l; 202 } 203 return res; 204 } 205 206 @Override 207 public void onComplete() { 208 try { 209 result.complete(finisher.apply(join(received))); 210 received.clear(); 211 } catch (IllegalArgumentException e) { 212 result.completeExceptionally(e); 213 } 214 } 215 216 @Override 217 public CompletionStage<T> getBody() { 218 return result; 219 } 220 } 221 222 static class MultiProcessorImpl<V> implements HttpResponse.MultiProcessor<MultiMapResult<V>,V> { 223 private final MultiMapResult<V> results; 224 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; 225 private final boolean completion; // aggregate completes on last PP received or overall completion 226 227 MultiProcessorImpl(Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { 228 this.results = new MultiMapResult<V>(new ConcurrentHashMap<>()); 229 this.pushHandler = pushHandler; 230 this.completion = completion; 231 } 232 233 @Override 234 public Optional<HttpResponse.BodyHandler<V>> onRequest(HttpRequest request) { 235 return pushHandler.apply(request); 236 } 237 238 @Override 239 public void onResponse(HttpResponse<V> response) { 240 results.put(response.request(), CompletableFuture.completedFuture(response)); 241 } 242 243 @Override 244 public void onError(HttpRequest request, Throwable t) { 245 results.put(request, MinimalFuture.failedFuture(t)); 246 } 247 248 @Override 249 public CompletableFuture<MultiMapResult<V>> completion( 250 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { 251 if (completion) 252 return onComplete.thenApply((ignored)-> results); 253 else 254 return onFinalPushPromise.thenApply((ignored) -> results); 255 } 256 } 257 258 static class MultiFile { 259 260 final Path pathRoot; 261 262 MultiFile(Path destination) { 263 if (!destination.toFile().isDirectory()) 264 throw new UncheckedIOException(new IOException("destination is not a directory")); 265 pathRoot = destination; 266 } 267 268 Optional<HttpResponse.BodyHandler<Path>> handlePush(HttpRequest request) { 269 final URI uri = request.uri(); 270 String path = uri.getPath(); 271 while (path.startsWith("/")) 272 path = path.substring(1); 273 Path p = pathRoot.resolve(path); 274 if (Log.trace()) { 275 Log.logTrace("Creating file body processor for URI={0}, path={1}", 276 uri, p); 277 } 278 try { 279 Files.createDirectories(p.getParent()); 280 } catch (IOException ex) { 281 throw new UncheckedIOException(ex); 282 } 283 284 final HttpResponse.BodyHandler<Path> proc = 285 HttpResponse.BodyHandler.asFile(p); 286 287 return Optional.of(proc); 288 } 289 } 290 291 /** 292 * Currently this consumes all of the data and ignores it 293 */ 294 static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> { 295 296 Flow.Subscription subscription; 297 final CompletableFuture<T> cf = new MinimalFuture<>(); 298 final Optional<T> result; 299 300 NullProcessor(Optional<T> result) { 301 this.result = result; 302 } 303 304 @Override 305 public void onSubscribe(Flow.Subscription subscription) { 306 this.subscription = subscription; 307 subscription.request(Long.MAX_VALUE); 308 } 309 310 @Override 311 public void onNext(List<ByteBuffer> items) { 312 // TODO: check whether this should consume the buffer, as in: 313 for (ByteBuffer item : items) { 314 item.position(item.limit()); 315 } 316 } 317 318 @Override 319 public void onError(Throwable throwable) { 320 cf.completeExceptionally(throwable); 321 } 322 323 @Override 324 public void onComplete() { 325 if (result.isPresent()) { 326 cf.complete(result.get()); 327 } else { 328 cf.complete(null); 329 } 330 } 331 332 @Override 333 public CompletionStage<T> getBody() { 334 return cf; 335 } 336 } 337 }