--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java 2017-11-30 04:06:30.635308090 -0800 +++ /dev/null 2017-10-28 22:49:55.551349757 -0700 @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.OpenOption; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Flow; -import java.util.function.Consumer; -import java.util.function.Function; -import jdk.incubator.http.internal.common.MinimalFuture; -import jdk.incubator.http.internal.common.Utils; -import jdk.incubator.http.internal.common.Log; - -class ResponseProcessors { - - static class ConsumerProcessor implements HttpResponse.BodyProcessor { - private final Consumer> consumer; - private Flow.Subscription subscription; - private final CompletableFuture result = new MinimalFuture<>(); - - ConsumerProcessor(Consumer> consumer) { - this.consumer = consumer; - } - - @Override - public CompletionStage getBody() { - return result; - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(ByteBuffer item) { - byte[] buf = new byte[item.remaining()]; - item.get(buf); - consumer.accept(Optional.of(buf)); - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - result.completeExceptionally(throwable); - } - - @Override - public void onComplete() { - consumer.accept(Optional.empty()); - result.complete(null); - } - - } - - static class PathProcessor implements HttpResponse.BodyProcessor { - - private final Path file; - private final CompletableFuture result = new MinimalFuture<>(); - - private Flow.Subscription subscription; - private FileChannel out; - private final OpenOption[] options; - - PathProcessor(Path file, OpenOption... options) { - this.file = file; - this.options = options; - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - try { - out = FileChannel.open(file, options); - } catch (IOException e) { - result.completeExceptionally(e); - subscription.cancel(); - return; - } - subscription.request(1); - } - - @Override - public void onNext(ByteBuffer item) { - try { - out.write(item); - } catch (IOException ex) { - Utils.close(out); - subscription.cancel(); - result.completeExceptionally(ex); - } - subscription.request(1); - } - - @Override - public void onError(Throwable e) { - result.completeExceptionally(e); - Utils.close(out); - } - - @Override - public void onComplete() { - Utils.close(out); - result.complete(file); - } - - @Override - public CompletionStage getBody() { - return result; - } - } - - static class ByteArrayProcessor implements HttpResponse.BodyProcessor { - private final Function finisher; - private final CompletableFuture result = new MinimalFuture<>(); - private final List received = new ArrayList<>(); - - private Flow.Subscription subscription; - - ByteArrayProcessor(Function finisher) { - this.finisher = finisher; - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - if (this.subscription != null) { - subscription.cancel(); - return; - } - this.subscription = subscription; - // We can handle whatever you've got - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(ByteBuffer item) { - // incoming buffers are allocated by http client internally, - // and won't be used anywhere except this place. - // So it's free simply to store them for further processing. - if(item.hasRemaining()) { - received.add(item); - } - } - - @Override - public void onError(Throwable throwable) { - received.clear(); - result.completeExceptionally(throwable); - } - - static private byte[] join(List bytes) { - int size = Utils.remaining(bytes); - byte[] res = new byte[size]; - int from = 0; - for (ByteBuffer b : bytes) { - int l = b.remaining(); - b.get(res, from, l); - from += l; - } - return res; - } - - @Override - public void onComplete() { - try { - result.complete(finisher.apply(join(received))); - received.clear(); - } catch (IllegalArgumentException e) { - result.completeExceptionally(e); - } - } - - @Override - public CompletionStage getBody() { - return result; - } - } - - static class MultiProcessorImpl implements HttpResponse.MultiProcessor,V> { - private final MultiMapResult results; - private final Function>> pushHandler; - private final boolean completion; // aggregate completes on last PP received or overall completion - - MultiProcessorImpl(Function>> pushHandler, boolean completion) { - this.results = new MultiMapResult(new ConcurrentHashMap<>()); - this.pushHandler = pushHandler; - this.completion = completion; - } - - @Override - public Optional> onRequest(HttpRequest request) { - return pushHandler.apply(request); - } - - @Override - public void onResponse(HttpResponse response) { - results.put(response.request(), CompletableFuture.completedFuture(response)); - } - - @Override - public void onError(HttpRequest request, Throwable t) { - results.put(request, MinimalFuture.failedFuture(t)); - } - - @Override - public CompletableFuture> completion( - CompletableFuture onComplete, CompletableFuture onFinalPushPromise) { - if (completion) - return onComplete.thenApply((ignored)-> results); - else - return onFinalPushPromise.thenApply((ignored) -> results); - } - } - - static class MultiFile { - - final Path pathRoot; - - MultiFile(Path destination) { - if (!destination.toFile().isDirectory()) - throw new UncheckedIOException(new IOException("destination is not a directory")); - pathRoot = destination; - } - - Optional> handlePush(HttpRequest request) { - final URI uri = request.uri(); - String path = uri.getPath(); - while (path.startsWith("/")) - path = path.substring(1); - Path p = pathRoot.resolve(path); - if (Log.trace()) { - Log.logTrace("Creating file body processor for URI={0}, path={1}", - uri, p); - } - try { - Files.createDirectories(p.getParent()); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - - final HttpResponse.BodyHandler proc = - HttpResponse.BodyHandler.asFile(p); - - return Optional.of(proc); - } - } - - /** - * Currently this consumes all of the data and ignores it - */ - static class NullProcessor implements HttpResponse.BodyProcessor { - - Flow.Subscription subscription; - final CompletableFuture cf = new MinimalFuture<>(); - final Optional result; - - NullProcessor(Optional result) { - this.result = result; - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(ByteBuffer item) { - // TODO: check whether this should consume the buffer, as in: - item.position(item.limit()); - } - - @Override - public void onError(Throwable throwable) { - cf.completeExceptionally(throwable); - } - - @Override - public void onComplete() { - if (result.isPresent()) { - cf.complete(result.get()); - } else { - cf.complete(null); - } - } - - @Override - public CompletionStage getBody() { - return cf; - } - } -}