/*
 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.common.Log;

/**
 * Package-private holder for the response body processor implementations
 * defined below. Each nested processor receives the body as lists of
 * {@link ByteBuffer}s through the {@link Flow.Subscriber} callbacks and
 * exposes its outcome through the stage returned by {@code getBody()}.
 */
class ResponseProcessors {

    /**
     * Forwards body bytes to a user supplied consumer.
     *
     * <p>For every buffer received, the buffer's remaining bytes are copied
     * into a fresh array and handed to the consumer wrapped in
     * {@code Optional.of}. When the body completes, the consumer is called
     * once more with {@code Optional.empty()} and the body stage completes
     * with {@code null}.
     */
    static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> {
        private final Consumer<Optional<byte[]>> consumer;
        private Flow.Subscription subscription;
        private final CompletableFuture<Void> result = new MinimalFuture<>();

        /**
         * @param consumer receives each chunk of body bytes, then an empty
         *                 {@code Optional} on completion
         */
        ConsumerProcessor(Consumer<Optional<byte[]>> consumer) {
            this.consumer = consumer;
        }

        @Override
        public CompletionStage<Void> getBody() {
            return result;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            // Pull one list of buffers at a time.
            subscription.request(1);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            for (ByteBuffer item : items) {
                byte[] buf = new byte[item.remaining()];
                item.get(buf);
                consumer.accept(Optional.of(buf));
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            result.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            // The empty Optional tells the consumer there is no more data.
            consumer.accept(Optional.empty());
            result.complete(null);
        }

    }

    /**
     * Writes the body to a file opened with the given options and completes
     * the body stage with the file's {@link Path}.
     */
    static class PathProcessor implements HttpResponse.BodyProcessor<Path> {

        private final Path file;
        private final CompletableFuture<Path> result = new MinimalFuture<>();

        private Flow.Subscription subscription;
        private FileChannel out;
        private final OpenOption[] options;

        /**
         * @param file    the file the body is written to
         * @param options options passed to {@link FileChannel#open} when the
         *                subscription starts
         */
        PathProcessor(Path file, OpenOption... options) {
            this.file = file;
            this.options = options;
        }

        /**
         * Opens the output channel. If the file cannot be opened the body
         * stage is failed and the subscription is cancelled without
         * requesting any data.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            try {
                out = FileChannel.open(file, options);
            } catch (IOException e) {
                result.completeExceptionally(e);
                subscription.cancel();
                return;
            }
            subscription.request(1);
        }

        /**
         * Writes the received buffers to the file and requests the next
         * batch. On a write failure the channel is closed, the subscription
         * is cancelled, the body stage is failed, and no further data is
         * requested.
         */
        @Override
        public void onNext(List<ByteBuffer> items) {
            try {
                out.write(items.toArray(new ByteBuffer[0]));
            } catch (IOException ex) {
                Utils.close(out);
                subscription.cancel();
                result.completeExceptionally(ex);
                // The subscription has just been cancelled: asking for more
                // data on it would be meaningless, so stop here.
                return;
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            result.completeExceptionally(e);
            Utils.close(out);
        }

        @Override
        public void onComplete() {
            // Close before completing so the file is no longer held open by
            // this processor when the stage's dependents run.
            Utils.close(out);
            result.complete(file);
        }

        @Override
        public CompletionStage<Path> getBody() {
            return result;
        }
    }

    /**
     * Accumulates the whole body in memory and, on completion, converts the
     * concatenated bytes to a {@code T} with the supplied finisher function.
     *
     * @param <T> the type produced by the finisher
     */
    static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> {
        private final Function<byte[], T> finisher;
        private final CompletableFuture<T> result = new MinimalFuture<>();
        private final List<ByteBuffer> received = new ArrayList<>();

        private Flow.Subscription subscription;

        /**
         * @param finisher maps the complete body bytes to the result value
         */
        ByteArrayProcessor(Function<byte[], T> finisher) {
            this.finisher = finisher;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                // Already subscribed: refuse the additional subscription.
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
            // We can handle whatever you've got
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            // incoming buffers are allocated by http client internally,
            // and won't be used anywhere except this place.
            // So it's free simply to store them for further processing.
            assert Utils.remaining(items) > 0; // TODO: is this really necessary?
            received.addAll(items);
        }

        @Override
        public void onError(Throwable throwable) {
            received.clear();
            result.completeExceptionally(throwable);
        }

        /**
         * Copies the remaining content of every buffer, in order, into one
         * newly allocated array.
         */
        private static byte[] join(List<ByteBuffer> bytes) {
            int size = Utils.remaining(bytes);
            byte[] res = new byte[size];
            int from = 0;
            for (ByteBuffer b : bytes) {
                int l = b.remaining();
                b.get(res, from, l);
                from += l;
            }
            return res;
        }

        /**
         * Joins the stored buffers, applies the finisher and completes the
         * body stage with its result. Any failure thrown while joining or
         * finishing completes the stage exceptionally instead of escaping
         * from this callback, so the stage is always completed.
         */
        @Override
        public void onComplete() {
            try {
                result.complete(finisher.apply(join(received)));
            } catch (Throwable t) {
                // Not only IllegalArgumentException: if anything else
                // escaped here the body stage would never complete.
                result.completeExceptionally(t);
            } finally {
                received.clear();
            }
        }

        @Override
        public CompletionStage<T> getBody() {
            return result;
        }
    }

    /**
     * Multi processor that records, per request, a completed or failed
     * future for the response in a {@link MultiMapResult}, and delegates the
     * choice of body handler for each request to a supplied function.
     *
     * @param <V> the response body type
     */
    static class MultiProcessorImpl<V> implements HttpResponse.MultiProcessor<MultiMapResult<V>,V> {
        private final MultiMapResult<V> results;
        private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
        private final boolean completion; // aggregate completes on last PP received or overall completion

        /**
         * @param pushHandler function consulted by {@link #onRequest}
         * @param completion  when {@code true} the aggregate result follows
         *                    the {@code onComplete} future passed to
         *                    {@link #completion}; otherwise it follows the
         *                    {@code onFinalPushPromise} future
         */
        MultiProcessorImpl(Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
            this.results = new MultiMapResult<V>(new ConcurrentHashMap<>());
            this.pushHandler = pushHandler;
            this.completion = completion;
        }

        @Override
        public Optional<HttpResponse.BodyHandler<V>> onRequest(HttpRequest request) {
            return pushHandler.apply(request);
        }

        @Override
        public void onResponse(HttpResponse<V> response) {
            results.put(response.request(), CompletableFuture.completedFuture(response));
        }

        @Override
        public void onError(HttpRequest request, Throwable t) {
            results.put(request, MinimalFuture.failedFuture(t));
        }

        @Override
        public CompletableFuture<MultiMapResult<V>> completion(
                CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
            if (completion) {
                return onComplete.thenApply((ignored) -> results);
            } else {
                return onFinalPushPromise.thenApply((ignored) -> results);
            }
        }
    }

    /**
     * Maps requests to files underneath a destination directory, creating
     * intermediate directories as needed.
     */
    static class MultiFile {

        final Path pathRoot;

        /**
         * @param destination the directory under which files are created
         * @throws UncheckedIOException if {@code destination} is not a
         *         directory
         */
        MultiFile(Path destination) {
            // Files.isDirectory also works for paths that are not backed by
            // the default file system, where Path.toFile() would throw.
            if (!Files.isDirectory(destination)) {
                throw new UncheckedIOException(new IOException("destination is not a directory"));
            }
            pathRoot = destination;
        }

        /**
         * Returns a file body handler whose target is the request's URI
         * path, with leading slashes removed, resolved against the
         * destination directory.
         *
         * @param request the request whose URI path selects the file
         * @return an {@code Optional} holding the file body handler
         * @throws UncheckedIOException if the resolved path lies outside the
         *         destination directory, or if the parent directories cannot
         *         be created
         */
        Optional<HttpResponse.BodyHandler<Path>> handlePush(HttpRequest request) {
            final URI uri = request.uri();
            String path = uri.getPath();
            while (path.startsWith("/")) {
                path = path.substring(1);
            }
            Path p = pathRoot.resolve(path);

            // NOTE(review): the URI presumably originates from the remote
            // peer; refuse paths (for example ones containing "..") that
            // would resolve to a location outside the destination directory.
            Path root = pathRoot.toAbsolutePath().normalize();
            if (!p.toAbsolutePath().normalize().startsWith(root)) {
                throw new UncheckedIOException(
                        new IOException("path is outside destination directory: " + path));
            }

            if (Log.trace()) {
                Log.logTrace("Creating file body processor for URI={0}, path={1}",
                             uri, p);
            }
            try {
                // getParent() is null when p has a single name element.
                Path parent = p.getParent();
                if (parent != null) {
                    Files.createDirectories(parent);
                }
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }

            final HttpResponse.BodyHandler<Path> proc =
                    HttpResponse.BodyHandler.asFile(p);

            return Optional.of(proc);
        }
    }

    /**
     * Currently this consumes all of the data and ignores it. On completion
     * the body stage completes with the value held by the {@code Optional}
     * given at construction, or with {@code null} when that is empty.
     */
    static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> {

        Flow.Subscription subscription;
        final CompletableFuture<T> cf = new MinimalFuture<>();
        final Optional<T> result;

        /**
         * @param result the value to complete the body stage with, if any
         */
        NullProcessor(Optional<T> result) {
            this.result = result;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            // NO-OP
        }

        @Override
        public void onError(Throwable throwable) {
            cf.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            cf.complete(result.orElse(null));
        }

        @Override
        public CompletionStage<T> getBody() {
            return cf;
        }
    }
}