< prev index next >
src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java
Print this page
*** 21,60 ****
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
- import static java.net.http.Utils.BUFSIZE;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
- import static java.nio.channels.SelectionKey.OP_CONNECT;
- import static java.nio.channels.SelectionKey.OP_READ;
- import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
- import java.util.*;
- import java.util.stream.Stream;
- import java.util.concurrent.ExecutorService;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
! import javax.net.ssl.SSLContext;
! import javax.net.ssl.SSLParameters;
/**
* Client implementation. Contains all configuration information and also
* the selector manager thread which allows async events to be registered
* and delivered when they occur. See AsyncEvent.
*/
class HttpClientImpl extends HttpClient implements BufferHandler {
private final CookieManager cookieManager;
private final Redirect followRedirects;
private final ProxySelector proxySelector;
private final Authenticator authenticator;
private final Version version;
--- 21,65 ----
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
+ import javax.net.ssl.SSLContext;
+ import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.NoSuchAlgorithmException;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.ListIterator;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
! import java.util.stream.Stream;
!
! import static java.net.http.Utils.BUFSIZE;
/**
* Client implementation. Contains all configuration information and also
* the selector manager thread which allows async events to be registered
* and delivered when they occur. See AsyncEvent.
*/
class HttpClientImpl extends HttpClient implements BufferHandler {
+ private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final CookieManager cookieManager;
private final Redirect followRedirects;
private final ProxySelector proxySelector;
private final Authenticator authenticator;
private final Version version;
*** 65,75 ****
private final SSLContext sslContext;
private final SSLParameters sslParams;
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
- private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final LinkedList<TimeoutEvent> timeouts;
public static HttpClientImpl create(HttpClientBuilderImpl builder) {
HttpClientImpl impl = new HttpClientImpl(builder);
impl.start();
--- 70,79 ----
*** 143,182 ****
Http2ClientImpl client2() {
return client2;
}
! LinkedList<ByteBuffer> freelist = new LinkedList<>();
/**
 * Old-side version of the buffer pool accessor: hands out the first buffer
 * on the free list, or allocates a new heap buffer of BUFSIZE bytes when
 * the free list is empty. Synchronized on this client.
 * The buffer is not cleared here; in this version returnBuffer() clears
 * buffers before putting them back on the list.
 */
@Override
! public synchronized ByteBuffer getBuffer() {
! if (freelist.isEmpty()) {
! return ByteBuffer.allocate(BUFSIZE);
}
! return freelist.removeFirst();
}
@Override
public synchronized void returnBuffer(ByteBuffer buffer) {
- buffer.clear();
freelist.add(buffer);
}
// Main loop for this client's selector
! class SelectorManager extends Thread {
! final Selector selector;
! boolean closed;
!
! final List<AsyncEvent> readyList;
! final List<AsyncEvent> registrations;
SelectorManager() throws IOException {
! readyList = new LinkedList<>();
! registrations = new LinkedList<>();
selector = Selector.open();
}
// This returns immediately. So caller not allowed to send/receive
// on connection.
--- 147,207 ----
// Accessor for the client2 field (this client's Http2ClientImpl).
Http2ClientImpl client2() {
return client2;
}
! /**
! * We keep one size of buffer on free list. That size may increase
! * depending on demand. If that happens we dispose of free buffers
! * that are smaller than new size.
! */
! private final LinkedList<ByteBuffer> freelist = new LinkedList<>();
! int currentSize = BUFSIZE;
/**
 * Hands out a buffer, reusing one from the free list when possible.
 *
 * @param size requested capacity; -1 means "use the current pool size"
 *             (currentSize). A value larger than currentSize raises
 *             currentSize to that value.
 * @return a cleared buffer taken from the free list whose capacity is at
 *         least currentSize, or, if no such buffer is available, a newly
 *         allocated heap buffer of {@code size} bytes
 */
@Override
! public synchronized ByteBuffer getBuffer(int size) {
!
! ByteBuffer buf;
// -1 is the "no preference" sentinel: fall back to the pool size
! if (size == -1)
! size = currentSize;
!
// demand grew: from now on only buffers of at least this size are reused
! if (size > currentSize)
! currentSize = size;
!
! while (!freelist.isEmpty()) {
! buf = freelist.removeFirst();
// undersized buffers are dropped from the list (not put back)
! if (buf.capacity() < currentSize)
! continue;
! buf.clear();
! return buf;
}
// NOTE(review): when size < currentSize this allocates a buffer smaller
// than currentSize; once returned it will be dropped by the capacity
// check above rather than reused -- confirm this is intended.
! return ByteBuffer.allocate(size);
}
/**
 * Puts a buffer back on the free list (appended at the end).
 * The buffer is neither cleared nor size-checked here; getBuffer(int)
 * does both when it takes a buffer off the list.
 */
@Override
public synchronized void returnBuffer(ByteBuffer buffer) {
freelist.add(buffer);
}
/**
 * Raises the pool buffer size (currentSize) to {@code n} if {@code n} is
 * larger than the current value; never lowers it.
 */
+ @Override
+ public synchronized void setMinBufferSize(int n) {
+ currentSize = Math.max(n, currentSize);
+ }
// Main loop for this client's selector
+ private final class SelectorManager extends Thread {
! private final Selector selector;
! private boolean closed;
! private final List<AsyncEvent> readyList;
! private final List<AsyncEvent> registrations;
/**
 * Creates the (empty) ready and registration lists, opens the selector
 * and names this thread "SelectorManager". Does not start the thread.
 *
 * @throws IOException if Selector.open() fails
 */
SelectorManager() throws IOException {
! readyList = new ArrayList<>();
! registrations = new ArrayList<>();
selector = Selector.open();
+ setName("SelectorManager");
}
// This returns immediately, so the caller is not allowed to send/receive
// on the connection.
*** 191,237 ****
synchronized void shutdown() {
closed = true;
try {
selector.close();
! } catch (IOException e) {}
! }
!
// Returns a new LinkedList holding the same AsyncEvent references as
// the given list, in its iteration order (shallow copy).
! private List<AsyncEvent> copy(List<AsyncEvent> list) {
! LinkedList<AsyncEvent> c = new LinkedList<>();
! for (AsyncEvent e : list) {
! c.add(e);
! }
! return c;
! }
!
// Renders a SelectionKey ops bit mask as text: appends "OP_READ ",
// "OP_CONNECT " and/or "OP_WRITE " (each with a trailing space) for the
// bits that are set. Other bits are ignored; returns "" if none match.
! String opvals(int i) {
! StringBuilder sb = new StringBuilder();
! if ((i & OP_READ) != 0)
! sb.append("OP_READ ");
! if ((i & OP_CONNECT) != 0)
! sb.append("OP_CONNECT ");
! if ((i & OP_WRITE) != 0)
! sb.append("OP_WRITE ");
! return sb.toString();
}
@Override
public void run() {
try {
! while (true) {
synchronized (this) {
for (AsyncEvent exchange : registrations) {
SelectableChannel c = exchange.channel();
try {
c.configureBlocking(false);
SelectionKey key = c.keyFor(selector);
SelectorAttachment sa;
if (key == null) {
sa = new SelectorAttachment(c, selector);
} else {
! sa = (SelectorAttachment)key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
Log.logError("HttpClientImpl: " + e);
c.close();
--- 216,243 ----
// Marks this manager as closed and closes the selector. Also invoked
// from the finally block of run(), so it may be called more than once.
synchronized void shutdown() {
// set before closing so that handleEvent() and the catch block in run()
// see the closed state
closed = true;
try {
selector.close();
// best effort: a failure to close the selector is deliberately ignored
! } catch (IOException ignored) { }
}
@Override
public void run() {
try {
! while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
for (AsyncEvent exchange : registrations) {
SelectableChannel c = exchange.channel();
try {
c.configureBlocking(false);
SelectionKey key = c.keyFor(selector);
SelectorAttachment sa;
if (key == null) {
sa = new SelectorAttachment(c, selector);
} else {
! sa = (SelectorAttachment) key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
Log.logError("HttpClientImpl: " + e);
c.close();
*** 241,287 ****
}
registrations.clear();
}
long timeval = getTimeoutValue();
long now = System.currentTimeMillis();
int n = selector.select(timeval);
if (n == 0) {
signalTimeouts(now);
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
! SelectorAttachment sa = (SelectorAttachment)key.attachment();
int eventsOccurred = key.readyOps();
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
for (AsyncEvent exchange : readyList) {
! if (exchange instanceof AsyncEvent.Blocking) {
exchange.channel().configureBlocking(true);
- } else {
- assert exchange instanceof AsyncEvent.NonBlocking;
}
executor.synchronize();
handleEvent(exchange); // will be delegated to executor
}
readyList.clear();
}
} catch (Throwable e) {
if (!closed) {
- System.err.println("HttpClientImpl terminating on error");
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
}
}
void handleEvent(AsyncEvent e) {
if (closed) {
e.abort();
} else {
e.handle();
--- 247,304 ----
}
registrations.clear();
}
long timeval = getTimeoutValue();
long now = System.currentTimeMillis();
+ //debugPrint(selector);
int n = selector.select(timeval);
if (n == 0) {
signalTimeouts(now);
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
! SelectorAttachment sa = (SelectorAttachment) key.attachment();
int eventsOccurred = key.readyOps();
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
for (AsyncEvent exchange : readyList) {
! if (exchange.blocking()) {
exchange.channel().configureBlocking(true);
}
executor.synchronize();
handleEvent(exchange); // will be delegated to executor
}
readyList.clear();
}
} catch (Throwable e) {
if (!closed) {
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
+ } finally {
+ shutdown();
}
}
// Debugging aid: dumps to System.err every key currently registered with
// the given selector (channel and interest-ops bit mask as a decimal int),
// bracketed by start/end marker lines. The only call site visible in
// run() is commented out. Note the parameter shadows the selector field.
+ void debugPrint(Selector selector) {
+ System.err.println("Selector: debugprint start");
+ Set<SelectionKey> keys = selector.keys();
+ for (SelectionKey key : keys) {
+ SelectableChannel c = key.channel();
+ int ops = key.interestOps();
+ System.err.printf("selector chan:%s ops:%d\n", c, ops);
+ }
+ System.err.println("Selector: debugprint end");
+ }
+
void handleEvent(AsyncEvent e) {
if (closed) {
e.abort();
} else {
e.handle();
*** 301,384 ****
*/
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
private final ArrayList<AsyncEvent> pending;
! private int interestops;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new ArrayList<>();
this.chan = chan;
this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
! int newops = e.interestOps();
! boolean reRegister = (interestops & newops) != newops;
! interestops |= newops;
pending.add(e);
if (reRegister) {
// first time registration happens here also
! chan.register(selector, interestops, this);
! }
}
-
- int interestOps() {
- return interestops;
}
/**
* Returns a Stream<AsyncEvents> containing only events that are
! * registered with the given {@code interestop}.
*/
! Stream<AsyncEvent> events(int interestop) {
return pending.stream()
! .filter(ev -> (ev.interestOps() & interestop) != 0);
}
/**
! * Removes any events with the given {@code interestop}, and if no
* events remaining, cancels the associated SelectionKey.
*/
! void resetInterestOps(int interestop) {
! int newops = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
! if ((evops & interestop) != 0) {
itr.remove();
} else {
! newops |= evops;
}
}
! interestops = newops;
SelectionKey key = chan.keyFor(selector);
! if (newops == 0) {
key.cancel();
} else {
! key.interestOps(newops);
}
}
}
/**
* Creates a HttpRequest associated with this group.
*
! * @throws IllegalStateException if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request() {
return new HttpRequestBuilderImpl(this, null);
}
/**
* Creates a HttpRequest associated with this group.
*
! * @throws IllegalStateException if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request(URI uri) {
return new HttpRequestBuilderImpl(this, uri);
}
--- 318,403 ----
*/
// Object attached to a channel's SelectionKey (see chan.register(...) in
// register()). Holds the AsyncEvents pending on that channel and the
// union of the interest ops it has registered with the selector.
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
// events registered on chan that have not yet been removed
private final ArrayList<AsyncEvent> pending;
// interest ops as last computed by register()/resetInterestOps()
! private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new ArrayList<>();
this.chan = chan;
this.selector = selector;
}
/**
 * Adds the event to the pending list and, if the event asks for any
 * operation not already in the interest set, (re)registers the channel
 * with the selector using the widened interest set and this object as
 * the key attachment.
 *
 * @throws ClosedChannelException propagated from chan.register(...)
 */
void register(AsyncEvent e) throws ClosedChannelException {
! int newOps = e.interestOps();
// true when newOps contains at least one bit not yet in interestOps
! boolean reRegister = (interestOps & newOps) != newOps;
! interestOps |= newOps;
pending.add(e);
if (reRegister) {
// first time registration happens here also
! chan.register(selector, interestOps, this);
}
}
/**
* Returns a Stream of the pending AsyncEvents whose own interest ops
! * share at least one bit with the given {@code interestOps}.
*/
! Stream<AsyncEvent> events(int interestOps) {
return pending.stream()
! .filter(ev -> (ev.interestOps() & interestOps) != 0);
}
/**
! * Removes any non-repeating events matching the given {@code interestOps}
* and recomputes the interest set from the events that are kept
* (repeating events are always kept). If no events remain interested
* in anything, cancels the associated SelectionKey; otherwise updates
* the key's interest ops.
*/
! void resetInterestOps(int interestOps) {
! int newOps = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
// repeating events stay registered even after they have fired
! if (event.repeating()) {
! newOps |= evops;
! continue;
! }
! if ((evops & interestOps) != 0) {
itr.remove();
} else {
! newOps |= evops;
}
}
// the parameter shadows the field, hence the explicit this.
! this.interestOps = newOps;
// NOTE(review): keyFor() returns null if chan is not registered with
// selector; this code assumes a key exists -- confirm with callers.
SelectionKey key = chan.keyFor(selector);
! if (newOps == 0) {
key.cancel();
} else {
! key.interestOps(newOps);
}
}
}
/**
* Creates a request builder for this client: returns a new
* HttpRequestBuilderImpl constructed with {@code this} and a null URI.
*
! * @throws IllegalStateException
! * if the group has been stopped
* (NOTE(review): no such check is visible in this method; presumably
* it is enforced elsewhere -- confirm)
*/
@Override
public HttpRequestBuilderImpl request() {
return new HttpRequestBuilderImpl(this, null);
}
/**
* Creates a request builder for this client: returns a new
* HttpRequestBuilderImpl constructed with {@code this} and the given URI.
*
* @param uri the URI passed through to the builder
! * @throws IllegalStateException
! * if the group has been stopped
* (NOTE(review): no such check is visible in this method; presumably
* it is enforced elsewhere -- confirm)
*/
@Override
public HttpRequestBuilderImpl request(URI uri) {
return new HttpRequestBuilderImpl(this, uri);
}
*** 442,461 ****
boolean getHttp2Allowed() {
return version.equals(Version.HTTP_2);
}
! //void setHttp2NotSupported(String host) {
! //http2NotSupported.put(host, false);
! //}
!
! final void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
}
! final void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
final List<HeaderFilter> filterChain() {
return filters.getFilterChain();
--- 461,476 ----
// Returns true when this client's configured version is Version.HTTP_2.
boolean getHttp2Allowed() {
return version.equals(Version.HTTP_2);
}
// Installs the built-in header filters, in this order:
// AuthenticationFilter, then RedirectFilter.
! private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
}
// Registers a HeaderFilter class with this client's FilterFactory
// (the filters field) by delegating to filters.addFilter(f).
! private void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
final List<HeaderFilter> filterChain() {
return filters.getFilterChain();
*** 477,494 ****
event.delta = elapse - listval;
next.delta -= event.delta;
iter.previous();
break;
} else if (!iter.hasNext()) {
! event.delta = event.timeval - listval ;
}
}
iter.add(event);
selmgr.wakeupSelector();
}
! synchronized void signalTimeouts(long then) {
if (timeouts.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
long duration = now - then;
--- 492,509 ----
event.delta = elapse - listval;
next.delta -= event.delta;
iter.previous();
break;
} else if (!iter.hasNext()) {
! event.delta = event.timeval - listval;
}
}
iter.add(event);
selmgr.wakeupSelector();
}
! private synchronized void signalTimeouts(long then) {
if (timeouts.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
long duration = now - then;
*** 535,545 ****
"sun.net.httpclient.connectionWindowSize", 256 * 1024
);
}
// returns 0 meaning block forever, or a number of millis to block for
! synchronized long getTimeoutValue() {
if (timeouts.isEmpty()) {
return 0;
} else {
return timeouts.get(0).delta;
}
--- 550,560 ----
"sun.net.httpclient.connectionWindowSize", 256 * 1024
);
}
// returns 0 meaning block forever, or a number of millis to block for
! private synchronized long getTimeoutValue() {
if (timeouts.isEmpty()) {
return 0;
} else {
return timeouts.get(0).delta;
}
< prev index next >