< prev index next >
1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24 package java.net.http;
25
26 import java.io.IOException;
27 import java.net.Authenticator;
28 import java.net.CookieManager;
29 import java.net.ProxySelector;
30 import java.net.URI;
31 import static java.net.http.Utils.BUFSIZE;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.SelectableChannel;
35 import java.nio.channels.SelectionKey;
36 import static java.nio.channels.SelectionKey.OP_CONNECT;
37 import static java.nio.channels.SelectionKey.OP_READ;
38 import static java.nio.channels.SelectionKey.OP_WRITE;
39 import java.nio.channels.Selector;
40 import java.util.*;
41 import java.util.stream.Stream;
42 import java.util.concurrent.ExecutorService;
43 import java.security.NoSuchAlgorithmException;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ThreadFactory;
46 import javax.net.ssl.SSLContext;
47 import javax.net.ssl.SSLParameters;
48
49 /**
50 * Client implementation. Contains all configuration information and also
51 * the selector manager thread which allows async events to be registered
52 * and delivered when they occur. See AsyncEvent.
53 */
54 class HttpClientImpl extends HttpClient implements BufferHandler {
55
56 private final CookieManager cookieManager;
57 private final Redirect followRedirects;
58 private final ProxySelector proxySelector;
59 private final Authenticator authenticator;
60 private final Version version;
61 private boolean pipelining = false;
62 private final ConnectionPool connections;
63 private final ExecutorWrapper executor;
64 // Security parameters
65 private final SSLContext sslContext;
66 private final SSLParameters sslParams;
67 private final SelectorManager selmgr;
68 private final FilterFactory filters;
69 private final Http2ClientImpl client2;
70 private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
71 private final LinkedList<TimeoutEvent> timeouts;
72
73 public static HttpClientImpl create(HttpClientBuilderImpl builder) {
74 HttpClientImpl impl = new HttpClientImpl(builder);
75 impl.start();
76 return impl;
77 }
78
79 private HttpClientImpl(HttpClientBuilderImpl builder) {
80 if (builder.sslContext == null) {
81 try {
82 sslContext = SSLContext.getDefault();
83 } catch (NoSuchAlgorithmException ex) {
84 throw new InternalError(ex);
85 }
86 } else {
87 sslContext = builder.sslContext;
88 }
89 ExecutorService ex = builder.executor;
90 if (ex == null) {
91 ex = Executors.newCachedThreadPool((r) -> {
92 Thread t = defaultFactory.newThread(r);
93 t.setDaemon(true);
94 return t;
95 });
96 } else {
97 ex = builder.executor;
98 }
99 client2 = new Http2ClientImpl(this);
100 executor = ExecutorWrapper.wrap(ex);
101 cookieManager = builder.cookieManager;
102 followRedirects = builder.followRedirects == null ?
103 Redirect.NEVER : builder.followRedirects;
104 this.proxySelector = builder.proxy;
105 authenticator = builder.authenticator;
106 version = builder.version;
107 sslParams = builder.sslParams;
108 connections = new ConnectionPool();
109 connections.start();
110 timeouts = new LinkedList<>();
111 try {
112 selmgr = new SelectorManager();
113 } catch (IOException e) {
114 // unlikely
115 throw new InternalError(e);
116 }
117 selmgr.setDaemon(true);
118 selmgr.setName("HttpSelector");
119 filters = new FilterFactory();
120 initFilters();
121 }
122
123 private void start() {
124 selmgr.start();
125 }
126
127 /**
128 * Wait for activity on given exchange (assuming blocking = false).
129 * It's a no-op if blocking = true. In particular, the following occurs
130 * in the SelectorManager thread.
131 *
132 * 1) mark the connection non-blocking
133 * 2) add to selector
134 * 3) If selector fires for this exchange then
135 * 4) - mark connection as blocking
136 * 5) - call AsyncEvent.handle()
137 *
138 * If exchange needs to block again, then call registerEvent() again
139 */
140 void registerEvent(AsyncEvent exchange) throws IOException {
141 selmgr.register(exchange);
142 }
143
144 Http2ClientImpl client2() {
145 return client2;
146 }
147
148 LinkedList<ByteBuffer> freelist = new LinkedList<>();
149
150 @Override
151 public synchronized ByteBuffer getBuffer() {
152 if (freelist.isEmpty()) {
153 return ByteBuffer.allocate(BUFSIZE);
154 }
155 return freelist.removeFirst();
156 }
157
158 @Override
159 public synchronized void returnBuffer(ByteBuffer buffer) {
160 buffer.clear();
161 freelist.add(buffer);
162 }
163
164
165 // Main loop for this client's selector
166
167 class SelectorManager extends Thread {
168 final Selector selector;
169 boolean closed;
170
171 final List<AsyncEvent> readyList;
172 final List<AsyncEvent> registrations;
173
174 SelectorManager() throws IOException {
175 readyList = new LinkedList<>();
176 registrations = new LinkedList<>();
177 selector = Selector.open();
178 }
179
180 // This returns immediately. So caller not allowed to send/receive
181 // on connection.
182
183 synchronized void register(AsyncEvent e) throws IOException {
184 registrations.add(e);
185 selector.wakeup();
186 }
187
188 void wakeupSelector() {
189 selector.wakeup();
190 }
191
192 synchronized void shutdown() {
193 closed = true;
194 try {
195 selector.close();
196 } catch (IOException e) {}
197 }
198
199 private List<AsyncEvent> copy(List<AsyncEvent> list) {
200 LinkedList<AsyncEvent> c = new LinkedList<>();
201 for (AsyncEvent e : list) {
202 c.add(e);
203 }
204 return c;
205 }
206
207 String opvals(int i) {
208 StringBuilder sb = new StringBuilder();
209 if ((i & OP_READ) != 0)
210 sb.append("OP_READ ");
211 if ((i & OP_CONNECT) != 0)
212 sb.append("OP_CONNECT ");
213 if ((i & OP_WRITE) != 0)
214 sb.append("OP_WRITE ");
215 return sb.toString();
216 }
217
218 @Override
219 public void run() {
220 try {
221 while (true) {
222 synchronized (this) {
223 for (AsyncEvent exchange : registrations) {
224 SelectableChannel c = exchange.channel();
225 try {
226 c.configureBlocking(false);
227 SelectionKey key = c.keyFor(selector);
228 SelectorAttachment sa;
229 if (key == null) {
230 sa = new SelectorAttachment(c, selector);
231 } else {
232 sa = (SelectorAttachment)key.attachment();
233 }
234 sa.register(exchange);
235 } catch (IOException e) {
236 Log.logError("HttpClientImpl: " + e);
237 c.close();
238 // let the exchange deal with it
239 handleEvent(exchange);
240 }
241 }
242 registrations.clear();
243 }
244 long timeval = getTimeoutValue();
245 long now = System.currentTimeMillis();
246 int n = selector.select(timeval);
247 if (n == 0) {
248 signalTimeouts(now);
249 continue;
250 }
251 Set<SelectionKey> keys = selector.selectedKeys();
252
253 for (SelectionKey key : keys) {
254 SelectorAttachment sa = (SelectorAttachment)key.attachment();
255 int eventsOccurred = key.readyOps();
256 sa.events(eventsOccurred).forEach(readyList::add);
257 sa.resetInterestOps(eventsOccurred);
258 }
259 selector.selectNow(); // complete cancellation
260 selector.selectedKeys().clear();
261
262 for (AsyncEvent exchange : readyList) {
263 if (exchange instanceof AsyncEvent.Blocking) {
264 exchange.channel().configureBlocking(true);
265 } else {
266 assert exchange instanceof AsyncEvent.NonBlocking;
267 }
268 executor.synchronize();
269 handleEvent(exchange); // will be delegated to executor
270 }
271 readyList.clear();
272 }
273 } catch (Throwable e) {
274 if (!closed) {
275 System.err.println("HttpClientImpl terminating on error");
276 // This terminates thread. So, better just print stack trace
277 String err = Utils.stackTrace(e);
278 Log.logError("HttpClientImpl: fatal error: " + err);
279 }
280 }
281 }
282
283 void handleEvent(AsyncEvent e) {
284 if (closed) {
285 e.abort();
286 } else {
287 e.handle();
288 }
289 }
290 }
291
292 /**
293 * Tracks multiple user level registrations associated with one NIO
294 * registration (SelectionKey). In this implementation, registrations
295 * are one-off and when an event is posted the registration is cancelled
296 * until explicitly registered again.
297 *
298 * <p> No external synchronization required as this class is only used
299 * by the SelectorManager thread. One of these objects required per
300 * connection.
301 */
302 private static class SelectorAttachment {
303 private final SelectableChannel chan;
304 private final Selector selector;
305 private final ArrayList<AsyncEvent> pending;
306 private int interestops;
307
308 SelectorAttachment(SelectableChannel chan, Selector selector) {
309 this.pending = new ArrayList<>();
310 this.chan = chan;
311 this.selector = selector;
312 }
313
314 void register(AsyncEvent e) throws ClosedChannelException {
315 int newops = e.interestOps();
316 boolean reRegister = (interestops & newops) != newops;
317 interestops |= newops;
318 pending.add(e);
319 if (reRegister) {
320 // first time registration happens here also
321 chan.register(selector, interestops, this);
322 }
323 }
324
325 int interestOps() {
326 return interestops;
327 }
328
329 /**
330 * Returns a Stream<AsyncEvents> containing only events that are
331 * registered with the given {@code interestop}.
332 */
333 Stream<AsyncEvent> events(int interestop) {
334 return pending.stream()
335 .filter(ev -> (ev.interestOps() & interestop) != 0);
336 }
337
338 /**
339 * Removes any events with the given {@code interestop}, and if no
340 * events remaining, cancels the associated SelectionKey.
341 */
342 void resetInterestOps(int interestop) {
343 int newops = 0;
344
345 Iterator<AsyncEvent> itr = pending.iterator();
346 while (itr.hasNext()) {
347 AsyncEvent event = itr.next();
348 int evops = event.interestOps();
349 if ((evops & interestop) != 0) {
350 itr.remove();
351 } else {
352 newops |= evops;
353 }
354 }
355
356 interestops = newops;
357 SelectionKey key = chan.keyFor(selector);
358 if (newops == 0) {
359 key.cancel();
360 } else {
361 key.interestOps(newops);
362 }
363 }
364 }
365
366 /**
367 * Creates a HttpRequest associated with this group.
368 *
369 * @throws IllegalStateException if the group has been stopped
370 */
371 @Override
372 public HttpRequestBuilderImpl request() {
373 return new HttpRequestBuilderImpl(this, null);
374 }
375
376 /**
377 * Creates a HttpRequest associated with this group.
378 *
379 * @throws IllegalStateException if the group has been stopped
380 */
381 @Override
382 public HttpRequestBuilderImpl request(URI uri) {
383 return new HttpRequestBuilderImpl(this, uri);
384 }
385
386 @Override
387 public SSLContext sslContext() {
388 Utils.checkNetPermission("getSSLContext");
389 return sslContext;
390 }
391
392 @Override
393 public Optional<SSLParameters> sslParameters() {
394 return Optional.ofNullable(sslParams);
395 }
396
397 @Override
398 public Optional<Authenticator> authenticator() {
399 return Optional.ofNullable(authenticator);
400 }
401
402 @Override
403 public ExecutorService executorService() {
404 return executor.userExecutor();
405 }
406
407 ExecutorWrapper executorWrapper() {
408 return executor;
409 }
410
411 @Override
412 public boolean pipelining() {
413 return this.pipelining;
414 }
415
416 ConnectionPool connectionPool() {
417 return connections;
418 }
419
420 @Override
421 public Redirect followRedirects() {
422 return followRedirects;
423 }
424
425
426 @Override
427 public Optional<CookieManager> cookieManager() {
428 return Optional.ofNullable(cookieManager);
429 }
430
431 @Override
432 public Optional<ProxySelector> proxy() {
433 return Optional.ofNullable(this.proxySelector);
434 }
435
436 @Override
437 public Version version() {
438 return version;
439 }
440
441 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
442
443 boolean getHttp2Allowed() {
444 return version.equals(Version.HTTP_2);
445 }
446
447 //void setHttp2NotSupported(String host) {
448 //http2NotSupported.put(host, false);
449 //}
450
451 final void initFilters() {
452 addFilter(AuthenticationFilter.class);
453 addFilter(RedirectFilter.class);
454 }
455
456 final void addFilter(Class<? extends HeaderFilter> f) {
457 filters.addFilter(f);
458 }
459
460 final List<HeaderFilter> filterChain() {
461 return filters.getFilterChain();
462 }
463
464 // Timer controls. Timers are implemented through timed Selector.select()
465 // calls.
466 synchronized void registerTimer(TimeoutEvent event) {
467 long elapse = event.timevalMillis();
468 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
469 long listval = 0;
470 event.delta = event.timeval; // in case list empty
471 TimeoutEvent next;
472 while (iter.hasNext()) {
473 next = iter.next();
474 listval += next.delta;
475 if (elapse < listval) {
476 listval -= next.delta;
477 event.delta = elapse - listval;
478 next.delta -= event.delta;
479 iter.previous();
480 break;
481 } else if (!iter.hasNext()) {
482 event.delta = event.timeval - listval ;
483 }
484 }
485 iter.add(event);
486 selmgr.wakeupSelector();
487 }
488
489 synchronized void signalTimeouts(long then) {
490 if (timeouts.isEmpty()) {
491 return;
492 }
493 long now = System.currentTimeMillis();
494 long duration = now - then;
495 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
496 TimeoutEvent event = iter.next();
497 long delta = event.delta;
498 if (duration < delta) {
499 event.delta -= duration;
500 return;
501 }
502 event.handle();
503 iter.remove();
504 while (iter.hasNext()) {
505 event = iter.next();
506 if (event.delta == 0) {
507 event.handle();
508 iter.remove();
509 } else {
510 event.delta += delta;
511 break;
512 }
513 }
514 }
515
516 synchronized void cancelTimer(TimeoutEvent event) {
517 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
518 while (iter.hasNext()) {
519 TimeoutEvent ev = iter.next();
520 if (event == ev) {
521 if (iter.hasNext()) {
522 // adjust
523 TimeoutEvent next = iter.next();
524 next.delta += ev.delta;
525 iter.previous();
526 }
527 iter.remove();
528 }
529 }
530 }
531
532 // used for the connection window
533 int getReceiveBufferSize() {
534 return Utils.getIntegerNetProperty(
535 "sun.net.httpclient.connectionWindowSize", 256 * 1024
536 );
537 }
538
539 // returns 0 meaning block forever, or a number of millis to block for
540 synchronized long getTimeoutValue() {
541 if (timeouts.isEmpty()) {
542 return 0;
543 } else {
544 return timeouts.get(0).delta;
545 }
546 }
547 }
< prev index next >