20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.Authenticator; 28 import java.net.CookieManager; 29 import java.net.ProxySelector; 30 import java.net.URI; 31 import static java.net.http.Utils.BUFSIZE; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.ClosedChannelException; 34 import java.nio.channels.SelectableChannel; 35 import java.nio.channels.SelectionKey; 36 import static java.nio.channels.SelectionKey.OP_CONNECT; 37 import static java.nio.channels.SelectionKey.OP_READ; 38 import static java.nio.channels.SelectionKey.OP_WRITE; 39 import java.nio.channels.Selector; 40 import java.util.LinkedList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.concurrent.ExecutorService; 44 import java.security.NoSuchAlgorithmException; 45 import java.util.Iterator; 46 import java.util.ListIterator; 47 import java.util.Optional; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.ThreadFactory; 50 import javax.net.ssl.SSLContext; 51 import javax.net.ssl.SSLParameters; 52 53 /** 54 * Client implementation. Contains all configuration information and also 55 * the selector manager thread which allows async events to be registered 56 * and delivered when they occur. See AsyncEvent. 57 */ 58 class HttpClientImpl extends HttpClient implements BufferHandler { 59 60 private final CookieManager cookieManager; 61 private final Redirect followRedirects; 62 private final ProxySelector proxySelector; 63 private final Authenticator authenticator; 64 private final Version version; 65 private boolean pipelining = false; 66 private final ConnectionPool connections; 67 private final ExecutorWrapper executor; 231 if ((i & OP_CONNECT) != 0) 232 sb.append("OP_CONNECT "); 233 if ((i & OP_WRITE) != 0) 234 sb.append("OP_WRITE "); 235 return sb.toString(); 236 } 237 238 @Override 239 public void run() { 240 try { 241 while (true) { 242 synchronized (this) { 243 debugList = copy(registrations); 244 for (AsyncEvent exchange : registrations) { 245 SelectableChannel c = exchange.channel(); 246 try { 247 c.configureBlocking(false); 248 SelectionKey key = c.keyFor(selector); 249 SelectorAttachment sa; 250 if (key == null) { 251 sa = new SelectorAttachment(c); 252 } else { 253 sa = (SelectorAttachment)key.attachment(); 254 } 255 sa.register(exchange); 256 } catch (IOException e) { 257 Log.logError("HttpClientImpl: " + e); 258 c.close(); 259 // let the exchange deal with it 260 handleEvent(exchange); 261 } 262 } 263 registrations.clear(); 264 } 265 long timeval = getTimeoutValue(); 266 long now = System.currentTimeMillis(); 267 int n = selector.select(timeval); 268 if (n == 0) { 269 signalTimeouts(now); 270 continue; 271 } 272 Set<SelectionKey> keys = selector.selectedKeys(); 273 274 for (SelectionKey key : keys) { 275 SelectorAttachment sa = (SelectorAttachment)key.attachment(); 276 int eventsOccurred = key.readyOps(); 277 for (AsyncEvent ev : sa.events(eventsOccurred)) { 278 readyList.add(ev); 279 } 280 sa.resetInterestOps(eventsOccurred); 281 } 282 selector.selectNow(); // complete cancellation 283 selector.selectedKeys().clear(); 284 285 for (AsyncEvent exchange : readyList) { 286 if (exchange instanceof AsyncEvent.Blocking) { 287 exchange.channel().configureBlocking(true); 288 } else { 289 assert exchange instanceof AsyncEvent.NonBlocking; 290 } 291 executor.synchronize(); 292 handleEvent(exchange); // will be delegated to executor 293 } 294 readyList.clear(); 295 } 296 } catch (Throwable e) { 297 if (!closed) { 298 System.err.println("HttpClientImpl terminating on error"); 299 // This terminates thread. So, better just print stack trace 300 String err = Utils.stackTrace(e); 301 Log.logError("HttpClientImpl: fatal error: " + err); 302 } 303 } 304 } 305 306 void handleEvent(AsyncEvent e) { 307 if (closed) { 308 e.abort(); 309 } else { 310 e.handle(); 311 } 312 } 313 314 /** 315 * Tracks multiple user level registrations associated with one 316 * NIO registration (SelectionKey). In this implementation, 317 * registrations are one-off and when an event is posted 318 * the registration is cancelled until explicitly registered 319 * again. 320 * no external synchronization required as this class is only used 321 * by the SelectorManager thread. One of these objects 322 * required per connection. 323 */ 324 private class SelectorAttachment { 325 int interestops; 326 final SelectableChannel chan; 327 final LinkedList<AsyncEvent> pending; 328 329 SelectorAttachment(SelectableChannel chan) { 330 this.pending = new LinkedList<>(); 331 this.chan = chan; 332 } 333 334 void register(AsyncEvent e) throws ClosedChannelException { 335 int newops = e.interestOps(); 336 boolean reRegister = (interestops & newops) != newops; 337 this.interestops |= newops; 338 pending.add(e); 339 if (reRegister) { 340 // first time registration happens here also 341 chan.register(selector, interestops, this); 342 } 343 } 344 345 int interestOps() { 346 return interestops; 347 } 348 349 /** 350 * Returns Iterator<AsyncEvents> containing only events that are 351 * registered with the given interestops 352 * 353 * @param type 354 * @return 355 */ 356 Iterable<AsyncEvent> events(int interestop) { 357 return new Iterable<AsyncEvent>() { 358 @Override 359 public Iterator<AsyncEvent> iterator() { 360 return pending.stream() 361 .filter(ev -> (ev.interestOps() & interestop) != 0) 362 .iterator(); 363 } 364 }; 365 } 366 367 /** 368 * Remove any events with the given interestop, and if no events 369 * remaining, cancel the associated SelectionKey. 370 * 371 * @param interestops 372 */ 373 void resetInterestOps(int interestop) { 374 int size = pending.size(); 375 int newops = 0; 376 for (int i=0; i<size; i++) { 377 AsyncEvent ev = pending.get(i); 378 int evops = ev.interestOps(); 379 if ((evops & interestop) != 0) { 380 pending.remove(i); 381 } else { 382 newops |= evops; 383 } 384 } 385 this.interestops = newops; 386 SelectionKey key = chan.keyFor(selector); 387 if (newops == 0) { 388 key.cancel(); 389 } else { 390 key.interestOps(newops); 391 } 392 } 393 } 394 } 395 396 /** 397 * Creates a HttpRequest associated with this group. 398 * 399 * @throws IllegalStateException if the group has been stopped 400 */ 401 @Override 402 public HttpRequestBuilderImpl request() { 403 return new HttpRequestBuilderImpl(this, null); 404 } 405 406 /** 407 * Creates a HttpRequest associated with this group. 408 * 409 * @throws IllegalStateException if the group has been stopped 410 */ 411 @Override 412 public HttpRequestBuilderImpl request(URI uri) { 413 return new HttpRequestBuilderImpl(this, uri); 414 } | 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.Authenticator; 28 import java.net.CookieManager; 29 import java.net.ProxySelector; 30 import java.net.URI; 31 import static java.net.http.Utils.BUFSIZE; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.ClosedChannelException; 34 import java.nio.channels.SelectableChannel; 35 import java.nio.channels.SelectionKey; 36 import static java.nio.channels.SelectionKey.OP_CONNECT; 37 import static java.nio.channels.SelectionKey.OP_READ; 38 import static java.nio.channels.SelectionKey.OP_WRITE; 39 import java.nio.channels.Selector; 40 import java.util.*; 41 import java.util.stream.Stream; 42 import java.util.concurrent.ExecutorService; 43 import java.security.NoSuchAlgorithmException; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.ThreadFactory; 46 import javax.net.ssl.SSLContext; 47 import javax.net.ssl.SSLParameters; 48 49 /** 50 * Client implementation. Contains all configuration information and also 51 * the selector manager thread which allows async events to be registered 52 * and delivered when they occur. See AsyncEvent. 53 */ 54 class HttpClientImpl extends HttpClient implements BufferHandler { 55 56 private final CookieManager cookieManager; 57 private final Redirect followRedirects; 58 private final ProxySelector proxySelector; 59 private final Authenticator authenticator; 60 private final Version version; 61 private boolean pipelining = false; 62 private final ConnectionPool connections; 63 private final ExecutorWrapper executor; 227 if ((i & OP_CONNECT) != 0) 228 sb.append("OP_CONNECT "); 229 if ((i & OP_WRITE) != 0) 230 sb.append("OP_WRITE "); 231 return sb.toString(); 232 } 233 234 @Override 235 public void run() { 236 try { 237 while (true) { 238 synchronized (this) { 239 debugList = copy(registrations); 240 for (AsyncEvent exchange : registrations) { 241 SelectableChannel c = exchange.channel(); 242 try { 243 c.configureBlocking(false); 244 SelectionKey key = c.keyFor(selector); 245 SelectorAttachment sa; 246 if (key == null) { 247 sa = new SelectorAttachment(c, selector); 248 } else { 249 sa = (SelectorAttachment)key.attachment(); 250 } 251 sa.register(exchange); 252 } catch (IOException e) { 253 Log.logError("HttpClientImpl: " + e); 254 c.close(); 255 // let the exchange deal with it 256 handleEvent(exchange); 257 } 258 } 259 registrations.clear(); 260 } 261 long timeval = getTimeoutValue(); 262 long now = System.currentTimeMillis(); 263 int n = selector.select(timeval); 264 if (n == 0) { 265 signalTimeouts(now); 266 continue; 267 } 268 Set<SelectionKey> keys = selector.selectedKeys(); 269 270 for (SelectionKey key : keys) { 271 SelectorAttachment sa = (SelectorAttachment)key.attachment(); 272 int eventsOccurred = key.readyOps(); 273 sa.events(eventsOccurred).forEach(readyList::add); 274 sa.resetInterestOps(eventsOccurred); 275 } 276 selector.selectNow(); // complete cancellation 277 selector.selectedKeys().clear(); 278 279 for (AsyncEvent exchange : readyList) { 280 if (exchange instanceof AsyncEvent.Blocking) { 281 exchange.channel().configureBlocking(true); 282 } else { 283 assert exchange instanceof AsyncEvent.NonBlocking; 284 } 285 executor.synchronize(); 286 handleEvent(exchange); // will be delegated to executor 287 } 288 readyList.clear(); 289 } 290 } catch (Throwable e) { 291 if (!closed) { 292 System.err.println("HttpClientImpl terminating on error"); 293 // This terminates thread. So, better just print stack trace 294 String err = Utils.stackTrace(e); 295 Log.logError("HttpClientImpl: fatal error: " + err); 296 } 297 } 298 } 299 300 void handleEvent(AsyncEvent e) { 301 if (closed) { 302 e.abort(); 303 } else { 304 e.handle(); 305 } 306 } 307 } 308 309 /** 310 * Tracks multiple user level registrations associated with one NIO 311 * registration (SelectionKey). In this implementation, registrations 312 * are one-off and when an event is posted the registration is cancelled 313 * until explicitly registered again. 314 * 315 * <p> No external synchronization required as this class is only used 316 * by the SelectorManager thread. One of these objects required per 317 * connection. 318 */ 319 private static class SelectorAttachment { 320 private final SelectableChannel chan; 321 private final Selector selector; 322 private final ArrayList<AsyncEvent> pending; 323 private int interestops; 324 325 SelectorAttachment(SelectableChannel chan, Selector selector) { 326 this.pending = new ArrayList<>(); 327 this.chan = chan; 328 this.selector = selector; 329 } 330 331 void register(AsyncEvent e) throws ClosedChannelException { 332 int newops = e.interestOps(); 333 boolean reRegister = (interestops & newops) != newops; 334 interestops |= newops; 335 pending.add(e); 336 if (reRegister) { 337 // first time registration happens here also 338 chan.register(selector, interestops, this); 339 } 340 } 341 342 int interestOps() { 343 return interestops; 344 } 345 346 /** 347 * Returns an Iterator<AsyncEvents> containing only events that are 348 * registered with the given {@code interestop}. 349 */ 350 Stream<AsyncEvent> events(int interestop) { 351 return pending.stream() 352 .filter(ev -> (ev.interestOps() & interestop) != 0); 353 } 354 355 /** 356 * Removes any events with the given {@code interestop}, and if no 357 * events remaining, cancels the associated SelectionKey. 358 */ 359 void resetInterestOps(int interestop) { 360 int newops = 0; 361 362 Iterator<AsyncEvent> itr = pending.iterator(); 363 while (itr.hasNext()) { 364 AsyncEvent event = itr.next(); 365 int evops = event.interestOps(); 366 if ((evops & interestop) != 0) { 367 itr.remove(); 368 } else { 369 newops |= evops; 370 } 371 } 372 373 interestops = newops; 374 SelectionKey key = chan.keyFor(selector); 375 if (newops == 0) { 376 key.cancel(); 377 } else { 378 key.interestOps(newops); 379 } 380 } 381 } 382 383 /** 384 * Creates a HttpRequest associated with this group. 385 * 386 * @throws IllegalStateException if the group has been stopped 387 */ 388 @Override 389 public HttpRequestBuilderImpl request() { 390 return new HttpRequestBuilderImpl(this, null); 391 } 392 393 /** 394 * Creates a HttpRequest associated with this group. 395 * 396 * @throws IllegalStateException if the group has been stopped 397 */ 398 @Override 399 public HttpRequestBuilderImpl request(URI uri) { 400 return new HttpRequestBuilderImpl(this, uri); 401 } |