66 final class Exchange<T> { 67 68 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 69 70 final HttpRequestImpl request; 71 final HttpClientImpl client; 72 volatile ExchangeImpl<T> exchImpl; 73 volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF; 74 volatile CompletableFuture<Void> bodyIgnored; 75 76 // used to record possible cancellation raised before the exchImpl 77 // has been established. 78 private volatile IOException failed; 79 final AccessControlContext acc; 80 final MultiExchange<T> multi; 81 final Executor parentExecutor; 82 boolean upgrading; // to HTTP/2 83 final PushGroup<T> pushGroup; 84 final String dbgTag; 85 86 Exchange(HttpRequestImpl request, MultiExchange<T> multi) { 87 this.request = request; 88 this.upgrading = false; 89 this.client = multi.client(); 90 this.multi = multi; 91 this.acc = multi.acc; 92 this.parentExecutor = multi.executor; 93 this.pushGroup = multi.pushGroup; 94 this.dbgTag = "Exchange"; 95 } 96 97 /* If different AccessControlContext to be used */ 98 Exchange(HttpRequestImpl request, 99 MultiExchange<T> multi, 100 AccessControlContext acc) 101 { 102 this.request = request; 103 this.acc = acc; 104 this.upgrading = false; 105 this.client = multi.client(); 108 this.pushGroup = multi.pushGroup; 109 this.dbgTag = "Exchange"; 110 } 111 112 PushGroup<T> getPushGroup() { 113 return pushGroup; 114 } 115 116 Executor executor() { 117 return parentExecutor; 118 } 119 120 public HttpRequestImpl request() { 121 return request; 122 } 123 124 HttpClientImpl client() { 125 return client; 126 } 127 128 129 public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) { 130 // If we received a 407 while establishing the exchange 131 // there will be no body to read: bodyIgnored will be true, 132 // and exchImpl will be null (if we were trying to establish 133 // an HTTP/2 tunnel through an HTTP/1.1 proxy) 134 if (bodyIgnored != null) return MinimalFuture.completedFuture(null); 135 136 // The 
connection will not be returned to the pool in the case of WebSocket 137 return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor) 138 .whenComplete((r,t) -> exchImpl.completed()); 139 } 140 141 /** 142 * Called after a redirect or similar kind of retry where a body might 143 * be sent but we don't want it. Should send a RESET in h2. For http/1.1 144 * we can consume small quantity of data, or close the connection in 145 * other cases. 146 */ 147 public CompletableFuture<Void> ignoreBody() { 162 // Setting it to null here might get it GC'ed too early, because 163 // the Http1Response is now only weakly referenced by the Selector. 164 } 165 166 public void cancel() { 167 // cancel can be called concurrently before or at the same time 168 // that the exchange impl is being established. 169 // In that case we won't be able to propagate the cancellation 170 // right away 171 if (exchImpl != null) { 172 exchImpl.cancel(); 173 } else { 174 // no impl - can't cancel impl yet. 175 // call cancel(IOException) instead which takes care 176 // of race conditions between impl/cancel. 177 cancel(new IOException("Request cancelled")); 178 } 179 } 180 181 public void cancel(IOException cause) { 182 // If the impl is non null, propagate the exception right away. 183 // Otherwise record it so that it can be propagated once the 184 // exchange impl has been established. 185 ExchangeImpl<?> impl = exchImpl; 186 if (impl != null) { 187 // propagate the exception to the impl 188 if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl); 189 impl.cancel(cause); 190 } else { 191 // no impl yet. record the exception 192 failed = cause; 193 // now call checkCancelled to recheck the impl. 194 // if the failed state is set and the impl is not null, reset 195 // the failed state and propagate the exception to the impl. 196 checkCancelled(); 197 } 198 } 199 200 // This method will raise an exception if one was reported and if 201 // it is possible to do so. 
If the exception can be raised, then 202 // the failed state will be reset. Otherwise, the failed state 203 // will persist until the exception can be raised and the failed state 204 // can be cleared. 205 // Takes care of possible race conditions. 206 private void checkCancelled() { 207 ExchangeImpl<?> impl = null; 208 IOException cause = null; 209 CompletableFuture<? extends ExchangeImpl<T>> cf = null; 210 if (failed != null) { 211 synchronized(this) { 212 cause = failed; | 66 final class Exchange<T> { 67 68 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 69 70 final HttpRequestImpl request; 71 final HttpClientImpl client; 72 volatile ExchangeImpl<T> exchImpl; 73 volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF; 74 volatile CompletableFuture<Void> bodyIgnored; 75 76 // used to record possible cancellation raised before the exchImpl 77 // has been established. 78 private volatile IOException failed; 79 final AccessControlContext acc; 80 final MultiExchange<T> multi; 81 final Executor parentExecutor; 82 boolean upgrading; // to HTTP/2 83 final PushGroup<T> pushGroup; 84 final String dbgTag; 85 86 // Keeps track of the underlying connection when establishing an HTTP/2 87 // exchange so that it can be aborted/timed out mid setup. 
final ConnectionAborter connectionAborter = new ConnectionAborter();

/**
 * Creates an exchange for {@code request} that takes its access control
 * context, client, executor and push group from {@code multi}.
 *
 * @param request the request carried by this exchange
 * @param multi   the multi-exchange this exchange belongs to
 */
Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
    // Same wiring as the three-argument constructor, using multi's own
    // AccessControlContext.
    this(request, multi, multi.acc);
}

