< prev index next >
1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24 package java.net.http;
25
26 import javax.net.ssl.SSLContext;
27 import javax.net.ssl.SSLParameters;
28 import java.io.IOException;
29 import java.net.Authenticator;
30 import java.net.CookieManager;
31 import java.net.ProxySelector;
32 import java.net.URI;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.SelectableChannel;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.Selector;
38 import java.security.NoSuchAlgorithmException;
39 import java.util.ArrayList;
40 import java.util.Iterator;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.ListIterator;
44 import java.util.Optional;
45 import java.util.Set;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.ThreadFactory;
49 import java.util.stream.Stream;
50
51 import static java.net.http.Utils.BUFSIZE;
52
53 /**
54 * Client implementation. Contains all configuration information and also
55 * the selector manager thread which allows async events to be registered
56 * and delivered when they occur. See AsyncEvent.
57 */
58 class HttpClientImpl extends HttpClient implements BufferHandler {
59
60 private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
61 private final CookieManager cookieManager;
62 private final Redirect followRedirects;
63 private final ProxySelector proxySelector;
64 private final Authenticator authenticator;
65 private final Version version;
66 private boolean pipelining = false;
67 private final ConnectionPool connections;
68 private final ExecutorWrapper executor;
69 // Security parameters
70 private final SSLContext sslContext;
71 private final SSLParameters sslParams;
72 private final SelectorManager selmgr;
73 private final FilterFactory filters;
74 private final Http2ClientImpl client2;
75 private final LinkedList<TimeoutEvent> timeouts;
76
77 public static HttpClientImpl create(HttpClientBuilderImpl builder) {
78 HttpClientImpl impl = new HttpClientImpl(builder);
79 impl.start();
80 return impl;
81 }
82
83 private HttpClientImpl(HttpClientBuilderImpl builder) {
84 if (builder.sslContext == null) {
85 try {
86 sslContext = SSLContext.getDefault();
87 } catch (NoSuchAlgorithmException ex) {
88 throw new InternalError(ex);
89 }
90 } else {
91 sslContext = builder.sslContext;
92 }
93 ExecutorService ex = builder.executor;
94 if (ex == null) {
95 ex = Executors.newCachedThreadPool((r) -> {
96 Thread t = defaultFactory.newThread(r);
97 t.setDaemon(true);
98 return t;
99 });
100 } else {
101 ex = builder.executor;
102 }
103 client2 = new Http2ClientImpl(this);
104 executor = ExecutorWrapper.wrap(ex);
105 cookieManager = builder.cookieManager;
106 followRedirects = builder.followRedirects == null ?
107 Redirect.NEVER : builder.followRedirects;
108 this.proxySelector = builder.proxy;
109 authenticator = builder.authenticator;
110 version = builder.version;
111 sslParams = builder.sslParams;
112 connections = new ConnectionPool();
113 connections.start();
114 timeouts = new LinkedList<>();
115 try {
116 selmgr = new SelectorManager();
117 } catch (IOException e) {
118 // unlikely
119 throw new InternalError(e);
120 }
121 selmgr.setDaemon(true);
122 selmgr.setName("HttpSelector");
123 filters = new FilterFactory();
124 initFilters();
125 }
126
127 private void start() {
128 selmgr.start();
129 }
130
131 /**
132 * Wait for activity on given exchange (assuming blocking = false).
133 * It's a no-op if blocking = true. In particular, the following occurs
134 * in the SelectorManager thread.
135 *
136 * 1) mark the connection non-blocking
137 * 2) add to selector
138 * 3) If selector fires for this exchange then
139 * 4) - mark connection as blocking
140 * 5) - call AsyncEvent.handle()
141 *
142 * If exchange needs to block again, then call registerEvent() again
143 */
144 void registerEvent(AsyncEvent exchange) throws IOException {
145 selmgr.register(exchange);
146 }
147
148 Http2ClientImpl client2() {
149 return client2;
150 }
151
152 /**
153 * We keep one size of buffer on free list. That size may increase
154 * depending on demand. If that happens we dispose of free buffers
155 * that are smaller than new size.
156 */
157 private final LinkedList<ByteBuffer> freelist = new LinkedList<>();
158 int currentSize = BUFSIZE;
159
160 @Override
161 public synchronized ByteBuffer getBuffer(int size) {
162
163 ByteBuffer buf;
164 if (size == -1)
165 size = currentSize;
166
167 if (size > currentSize)
168 currentSize = size;
169
170 while (!freelist.isEmpty()) {
171 buf = freelist.removeFirst();
172 if (buf.capacity() < currentSize)
173 continue;
174 buf.clear();
175 return buf;
176 }
177 return ByteBuffer.allocate(size);
178 }
179
180 @Override
181 public synchronized void returnBuffer(ByteBuffer buffer) {
182 freelist.add(buffer);
183 }
184
185 @Override
186 public synchronized void setMinBufferSize(int n) {
187 currentSize = Math.max(n, currentSize);
188 }
189
190 // Main loop for this client's selector
191 private final class SelectorManager extends Thread {
192
193 private final Selector selector;
194 private boolean closed;
195 private final List<AsyncEvent> readyList;
196 private final List<AsyncEvent> registrations;
197
198 SelectorManager() throws IOException {
199 readyList = new ArrayList<>();
200 registrations = new ArrayList<>();
201 selector = Selector.open();
202 setName("SelectorManager");
203 }
204
205 // This returns immediately. So caller not allowed to send/receive
206 // on connection.
207
208 synchronized void register(AsyncEvent e) throws IOException {
209 registrations.add(e);
210 selector.wakeup();
211 }
212
213 void wakeupSelector() {
214 selector.wakeup();
215 }
216
217 synchronized void shutdown() {
218 closed = true;
219 try {
220 selector.close();
221 } catch (IOException ignored) { }
222 }
223
224 @Override
225 public void run() {
226 try {
227 while (!Thread.currentThread().isInterrupted()) {
228 synchronized (this) {
229 for (AsyncEvent exchange : registrations) {
230 SelectableChannel c = exchange.channel();
231 try {
232 c.configureBlocking(false);
233 SelectionKey key = c.keyFor(selector);
234 SelectorAttachment sa;
235 if (key == null) {
236 sa = new SelectorAttachment(c, selector);
237 } else {
238 sa = (SelectorAttachment) key.attachment();
239 }
240 sa.register(exchange);
241 } catch (IOException e) {
242 Log.logError("HttpClientImpl: " + e);
243 c.close();
244 // let the exchange deal with it
245 handleEvent(exchange);
246 }
247 }
248 registrations.clear();
249 }
250 long timeval = getTimeoutValue();
251 long now = System.currentTimeMillis();
252 //debugPrint(selector);
253 int n = selector.select(timeval);
254 if (n == 0) {
255 signalTimeouts(now);
256 continue;
257 }
258 Set<SelectionKey> keys = selector.selectedKeys();
259
260 for (SelectionKey key : keys) {
261 SelectorAttachment sa = (SelectorAttachment) key.attachment();
262 int eventsOccurred = key.readyOps();
263 sa.events(eventsOccurred).forEach(readyList::add);
264 sa.resetInterestOps(eventsOccurred);
265 }
266 selector.selectNow(); // complete cancellation
267 selector.selectedKeys().clear();
268
269 for (AsyncEvent exchange : readyList) {
270 if (exchange.blocking()) {
271 exchange.channel().configureBlocking(true);
272 }
273 executor.synchronize();
274 handleEvent(exchange); // will be delegated to executor
275 }
276 readyList.clear();
277 }
278 } catch (Throwable e) {
279 if (!closed) {
280 // This terminates thread. So, better just print stack trace
281 String err = Utils.stackTrace(e);
282 Log.logError("HttpClientImpl: fatal error: " + err);
283 }
284 } finally {
285 shutdown();
286 }
287 }
288
289 void debugPrint(Selector selector) {
290 System.err.println("Selector: debugprint start");
291 Set<SelectionKey> keys = selector.keys();
292 for (SelectionKey key : keys) {
293 SelectableChannel c = key.channel();
294 int ops = key.interestOps();
295 System.err.printf("selector chan:%s ops:%d\n", c, ops);
296 }
297 System.err.println("Selector: debugprint end");
298 }
299
300 void handleEvent(AsyncEvent e) {
301 if (closed) {
302 e.abort();
303 } else {
304 e.handle();
305 }
306 }
307 }
308
309 /**
310 * Tracks multiple user level registrations associated with one NIO
311 * registration (SelectionKey). In this implementation, registrations
312 * are one-off and when an event is posted the registration is cancelled
313 * until explicitly registered again.
314 *
315 * <p> No external synchronization required as this class is only used
316 * by the SelectorManager thread. One of these objects required per
317 * connection.
318 */
319 private static class SelectorAttachment {
320 private final SelectableChannel chan;
321 private final Selector selector;
322 private final ArrayList<AsyncEvent> pending;
323 private int interestOps;
324
325 SelectorAttachment(SelectableChannel chan, Selector selector) {
326 this.pending = new ArrayList<>();
327 this.chan = chan;
328 this.selector = selector;
329 }
330
331 void register(AsyncEvent e) throws ClosedChannelException {
332 int newOps = e.interestOps();
333 boolean reRegister = (interestOps & newOps) != newOps;
334 interestOps |= newOps;
335 pending.add(e);
336 if (reRegister) {
337 // first time registration happens here also
338 chan.register(selector, interestOps, this);
339 }
340 }
341
342 /**
343 * Returns a Stream<AsyncEvents> containing only events that are
344 * registered with the given {@code interestOps}.
345 */
346 Stream<AsyncEvent> events(int interestOps) {
347 return pending.stream()
348 .filter(ev -> (ev.interestOps() & interestOps) != 0);
349 }
350
351 /**
352 * Removes any events with the given {@code interestOps}, and if no
353 * events remaining, cancels the associated SelectionKey.
354 */
355 void resetInterestOps(int interestOps) {
356 int newOps = 0;
357
358 Iterator<AsyncEvent> itr = pending.iterator();
359 while (itr.hasNext()) {
360 AsyncEvent event = itr.next();
361 int evops = event.interestOps();
362 if (event.repeating()) {
363 newOps |= evops;
364 continue;
365 }
366 if ((evops & interestOps) != 0) {
367 itr.remove();
368 } else {
369 newOps |= evops;
370 }
371 }
372
373 this.interestOps = newOps;
374 SelectionKey key = chan.keyFor(selector);
375 if (newOps == 0) {
376 key.cancel();
377 } else {
378 key.interestOps(newOps);
379 }
380 }
381 }
382
383 /**
384 * Creates a HttpRequest associated with this group.
385 *
386 * @throws IllegalStateException
387 * if the group has been stopped
388 */
389 @Override
390 public HttpRequestBuilderImpl request() {
391 return new HttpRequestBuilderImpl(this, null);
392 }
393
394 /**
395 * Creates a HttpRequest associated with this group.
396 *
397 * @throws IllegalStateException
398 * if the group has been stopped
399 */
400 @Override
401 public HttpRequestBuilderImpl request(URI uri) {
402 return new HttpRequestBuilderImpl(this, uri);
403 }
404
405 @Override
406 public SSLContext sslContext() {
407 Utils.checkNetPermission("getSSLContext");
408 return sslContext;
409 }
410
411 @Override
412 public Optional<SSLParameters> sslParameters() {
413 return Optional.ofNullable(sslParams);
414 }
415
416 @Override
417 public Optional<Authenticator> authenticator() {
418 return Optional.ofNullable(authenticator);
419 }
420
421 @Override
422 public ExecutorService executorService() {
423 return executor.userExecutor();
424 }
425
426 ExecutorWrapper executorWrapper() {
427 return executor;
428 }
429
430 @Override
431 public boolean pipelining() {
432 return this.pipelining;
433 }
434
435 ConnectionPool connectionPool() {
436 return connections;
437 }
438
439 @Override
440 public Redirect followRedirects() {
441 return followRedirects;
442 }
443
444
445 @Override
446 public Optional<CookieManager> cookieManager() {
447 return Optional.ofNullable(cookieManager);
448 }
449
450 @Override
451 public Optional<ProxySelector> proxy() {
452 return Optional.ofNullable(this.proxySelector);
453 }
454
455 @Override
456 public Version version() {
457 return version;
458 }
459
460 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
461
462 boolean getHttp2Allowed() {
463 return version.equals(Version.HTTP_2);
464 }
465
466 private void initFilters() {
467 addFilter(AuthenticationFilter.class);
468 addFilter(RedirectFilter.class);
469 }
470
471 private void addFilter(Class<? extends HeaderFilter> f) {
472 filters.addFilter(f);
473 }
474
475 final List<HeaderFilter> filterChain() {
476 return filters.getFilterChain();
477 }
478
479 // Timer controls. Timers are implemented through timed Selector.select()
480 // calls.
481 synchronized void registerTimer(TimeoutEvent event) {
482 long elapse = event.timevalMillis();
483 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
484 long listval = 0;
485 event.delta = event.timeval; // in case list empty
486 TimeoutEvent next;
487 while (iter.hasNext()) {
488 next = iter.next();
489 listval += next.delta;
490 if (elapse < listval) {
491 listval -= next.delta;
492 event.delta = elapse - listval;
493 next.delta -= event.delta;
494 iter.previous();
495 break;
496 } else if (!iter.hasNext()) {
497 event.delta = event.timeval - listval;
498 }
499 }
500 iter.add(event);
501 selmgr.wakeupSelector();
502 }
503
504 private synchronized void signalTimeouts(long then) {
505 if (timeouts.isEmpty()) {
506 return;
507 }
508 long now = System.currentTimeMillis();
509 long duration = now - then;
510 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
511 TimeoutEvent event = iter.next();
512 long delta = event.delta;
513 if (duration < delta) {
514 event.delta -= duration;
515 return;
516 }
517 event.handle();
518 iter.remove();
519 while (iter.hasNext()) {
520 event = iter.next();
521 if (event.delta == 0) {
522 event.handle();
523 iter.remove();
524 } else {
525 event.delta += delta;
526 break;
527 }
528 }
529 }
530
531 synchronized void cancelTimer(TimeoutEvent event) {
532 ListIterator<TimeoutEvent> iter = timeouts.listIterator();
533 while (iter.hasNext()) {
534 TimeoutEvent ev = iter.next();
535 if (event == ev) {
536 if (iter.hasNext()) {
537 // adjust
538 TimeoutEvent next = iter.next();
539 next.delta += ev.delta;
540 iter.previous();
541 }
542 iter.remove();
543 }
544 }
545 }
546
547 // used for the connection window
548 int getReceiveBufferSize() {
549 return Utils.getIntegerNetProperty(
550 "sun.net.httpclient.connectionWindowSize", 256 * 1024
551 );
552 }
553
554 // returns 0 meaning block forever, or a number of millis to block for
555 private synchronized long getTimeoutValue() {
556 if (timeouts.isEmpty()) {
557 return 0;
558 } else {
559 return timeouts.get(0).delta;
560 }
561 }
562 }
< prev index next >