1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.io.UncheckedIOException; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.Files; 34 import java.nio.file.OpenOption; 35 import java.nio.file.Path; 36 import java.util.ArrayList; 37 import java.util.Collections; 38 import java.util.HashMap; 39 import java.util.List; 40 import java.util.Map; 41 import java.util.Optional; 42 import java.util.concurrent.CompletableFuture; 43 import java.util.concurrent.CompletionStage; 44 import java.util.concurrent.ConcurrentHashMap; 45 import java.util.concurrent.Flow; 46 import java.util.function.Consumer; 47 import java.util.function.Function; 48 import jdk.incubator.http.internal.common.MinimalFuture; 49 import jdk.incubator.http.internal.common.Utils; 50 import jdk.incubator.http.internal.common.Log; 51 52 class ResponseProcessors { 53 54 static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> { 55 private final Consumer<Optional<byte[]>> consumer; 56 private Flow.Subscription subscription; 57 private final CompletableFuture<Void> result = new MinimalFuture<>(); 58 59 ConsumerProcessor(Consumer<Optional<byte[]>> consumer) { 60 this.consumer = consumer; 61 } 62 63 @Override 64 public CompletionStage<Void> getBody() { 65 return result; 66 } 67 68 @Override 69 public void onSubscribe(Flow.Subscription subscription) { 70 this.subscription = subscription; 71 subscription.request(1); 72 } 73 74 @Override 75 public void onNext(ByteBuffer item) { 76 byte[] buf = new byte[item.remaining()]; 77 item.get(buf); 78 consumer.accept(Optional.of(buf)); 79 subscription.request(1); 80 } 81 82 @Override 83 public void onError(Throwable throwable) { 84 result.completeExceptionally(throwable); 85 } 86 87 @Override 88 public void onComplete() { 89 consumer.accept(Optional.empty()); 90 result.complete(null); 91 } 92 93 } 94 95 static class PathProcessor implements HttpResponse.BodyProcessor<Path> { 96 97 private final Path file; 98 private final CompletableFuture<Path> result = new MinimalFuture<>(); 99 100 private Flow.Subscription subscription; 101 private FileChannel out; 102 private final OpenOption[] options; 103 104 PathProcessor(Path file, OpenOption... options) { 105 this.file = file; 106 this.options = options; 107 } 108 109 @Override 110 public void onSubscribe(Flow.Subscription subscription) { 111 this.subscription = subscription; 112 try { 113 out = FileChannel.open(file, options); 114 } catch (IOException e) { 115 result.completeExceptionally(e); 116 subscription.cancel(); 117 return; 118 } 119 subscription.request(1); 120 } 121 122 @Override 123 public void onNext(ByteBuffer item) { 124 try { 125 out.write(item); 126 } catch (IOException ex) { 127 Utils.close(out); 128 subscription.cancel(); 129 result.completeExceptionally(ex); 130 } 131 subscription.request(1); 132 } 133 134 @Override 135 public void onError(Throwable e) { 136 result.completeExceptionally(e); 137 Utils.close(out); 138 } 139 140 @Override 141 public void onComplete() { 142 Utils.close(out); 143 result.complete(file); 144 } 145 146 @Override 147 public CompletionStage<Path> getBody() { 148 return result; 149 } 150 } 151 152 static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> { 153 private final Function<byte[], T> finisher; 154 private final CompletableFuture<T> result = new MinimalFuture<>(); 155 private final List<ByteBuffer> received = new ArrayList<>(); 156 157 private Flow.Subscription subscription; 158 159 ByteArrayProcessor(Function<byte[],T> finisher) { 160 this.finisher = finisher; 161 } 162 163 @Override 164 public void onSubscribe(Flow.Subscription subscription) { 165 if (this.subscription != null) { 166 subscription.cancel(); 167 return; 168 } 169 this.subscription = subscription; 170 // We can handle whatever you've got 171 subscription.request(Long.MAX_VALUE); 172 } 173 174 @Override 175 public void onNext(ByteBuffer item) { 176 // incoming buffers are allocated by http client internally, 177 // and won't be used anywhere except this place. 178 // So it's free simply to store them for further processing. 179 if(item.hasRemaining()) { 180 received.add(item); 181 } 182 } 183 184 @Override 185 public void onError(Throwable throwable) { 186 received.clear(); 187 result.completeExceptionally(throwable); 188 } 189 190 static private byte[] join(List<ByteBuffer> bytes) { 191 int size = Utils.remaining(bytes); 192 byte[] res = new byte[size]; 193 int from = 0; 194 for (ByteBuffer b : bytes) { 195 int l = b.remaining(); 196 b.get(res, from, l); 197 from += l; 198 } 199 return res; 200 } 201 202 @Override 203 public void onComplete() { 204 try { 205 result.complete(finisher.apply(join(received))); 206 received.clear(); 207 } catch (IllegalArgumentException e) { 208 result.completeExceptionally(e); 209 } 210 } 211 212 @Override 213 public CompletionStage<T> getBody() { 214 return result; 215 } 216 } 217 218 static class MultiProcessorImpl<V> implements HttpResponse.MultiProcessor<MultiMapResult<V>,V> { 219 private final MultiMapResult<V> results; 220 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; 221 private final boolean completion; // aggregate completes on last PP received or overall completion 222 223 MultiProcessorImpl(Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { 224 this.results = new MultiMapResult<V>(new ConcurrentHashMap<>()); 225 this.pushHandler = pushHandler; 226 this.completion = completion; 227 } 228 229 @Override 230 public Optional<HttpResponse.BodyHandler<V>> onRequest(HttpRequest request) { 231 return pushHandler.apply(request); 232 } 233 234 @Override 235 public void onResponse(HttpResponse<V> response) { 236 results.put(response.request(), CompletableFuture.completedFuture(response)); 237 } 238 239 @Override 240 public void onError(HttpRequest request, Throwable t) { 241 results.put(request, MinimalFuture.failedFuture(t)); 242 } 243 244 @Override 245 public CompletableFuture<MultiMapResult<V>> completion( 246 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { 247 if (completion) 248 return onComplete.thenApply((ignored)-> results); 249 else 250 return onFinalPushPromise.thenApply((ignored) -> results); 251 } 252 } 253 254 static class MultiFile { 255 256 final Path pathRoot; 257 258 MultiFile(Path destination) { 259 if (!destination.toFile().isDirectory()) 260 throw new UncheckedIOException(new IOException("destination is not a directory")); 261 pathRoot = destination; 262 } 263 264 Optional<HttpResponse.BodyHandler<Path>> handlePush(HttpRequest request) { 265 final URI uri = request.uri(); 266 String path = uri.getPath(); 267 while (path.startsWith("/")) 268 path = path.substring(1); 269 Path p = pathRoot.resolve(path); 270 if (Log.trace()) { 271 Log.logTrace("Creating file body processor for URI={0}, path={1}", 272 uri, p); 273 } 274 try { 275 Files.createDirectories(p.getParent()); 276 } catch (IOException ex) { 277 throw new UncheckedIOException(ex); 278 } 279 280 final HttpResponse.BodyHandler<Path> proc = 281 HttpResponse.BodyHandler.asFile(p); 282 283 return Optional.of(proc); 284 } 285 } 286 287 /** 288 * Currently this consumes all of the data and ignores it 289 */ 290 static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> { 291 292 Flow.Subscription subscription; 293 final CompletableFuture<T> cf = new MinimalFuture<>(); 294 final Optional<T> result; 295 296 NullProcessor(Optional<T> result) { 297 this.result = result; 298 } 299 300 @Override 301 public void onSubscribe(Flow.Subscription subscription) { 302 this.subscription = subscription; 303 subscription.request(Long.MAX_VALUE); 304 } 305 306 @Override 307 public void onNext(ByteBuffer item) { 308 // TODO: check whether this should consume the buffer, as in: 309 item.position(item.limit()); 310 } 311 312 @Override 313 public void onError(Throwable throwable) { 314 cf.completeExceptionally(throwable); 315 } 316 317 @Override 318 public void onComplete() { 319 if (result.isPresent()) { 320 cf.complete(result.get()); 321 } else { 322 cf.complete(null); 323 } 324 } 325 326 @Override 327 public CompletionStage<T> getBody() { 328 return cf; 329 } 330 } 331 }