/**
 * Creates an exchange for {@code request} when a different
 * AccessControlContext from the one held by {@code multi} is to be used.
 * The client, executor and push group are still taken from {@code multi}.
 *
 * @param request the request carried by this exchange
 * @param multi   the multi-exchange this exchange belongs to
 * @param acc     the access control context to record for this exchange
 */
Exchange(HttpRequestImpl request,
         MultiExchange<T> multi,
         AccessControlContext acc)
{
    this.request = request;
    this.acc = acc;
    this.upgrading = false;
    this.client = multi.client();
    // multi and parentExecutor are final fields: they must be assigned on
    // every construction path, exactly as the two-argument form did.
    this.multi = multi;
    this.parentExecutor = multi.executor;
    this.pushGroup = multi.pushGroup;
    this.dbgTag = "Exchange";
}

/** Returns the push group this exchange was created with. */
PushGroup<T> getPushGroup() {
    return pushGroup;
}

/** Returns the executor obtained from the owning multi-exchange. */
Executor executor() {
    return parentExecutor;
}

/** Returns the request carried by this exchange. */
public HttpRequestImpl request() {
    return request;
}

/** Returns the client obtained from the owning multi-exchange. */
HttpClientImpl client() {
    return client;
}

// Keeps track of the underlying connection when establishing an HTTP/2
// exchange so that it can be aborted/timed out mid setup.
134 static final class ConnectionAborter { 135 private volatile HttpConnection connection; 136 137 void connection(HttpConnection connection) { 138 this.connection = connection; 139 } 140 141 void closeConnection() { 142 HttpConnection connection = this.connection; 143 this.connection = null; 144 if (connection != null) { 145 try { 146 connection.close(); 147 } catch (Throwable t) { 148 // ignore 149 } 150 } 151 } 152 } 153 154 public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) { 155 // If we received a 407 while establishing the exchange 156 // there will be no body to read: bodyIgnored will be true, 157 // and exchImpl will be null (if we were trying to establish 158 // an HTTP/2 tunnel through an HTTP/1.1 proxy) 159 if (bodyIgnored != null) return MinimalFuture.completedFuture(null); 160 161 // The connection will not be returned to the pool in the case of WebSocket 162 return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor) 163 .whenComplete((r,t) -> exchImpl.completed()); 164 } 165 166 /** 167 * Called after a redirect or similar kind of retry where a body might 168 * be sent but we don't want it. Should send a RESET in h2. For http/1.1 169 * we can consume small quantity of data, or close the connection in 170 * other cases. 171 */ 172 public CompletableFuture<Void> ignoreBody() { 187 // Setting it to null here might get it GC'ed too early, because 188 // the Http1Response is now only weakly referenced by the Selector. 189 } 190 191 public void cancel() { 192 // cancel can be called concurrently before or at the same time 193 // that the exchange impl is being established. 194 // In that case we won't be able to propagate the cancellation 195 // right away 196 if (exchImpl != null) { 197 exchImpl.cancel(); 198 } else { 199 // no impl - can't cancel impl yet. 200 // call cancel(IOException) instead which takes care 201 // of race conditions between impl/cancel. 
202 cancel(new IOException("Request cancelled")); 203 } 204 } 205 206 public void cancel(IOException cause) { 207 if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause); 208 // If the impl is non null, propagate the exception right away. 209 // Otherwise record it so that it can be propagated once the 210 // exchange impl has been established. 211 ExchangeImpl<?> impl = exchImpl; 212 if (impl != null) { 213 // propagate the exception to the impl 214 if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl); 215 impl.cancel(cause); 216 } else { 217 // no impl yet. record the exception 218 failed = cause; 219 220 // abort/close the connection if setting up the exchange. This can 221 // be important when setting up HTTP/2 222 connectionAborter.closeConnection(); 223 224 // now call checkCancelled to recheck the impl. 225 // if the failed state is set and the impl is not null, reset 226 // the failed state and propagate the exception to the impl. 227 checkCancelled(); 228 } 229 } 230 231 // This method will raise an exception if one was reported and if 232 // it is possible to do so. If the exception can be raised, then 233 // the failed state will be reset. Otherwise, the failed state 234 // will persist until the exception can be raised and the failed state 235 // can be cleared. 236 // Takes care of possible race conditions. 237 private void checkCancelled() { 238 ExchangeImpl<?> impl = null; 239 IOException cause = null; 240 CompletableFuture<? extends ExchangeImpl<T>> cf = null; 241 if (failed != null) { 242 synchronized(this) { 243 cause = failed; |