< prev index next >

src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java

Print this page

        

*** 21,60 **** * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any */ package java.net.http; import java.io.IOException; import java.net.Authenticator; import java.net.CookieManager; import java.net.ProxySelector; import java.net.URI; - import static java.net.http.Utils.BUFSIZE; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; - import static java.nio.channels.SelectionKey.OP_CONNECT; - import static java.nio.channels.SelectionKey.OP_READ; - import static java.nio.channels.SelectionKey.OP_WRITE; import java.nio.channels.Selector; - import java.util.*; - import java.util.stream.Stream; - import java.util.concurrent.ExecutorService; import java.security.NoSuchAlgorithmException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; ! import javax.net.ssl.SSLContext; ! import javax.net.ssl.SSLParameters; /** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */ class HttpClientImpl extends HttpClient implements BufferHandler { private final CookieManager cookieManager; private final Redirect followRedirects; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; --- 21,65 ---- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any */ package java.net.http; + import javax.net.ssl.SSLContext; + import javax.net.ssl.SSLParameters; import java.io.IOException; import java.net.Authenticator; import java.net.CookieManager; import java.net.ProxySelector; import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.security.NoSuchAlgorithmException; + import java.util.ArrayList; + import java.util.Iterator; + import java.util.LinkedList; + import java.util.List; + import java.util.ListIterator; + import java.util.Optional; + import java.util.Set; + import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; ! import java.util.stream.Stream; ! ! import static java.net.http.Utils.BUFSIZE; /** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */ class HttpClientImpl extends HttpClient implements BufferHandler { + private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); private final CookieManager cookieManager; private final Redirect followRedirects; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version;
*** 65,75 **** private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2; - private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); private final LinkedList<TimeoutEvent> timeouts; public static HttpClientImpl create(HttpClientBuilderImpl builder) { HttpClientImpl impl = new HttpClientImpl(builder); impl.start(); --- 70,79 ----
*** 143,182 **** Http2ClientImpl client2() { return client2; } ! LinkedList<ByteBuffer> freelist = new LinkedList<>(); @Override ! public synchronized ByteBuffer getBuffer() { ! if (freelist.isEmpty()) { ! return ByteBuffer.allocate(BUFSIZE); } ! return freelist.removeFirst(); } @Override public synchronized void returnBuffer(ByteBuffer buffer) { - buffer.clear(); freelist.add(buffer); } // Main loop for this client's selector ! class SelectorManager extends Thread { ! final Selector selector; ! boolean closed; ! ! final List<AsyncEvent> readyList; ! final List<AsyncEvent> registrations; SelectorManager() throws IOException { ! readyList = new LinkedList<>(); ! registrations = new LinkedList<>(); selector = Selector.open(); } // This returns immediately. So caller not allowed to send/receive // on connection. --- 147,207 ---- Http2ClientImpl client2() { return client2; } ! /** ! * We keep one size of buffer on free list. That size may increase ! * depending on demand. If that happens we dispose of free buffers ! * that are smaller than new size. ! */ ! private final LinkedList<ByteBuffer> freelist = new LinkedList<>(); ! int currentSize = BUFSIZE; @Override ! public synchronized ByteBuffer getBuffer(int size) { ! ! ByteBuffer buf; ! if (size == -1) ! size = currentSize; ! ! if (size > currentSize) ! currentSize = size; ! ! while (!freelist.isEmpty()) { ! buf = freelist.removeFirst(); ! if (buf.capacity() < currentSize) ! continue; ! buf.clear(); ! return buf; } ! return ByteBuffer.allocate(size); } @Override public synchronized void returnBuffer(ByteBuffer buffer) { freelist.add(buffer); } + @Override + public synchronized void setMinBufferSize(int n) { + currentSize = Math.max(n, currentSize); + } // Main loop for this client's selector + private final class SelectorManager extends Thread { ! private final Selector selector; ! private boolean closed; ! private final List<AsyncEvent> readyList; ! private final List<AsyncEvent> registrations; SelectorManager() throws IOException { ! readyList = new ArrayList<>(); ! registrations = new ArrayList<>(); selector = Selector.open(); + setName("SelectorManager"); } // This returns immediately. So caller not allowed to send/receive // on connection.
*** 191,237 **** synchronized void shutdown() { closed = true; try { selector.close(); ! } catch (IOException e) {} ! } ! ! private List<AsyncEvent> copy(List<AsyncEvent> list) { ! LinkedList<AsyncEvent> c = new LinkedList<>(); ! for (AsyncEvent e : list) { ! c.add(e); ! } ! return c; ! } ! ! String opvals(int i) { ! StringBuilder sb = new StringBuilder(); ! if ((i & OP_READ) != 0) ! sb.append("OP_READ "); ! if ((i & OP_CONNECT) != 0) ! sb.append("OP_CONNECT "); ! if ((i & OP_WRITE) != 0) ! sb.append("OP_WRITE "); ! return sb.toString(); } @Override public void run() { try { ! while (true) { synchronized (this) { for (AsyncEvent exchange : registrations) { SelectableChannel c = exchange.channel(); try { c.configureBlocking(false); SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null) { sa = new SelectorAttachment(c, selector); } else { ! sa = (SelectorAttachment)key.attachment(); } sa.register(exchange); } catch (IOException e) { Log.logError("HttpClientImpl: " + e); c.close(); --- 216,243 ---- synchronized void shutdown() { closed = true; try { selector.close(); ! } catch (IOException ignored) { } } @Override public void run() { try { ! while (!Thread.currentThread().isInterrupted()) { synchronized (this) { for (AsyncEvent exchange : registrations) { SelectableChannel c = exchange.channel(); try { c.configureBlocking(false); SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null) { sa = new SelectorAttachment(c, selector); } else { ! sa = (SelectorAttachment) key.attachment(); } sa.register(exchange); } catch (IOException e) { Log.logError("HttpClientImpl: " + e); c.close();
*** 241,287 **** } registrations.clear(); } long timeval = getTimeoutValue(); long now = System.currentTimeMillis(); int n = selector.select(timeval); if (n == 0) { signalTimeouts(now); continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { ! SelectorAttachment sa = (SelectorAttachment)key.attachment(); int eventsOccurred = key.readyOps(); sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); for (AsyncEvent exchange : readyList) { ! if (exchange instanceof AsyncEvent.Blocking) { exchange.channel().configureBlocking(true); - } else { - assert exchange instanceof AsyncEvent.NonBlocking; } executor.synchronize(); handleEvent(exchange); // will be delegated to executor } readyList.clear(); } } catch (Throwable e) { if (!closed) { - System.err.println("HttpClientImpl terminating on error"); // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } } } void handleEvent(AsyncEvent e) { if (closed) { e.abort(); } else { e.handle(); --- 247,304 ---- } registrations.clear(); } long timeval = getTimeoutValue(); long now = System.currentTimeMillis(); + //debugPrint(selector); int n = selector.select(timeval); if (n == 0) { signalTimeouts(now); continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { ! SelectorAttachment sa = (SelectorAttachment) key.attachment(); int eventsOccurred = key.readyOps(); sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); for (AsyncEvent exchange : readyList) { ! if (exchange.blocking()) { exchange.channel().configureBlocking(true); } executor.synchronize(); handleEvent(exchange); // will be delegated to executor } readyList.clear(); } } catch (Throwable e) { if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } + } finally { + shutdown(); } } + void debugPrint(Selector selector) { + System.err.println("Selector: debugprint start"); + Set<SelectionKey> keys = selector.keys(); + for (SelectionKey key : keys) { + SelectableChannel c = key.channel(); + int ops = key.interestOps(); + System.err.printf("selector chan:%s ops:%d\n", c, ops); + } + System.err.println("Selector: debugprint end"); + } + void handleEvent(AsyncEvent e) { if (closed) { e.abort(); } else { e.handle();
*** 301,384 **** */ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; private final ArrayList<AsyncEvent> pending; ! private int interestops; SelectorAttachment(SelectableChannel chan, Selector selector) { this.pending = new ArrayList<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { ! int newops = e.interestOps(); ! boolean reRegister = (interestops & newops) != newops; ! interestops |= newops; pending.add(e); if (reRegister) { // first time registration happens here also ! chan.register(selector, interestops, this); ! } } - - int interestOps() { - return interestops; } /** * Returns a Stream<AsyncEvents> containing only events that are ! * registered with the given {@code interestop}. */ ! Stream<AsyncEvent> events(int interestop) { return pending.stream() ! .filter(ev -> (ev.interestOps() & interestop) != 0); } /** ! * Removes any events with the given {@code interestop}, and if no * events remaining, cancels the associated SelectionKey. */ ! void resetInterestOps(int interestop) { ! int newops = 0; Iterator<AsyncEvent> itr = pending.iterator(); while (itr.hasNext()) { AsyncEvent event = itr.next(); int evops = event.interestOps(); ! if ((evops & interestop) != 0) { itr.remove(); } else { ! newops |= evops; } } ! interestops = newops; SelectionKey key = chan.keyFor(selector); ! if (newops == 0) { key.cancel(); } else { ! key.interestOps(newops); } } } /** * Creates a HttpRequest associated with this group. * ! * @throws IllegalStateException if the group has been stopped */ @Override public HttpRequestBuilderImpl request() { return new HttpRequestBuilderImpl(this, null); } /** * Creates a HttpRequest associated with this group. * ! * @throws IllegalStateException if the group has been stopped */ @Override public HttpRequestBuilderImpl request(URI uri) { return new HttpRequestBuilderImpl(this, uri); } --- 318,403 ---- */ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; private final ArrayList<AsyncEvent> pending; ! private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { this.pending = new ArrayList<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { ! int newOps = e.interestOps(); ! boolean reRegister = (interestOps & newOps) != newOps; ! interestOps |= newOps; pending.add(e); if (reRegister) { // first time registration happens here also ! chan.register(selector, interestOps, this); } } /** * Returns a Stream<AsyncEvents> containing only events that are ! * registered with the given {@code interestOps}. */ ! Stream<AsyncEvent> events(int interestOps) { return pending.stream() ! .filter(ev -> (ev.interestOps() & interestOps) != 0); } /** ! * Removes any events with the given {@code interestOps}, and if no * events remaining, cancels the associated SelectionKey. */ ! void resetInterestOps(int interestOps) { ! int newOps = 0; Iterator<AsyncEvent> itr = pending.iterator(); while (itr.hasNext()) { AsyncEvent event = itr.next(); int evops = event.interestOps(); ! if (event.repeating()) { ! newOps |= evops; ! continue; ! } ! if ((evops & interestOps) != 0) { itr.remove(); } else { ! newOps |= evops; } } ! this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); ! if (newOps == 0) { key.cancel(); } else { ! key.interestOps(newOps); } } } /** * Creates a HttpRequest associated with this group. * ! * @throws IllegalStateException ! * if the group has been stopped */ @Override public HttpRequestBuilderImpl request() { return new HttpRequestBuilderImpl(this, null); } /** * Creates a HttpRequest associated with this group. * ! * @throws IllegalStateException ! * if the group has been stopped */ @Override public HttpRequestBuilderImpl request(URI uri) { return new HttpRequestBuilderImpl(this, uri); }
*** 442,461 **** boolean getHttp2Allowed() { return version.equals(Version.HTTP_2); } ! //void setHttp2NotSupported(String host) { ! //http2NotSupported.put(host, false); ! //} ! ! final void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); } ! final void addFilter(Class<? extends HeaderFilter> f) { filters.addFilter(f); } final List<HeaderFilter> filterChain() { return filters.getFilterChain(); --- 461,476 ---- boolean getHttp2Allowed() { return version.equals(Version.HTTP_2); } ! private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); } ! private void addFilter(Class<? extends HeaderFilter> f) { filters.addFilter(f); } final List<HeaderFilter> filterChain() { return filters.getFilterChain();
*** 477,494 **** event.delta = elapse - listval; next.delta -= event.delta; iter.previous(); break; } else if (!iter.hasNext()) { ! event.delta = event.timeval - listval ; } } iter.add(event); selmgr.wakeupSelector(); } ! synchronized void signalTimeouts(long then) { if (timeouts.isEmpty()) { return; } long now = System.currentTimeMillis(); long duration = now - then; --- 492,509 ---- event.delta = elapse - listval; next.delta -= event.delta; iter.previous(); break; } else if (!iter.hasNext()) { ! event.delta = event.timeval - listval; } } iter.add(event); selmgr.wakeupSelector(); } ! private synchronized void signalTimeouts(long then) { if (timeouts.isEmpty()) { return; } long now = System.currentTimeMillis(); long duration = now - then;
*** 535,545 **** "sun.net.httpclient.connectionWindowSize", 256 * 1024 ); } // returns 0 meaning block forever, or a number of millis to block for ! synchronized long getTimeoutValue() { if (timeouts.isEmpty()) { return 0; } else { return timeouts.get(0).delta; } --- 550,560 ---- "sun.net.httpclient.connectionWindowSize", 256 * 1024 ); } // returns 0 meaning block forever, or a number of millis to block for ! private synchronized long getTimeoutValue() { if (timeouts.isEmpty()) { return 0; } else { return timeouts.get(0).delta; }
< prev index next >