1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import jdk.incubator.http.HttpResponse.BodyHandler; 31 import jdk.incubator.http.HttpResponse.BodyProcessor; 32 import java.nio.ByteBuffer; 33 import java.util.concurrent.CompletableFuture; 34 import java.util.Collections; 35 import java.util.LinkedList; 36 import java.util.List; 37 import java.util.concurrent.CompletionException; 38 import java.util.concurrent.Executor; 39 import jdk.incubator.http.internal.common.Log; 40 import jdk.incubator.http.internal.common.MinimalFuture; 41 import jdk.incubator.http.internal.common.Utils; 42 43 /** 44 * Encapsulates one HTTP/1.1 request/responseAsync exchange. 45 */ 46 class Http1Exchange<T> extends ExchangeImpl<T> { 47 48 final HttpRequestImpl request; // main request 49 private final List<CompletableFuture<?>> operations; // used for cancel 50 final Http1Request requestAction; 51 private volatile Http1Response<T> response; 52 // use to record possible cancellation raised before any operation 53 // has been initiated. 54 private IOException failed; 55 final HttpConnection connection; 56 final HttpClientImpl client; 57 final Executor executor; 58 volatile ByteBuffer buffer; // used for receiving 59 60 @Override 61 public String toString() { 62 return request.toString(); 63 } 64 65 HttpRequestImpl request() { 66 return request; 67 } 68 69 Http1Exchange(Exchange<T> exchange, HttpConnection connection) 70 throws IOException 71 { 72 super(exchange); 73 this.request = exchange.request(); 74 this.client = exchange.client(); 75 this.executor = exchange.executor(); 76 this.operations = new LinkedList<>(); 77 this.buffer = Utils.EMPTY_BYTEBUFFER; 78 if (connection != null) { 79 this.connection = connection; 80 } else { 81 InetSocketAddress addr = request.getAddress(client); 82 this.connection = HttpConnection.getConnection(addr, client, request); 83 } 84 this.requestAction = new Http1Request(request, client, this.connection); 85 } 86 87 88 HttpConnection connection() { 89 return connection; 90 } 91 92 93 @Override 94 T readBody(BodyHandler<T> handler, boolean returnConnectionToPool) 95 throws IOException 96 { 97 BodyProcessor<T> processor = handler.apply(response.responseCode(), 98 response.responseHeaders()); 99 CompletableFuture<T> bodyCF = response.readBody(processor, 100 returnConnectionToPool, 101 this::executeInline); 102 try { 103 return bodyCF.join(); 104 } catch (CompletionException e) { 105 throw Utils.getIOException(e); 106 } 107 } 108 109 private void executeInline(Runnable r) { 110 r.run(); 111 } 112 113 synchronized ByteBuffer getBuffer() { 114 return buffer; 115 } 116 117 @Override 118 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, 119 boolean returnConnectionToPool, 120 Executor executor) 121 { 122 BodyProcessor<T> processor = handler.apply(response.responseCode(), 123 response.responseHeaders()); 124 CompletableFuture<T> bodyCF = response.readBody(processor, 125 returnConnectionToPool, 126 executor); 127 return bodyCF; 128 } 129 130 @Override 131 void sendHeadersOnly() throws IOException, InterruptedException { 132 try { 133 if (!connection.connected()) { 134 connection.connect(); 135 } 136 requestAction.sendHeadersOnly(); 137 } catch (Throwable e) { 138 connection.close(); 139 throw e; 140 } 141 } 142 143 @Override 144 void sendBody() throws IOException { 145 try { 146 requestAction.continueRequest(); 147 } catch (Throwable e) { 148 connection.close(); 149 throw e; 150 } 151 } 152 153 @Override 154 Response getResponse() throws IOException { 155 try { 156 response = new Http1Response<>(connection, this); 157 response.readHeaders(); 158 Response r = response.response(); 159 buffer = response.getBuffer(); 160 return r; 161 } catch (Throwable t) { 162 connection.close(); 163 throw t; 164 } 165 } 166 167 private void closeConnection() { 168 connection.close(); 169 } 170 171 /** 172 * Cancel checks to see if request and responseAsync finished already. 173 * If not it closes the connection and completes all pending operations 174 */ 175 @Override 176 void cancel() { 177 cancel(new IOException("Request cancelled")); 178 } 179 180 /** 181 * Cancel checks to see if request and responseAsync finished already. 182 * If not it closes the connection and completes all pending operations 183 */ 184 @Override 185 synchronized void cancel(IOException cause) { 186 if (requestAction != null && requestAction.finished() 187 && response != null && response.finished()) { 188 return; 189 } 190 connection.close(); 191 int count = 0; 192 if (operations.isEmpty()) { 193 failed = cause; 194 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." 195 + "\n\tCan''t cancel yet with {2}", 196 request.uri(), 197 request.duration() == null ? -1 : 198 // calling duration.toMillis() can throw an exception. 199 // this is just debugging, we don't care if it overflows. 200 (request.duration().getSeconds() * 1000 201 + request.duration().getNano() / 1000000), 202 cause); 203 } else { 204 for (CompletableFuture<?> cf : operations) { 205 cf.completeExceptionally(cause); 206 count++; 207 } 208 } 209 Log.logError("Http1Exchange.cancel: count=" + count); 210 } 211 212 CompletableFuture<Response> getResponseAsyncImpl(Executor executor) { 213 return MinimalFuture.supply( () -> { 214 response = new Http1Response<>(connection, Http1Exchange.this); 215 response.readHeaders(); 216 Response r = response.response(); 217 buffer = response.getBuffer(); 218 return r; 219 }, executor); 220 } 221 222 @Override 223 CompletableFuture<Response> getResponseAsync(Executor executor) { 224 CompletableFuture<Response> cf = 225 connection.whenReceivingResponse() 226 .thenCompose((v) -> getResponseAsyncImpl(executor)); 227 IOException cause; 228 synchronized(this) { 229 operations.add(cf); 230 cause = failed; 231 failed = null; 232 } 233 if (cause != null) { 234 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" 235 + "\n\tCompleting exceptionally with {2}\n", 236 request.uri(), 237 request.duration() == null ? -1 : 238 // calling duration.toMillis() can throw an exception. 239 // this is just debugging, we don't care if it overflows. 240 (request.duration().getSeconds() * 1000 241 + request.duration().getNano() / 1000000), 242 cause); 243 cf.completeExceptionally(cause); 244 } 245 return cf; 246 } 247 }