/* * Copyright (C) 2017 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. */ package com.google.test.http; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import jdk.incubator.http.HttpResponse; /** * A piped pair of a {@code HttpResponse.BodyProcessor} and a corresponding * {@link InputStream} from which the data can be read. The BodyProcessor and the * InputStream are decoupled via an internal buffer of {@code numBuffers} ByteBuffers. */ public final class PipedResponseStream { private final HttpResponse.BodyProcessor bodyProcessor; private final InputStream inputStream; /** * @param numBuffers The size of the internal buffer in number of {@code ByteBuffer}s. * FIXME: If the BodyProcessor (or HttpClient) API allowed us to specify * ByteBuffer instances or ByteBuffer sizes, the buffer size could be * expressed in number of bytes, which would be much easier to use correctly. */ public PipedResponseStream(int numBuffers) { PullBodyProcessor bodyProcessor = new PullBodyProcessor(numBuffers); this.bodyProcessor = bodyProcessor; this.inputStream = new PullBodyProcessorInputStream(bodyProcessor); } /** * Returns a {@link InputStream} from which one can read the data delivered to * {@link #getBodyProcessor()}. */ public InputStream getInputStream() { return inputStream; } /** * Returns a {@link jdk.incubator.http.HttpResponse.BodyProcessor} whose data will * be exposed via {@link #getInputStream()}. */ public HttpResponse.BodyProcessor getBodyProcessor() { return bodyProcessor; } /** * A {@code HttpResponse.BodyProcessor} that provides {@link #take() pull} access to * the ByteBuffers that it receives. */ static final class PullBodyProcessor implements HttpResponse.BodyProcessor { /** * Blocks, then returns the next ByteBuffer available, or null if EOF reached. * May be called from any thread. */ public ByteBuffer take() throws InterruptedException { ByteBuffer result = buffers.take(); if (result == EOF) { buffers.putFirst(result); // put back return null; } bufferSize.addAndGet(- result.remaining()); // We've now gained space for one additional buffer. Note that by the time // we get here, some data has already been received so subscription != null. subscription.request(1); return result; } public ByteBuffer peek() { ByteBuffer result = buffers.peek(); return result == EOF ? null : result; } private final CompletableFuture future = new CompletableFuture<>(); // Marks the end of data (BLockingDeque does not permit null elements). private final ByteBuffer EOF = ByteBuffer.wrap(new byte[0]); private final BlockingDeque buffers = new LinkedBlockingDeque<>(); private final int initialBuffersToRequest; // Needs to be volatile since it's written by the BodyProcessor on the network // thread but read by take() on the application thread. private volatile Flow.Subscription subscription; private volatile Throwable throwable; // The current combined size of buffers in bytes remaining (temporarily // nonexact during calls to take() or onNext()). This depends on // the fact that the library will not modify the buffers after passing them // to us through onNext(). private final AtomicInteger bufferSize = new AtomicInteger(0); /** * @param numByteBuffersToBuffer The maximum number of {@link ByteBuffer}s to read * ahead in order to decouple the push-based BodyProcessor from the * pull-based {@link #take()}. */ PullBodyProcessor(int numByteBuffersToBuffer) { this.initialBuffersToRequest = numByteBuffersToBuffer; } /** * An estimate on the current size of the buffer in bytes. The bound may * be nonexact during concurrent calls to {@link #take()} or * {@link #onNext(Object)}. */ int bufferSize() { return bufferSize.get(); } @Override public CompletionStage getBody() { return future; } @Override public void onSubscribe(Flow.Subscription subscription) { PullBodyProcessor.this.subscription = Objects.requireNonNull(subscription); /* FIXME: Because request() takes a number of callbacks and the size of the ByteBuffers that are returned to us is not under our control, we have no control over the number of bytes that we buffer - we only have control over the number of ByteBuffer objects in it. This could be fixed through in one of two different ways: 1.) Let us (the application) allocate the ByteBuffers, so we can size them appropriately for the number of bytes that we want to buffer. This also has the advantage that we could ask for data in chunks of a known size (e.g. block size of an image or video stream), and we could choose the ByteBuffer implementation (e.g. MappedByteBuffer). For example, it'd be possible with a subclass of Subscription that has an overload subscription.request(ByteBuffer). Maybe there is a solution that doesn't require a new type? 2.) Let us request data in bytes rather than in number of callbacks, e.g. by letting the parameter of request(long) indicate a number of bytes. This is harder to do because the Flow API is explicitly defined in terms of number of callbacks. It would also not have the advantage of putting us in control of block size and ByteBuffer implementation class. */ subscription.request(initialBuffersToRequest); } @Override public void onNext(ByteBuffer item) { int remaining = item.remaining(); // Allowing no empty ByteBuffer other than EOF in the queue means that // peek() never returns an empty ByteBuffer (as long as no one modifies // the ByteBuffer after passing it to this method). if (remaining > 0) { buffers.add(item); } // TODO: Guard against overflow? bufferSize.addAndGet(remaining); } @Override public void onError(Throwable throwable) { buffers.add(EOF); // Avoid need for a try/catch block and dealing with InterruptedException // for getting the throwable out of the future. this.throwable = throwable; future.completeExceptionally(throwable); } @Override public void onComplete() { buffers.add(EOF); future.complete(null); } @Override public String toString() { return getClass().getSimpleName() + " (" + buffers.size() + " buffers available)"; } } /** * Makes the bytes from a {@link PullBodyProcessor} available as an {@link InputStream}. * * TODO: This class only depends on a single method, PullBodyProcessor.take(). * *If* we ever want to make this class public, then it should probably depend only * on a narrower interface. * * This class is not thread safe. */ static class PullBodyProcessorInputStream extends InputStream { private final PullBodyProcessor delegate; public PullBodyProcessorInputStream(PullBodyProcessor delegate) { this.delegate = Objects.requireNonNull(delegate); } // null if EOF encountered private ByteBuffer currentBuffer = ByteBuffer.wrap(new byte[0]); private void updateCurrentBufferAndCheckForException() throws IOException { boolean interrupted = false; while (currentBuffer != null && !currentBuffer.hasRemaining()) { try { currentBuffer = delegate.take(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { // restore interrupted status since we didn't handle it Thread.currentThread().interrupt(); } Throwable t = delegate.throwable; if (t != null) { if (t instanceof IOException) { throw (IOException) t; } else { throw new IOException(t); } } } @Override public int read() throws IOException { updateCurrentBufferAndCheckForException(); if (currentBuffer == null) { return -1; // eof } return Byte.toUnsignedInt(currentBuffer.get()); } @Override public int read(byte[] b, int off, int len) throws IOException { int bytesCopied = 0; while (bytesCopied < len) { updateCurrentBufferAndCheckForException(); if (currentBuffer == null) { return bytesCopied == 0 ? -1 : bytesCopied; } int toCopy = Math.min(len - bytesCopied, currentBuffer.remaining()); currentBuffer.get(b, off, toCopy); off += toCopy; bytesCopied += toCopy; } return bytesCopied; } @Override public long skip(long n) throws IOException { n = Math.max(0, n); // optional long remainingToSkip = n; while (remainingToSkip > 0) { // Note that this may block if this method was called with n > available() updateCurrentBufferAndCheckForException(); if (currentBuffer == null) { // EOF return (n - remainingToSkip); } int toSkipNow = currentBuffer.remaining(); if (remainingToSkip < toSkipNow) { toSkipNow = (int) remainingToSkip; // safe from overflow (toSkipNow is an int) } currentBuffer.position(currentBuffer.position() + toSkipNow); remainingToSkip -= toSkipNow; } return n; } @Override public int available() throws IOException { // Since this InputStream is not thread safe, we will not be calling // delegate.take() concurrently; onNext() does not block. Therefore // we know that we can at least read this many bytes concurrently // without blocking. if (currentBuffer == null) { return 0; // EOF } return delegate.bufferSize() + currentBuffer.remaining(); } } }