170 responseFiltersAsync(HttpResponse response)
171 {
172 CompletableFuture<Pair<HttpResponse,HttpRequestImpl>> cf = new CompletableFuture<>();
173 try {
174 Pair<HttpResponse,HttpRequestImpl> n = responseFilters(response); // assumed to be fast
175 cf.complete(n);
176 } catch (Throwable e) {
177 cf.completeExceptionally(e);
178 }
179 return cf;
180 }
181
182 public void cancel() {
183 cancelled = true;
184 getExchange().cancel();
185 }
186
/**
 * Starts one attempt at obtaining a response for {@code currentreq} and
 * returns a future for the outcome. The {@code v} argument is not read in
 * the visible code; the method re-invokes itself with {@code null}.
 *
 * NOTE(review): this view of the method is incomplete. The embedded line
 * numbering jumps from 211 to 224, so the statements in between are not
 * visible, and neither are the declarations of {@code response} and
 * {@code obj} that the tail of the method uses.
 */
187 public CompletableFuture<HttpResponseImpl> responseAsync(Void v) {
188 CompletableFuture<HttpResponseImpl> cf;
// Every call counts as an attempt; once the counter exceeds max_attempts
// the returned future is failed with an IOException and no request is made.
189 if (++attempts > max_attempts) {
190 cf = new CompletableFuture<>();
191 cf.completeExceptionally(new IOException("Too many retries"));
192 } else {
// A non-zero timeval() on the current request arms a TimedEvent and
// registers it with the client. TimedEvent.handle() calls cancel().
193 if (currentreq.timeval() != 0) {
194 // set timer
195 td = new TimedEvent(currentreq.timeval());
196 client.registerTimer(td);
197 }
198 Exchange exch = getExchange();
// Pipeline: requestFiltersAsync(currentreq) -> exch.responseAsync ->
// responseFiltersAsync -> the lambda below.
199 cf = requestFiltersAsync(currentreq)
200 .thenCompose(exch::responseAsync)
201 .thenCompose(this::responseFiltersAsync)
202 .thenCompose((Pair<HttpResponse,HttpRequestImpl> pair) -> {
203 HttpResponseImpl resp = (HttpResponseImpl)pair.first;
// A non-null first element is returned to the caller as the response.
204 if (resp != null) {
205 if (attempts > 1) {
// NOTE(review): a success message is emitted through Log.logError.
206 Log.logError("Succeeded on attempt: " + attempts);
207 }
208 return CompletableFuture.completedFuture(resp);
209 } else {
// No response in the pair: its second element becomes the current request.
210 currentreq = pair.second;
211 Exchange previous = exch;
// NOTE(review): embedded lines 212-223 are missing from this view.
224 return CompletableFuture.completedFuture(response);
225 }
226 // all exceptions thrown are handled here
227 CompletableFuture<HttpResponseImpl> error = getExceptionalCF(obj.second);
// NOTE(review): getExceptionalCF, as visible in this file, always returns a
// non-null future, so the retry branch below looks unreachable. Confirm
// against the full source.
228 if (error == null) {
229 cancelTimer();
230 return responseAsync(null);
231 } else {
232 return error;
233 }
234 });
235 }
236 return cf;
237 }
238
239 /**
240 * Take a Throwable and return a suitable CompletableFuture that is
241 * completed exceptionally.
242 */
243 private CompletableFuture<HttpResponseImpl> getExceptionalCF(Throwable t) {
244 CompletableFuture<HttpResponseImpl> error = new CompletableFuture<>();
245 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
246 if (t.getCause() != null) {
247 t = t.getCause();
248 }
249 }
250 if (cancelled && t instanceof IOException) {
251 t = new HttpTimeoutException("request timed out");
252 }
253 error.completeExceptionally(t);
254 return error;
255 }
256
257 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) {
258 return getExchange().responseBody(processor);
259 }
260
261 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
262 return getExchange().responseBodyAsync(processor);
263 }
264
265 class TimedEvent extends TimeoutEvent {
266 TimedEvent(long timeval) {
267 super(timeval);
268 }
269 @Override
270 public void handle() {
271 cancel();
272 }
273
274 }
|
170 responseFiltersAsync(HttpResponse response)
171 {
// Runs responseFilters(response) on the calling thread and hands the
// outcome back through a future that is already completed on return.
172 CompletableFuture<Pair<HttpResponse,HttpRequestImpl>> cf = new CompletableFuture<>();
173 try {
174 Pair<HttpResponse,HttpRequestImpl> n = responseFilters(response); // assumed to be fast
175 cf.complete(n);
176 } catch (Throwable e) {
// Anything thrown inside the try block is delivered via the future
// instead of propagating to the caller.
177 cf.completeExceptionally(e);
178 }
179 return cf;
180 }
181
182 public void cancel() {
183 cancelled = true;
184 getExchange().cancel();
185 }
186
/**
 * Starts one attempt at obtaining a response for {@code currentreq} and
 * returns a future for the outcome. The {@code v} argument is not read in
 * the visible code; the method re-invokes itself with {@code null}.
 *
 * NOTE(review): this view of the method is incomplete. The embedded line
 * numbering jumps from 210 to 223, so the statements in between are not
 * visible, and neither are the declarations of {@code response} and
 * {@code obj} that the tail of the method uses.
 */
187 public CompletableFuture<HttpResponseImpl> responseAsync(Void v) {
188 CompletableFuture<HttpResponseImpl> cf;
// Every call counts as an attempt; once the counter exceeds max_attempts
// the returned future is failed with an IOException and no request is made.
189 if (++attempts > max_attempts) {
190 cf = CompletableFuture.failedFuture(new IOException("Too many retries"));
191 } else {
// A non-zero timeval() on the current request arms a TimedEvent and
// registers it with the client. TimedEvent.handle() calls cancel().
192 if (currentreq.timeval() != 0) {
193 // set timer
194 td = new TimedEvent(currentreq.timeval());
195 client.registerTimer(td);
196 }
197 Exchange exch = getExchange();
// Pipeline: requestFiltersAsync(currentreq) -> exch.responseAsync ->
// responseFiltersAsync -> the lambda below.
198 cf = requestFiltersAsync(currentreq)
199 .thenCompose(exch::responseAsync)
200 .thenCompose(this::responseFiltersAsync)
201 .thenCompose((Pair<HttpResponse,HttpRequestImpl> pair) -> {
202 HttpResponseImpl resp = (HttpResponseImpl)pair.first;
// A non-null first element is returned to the caller as the response.
203 if (resp != null) {
204 if (attempts > 1) {
// NOTE(review): a success message is emitted through Log.logError.
205 Log.logError("Succeeded on attempt: " + attempts);
206 }
207 return CompletableFuture.completedFuture(resp);
208 } else {
// No response in the pair: its second element becomes the current request.
209 currentreq = pair.second;
210 Exchange previous = exch;
// NOTE(review): embedded lines 211-222 are missing from this view.
223 return CompletableFuture.completedFuture(response);
224 }
225 // all exceptions thrown are handled here
226 CompletableFuture<HttpResponseImpl> error = getExceptionalCF(obj.second);
// NOTE(review): getExceptionalCF, as visible in this file, always returns a
// non-null future, so the retry branch below looks unreachable. Confirm
// against the full source.
227 if (error == null) {
228 cancelTimer();
229 return responseAsync(null);
230 } else {
231 return error;
232 }
233 });
234 }
235 return cf;
236 }
237
238 /**
239 * Take a Throwable and return a suitable CompletableFuture that is
240 * completed exceptionally.
241 */
242 private CompletableFuture<HttpResponseImpl> getExceptionalCF(Throwable t) {
243 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
244 if (t.getCause() != null) {
245 t = t.getCause();
246 }
247 }
248 if (cancelled && t instanceof IOException) {
249 t = new HttpTimeoutException("request timed out");
250 }
251 return CompletableFuture.failedFuture(t);
252 }
253
254 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) {
255 return getExchange().responseBody(processor);
256 }
257
258 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
259 return getExchange().responseBodyAsync(processor);
260 }
261
262 class TimedEvent extends TimeoutEvent {
263 TimedEvent(long timeval) {
264 super(timeval);
265 }
266 @Override
267 public void handle() {
268 cancel();
269 }
270
271 }
|