< prev index next >
src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java
Print this page
*** 35,52 ****
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.OP_CONNECT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
! import java.util.LinkedList;
! import java.util.List;
! import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.security.NoSuchAlgorithmException;
- import java.util.Iterator;
- import java.util.ListIterator;
- import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
--- 35,48 ----
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.OP_CONNECT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
! import java.util.*;
! import java.util.stream.Stream;
import java.util.concurrent.ExecutorService;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
*** 246,256 ****
try {
c.configureBlocking(false);
SelectionKey key = c.keyFor(selector);
SelectorAttachment sa;
if (key == null) {
! sa = new SelectorAttachment(c);
} else {
sa = (SelectorAttachment)key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
--- 242,252 ----
try {
c.configureBlocking(false);
SelectionKey key = c.keyFor(selector);
SelectorAttachment sa;
if (key == null) {
! sa = new SelectorAttachment(c, selector);
} else {
sa = (SelectorAttachment)key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
*** 272,284 ****
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment)key.attachment();
int eventsOccurred = key.readyOps();
! for (AsyncEvent ev : sa.events(eventsOccurred)) {
! readyList.add(ev);
! }
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
--- 268,278 ----
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment)key.attachment();
int eventsOccurred = key.readyOps();
! sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
*** 308,342 ****
e.abort();
} else {
e.handle();
}
}
/**
! * Tracks multiple user level registrations associated with one
! * NIO registration (SelectionKey). In this implementation,
! * registrations are one-off and when an event is posted
! * the registration is cancelled until explicitly registered
! * again.
! * no external synchronization required as this class is only used
! * by the SelectorManager thread. One of these objects
! * required per connection.
*/
! private class SelectorAttachment {
! int interestops;
! final SelectableChannel chan;
! final LinkedList<AsyncEvent> pending;
! SelectorAttachment(SelectableChannel chan) {
! this.pending = new LinkedList<>();
this.chan = chan;
}
void register(AsyncEvent e) throws ClosedChannelException {
int newops = e.interestOps();
boolean reRegister = (interestops & newops) != newops;
! this.interestops |= newops;
pending.add(e);
if (reRegister) {
// first time registration happens here also
chan.register(selector, interestops, this);
}
--- 302,339 ----
e.abort();
} else {
e.handle();
}
}
+ }
/**
! * Tracks multiple user level registrations associated with one NIO
! * registration (SelectionKey). In this implementation, registrations
! * are one-off and when an event is posted the registration is cancelled
! * until explicitly registered again.
! *
! * <p> No external synchronization required as this class is only used
! * by the SelectorManager thread. One of these objects required per
! * connection.
*/
! private static class SelectorAttachment {
! private final SelectableChannel chan;
! private final Selector selector;
! private final ArrayList<AsyncEvent> pending;
! private int interestops;
! SelectorAttachment(SelectableChannel chan, Selector selector) {
! this.pending = new ArrayList<>();
this.chan = chan;
+ this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
int newops = e.interestOps();
boolean reRegister = (interestops & newops) != newops;
! interestops |= newops;
pending.add(e);
if (reRegister) {
// first time registration happens here also
chan.register(selector, interestops, this);
}
*** 345,399 ****
int interestOps() {
return interestops;
}
/**
! * Returns Iterator<AsyncEvents> containing only events that are
! * registered with the given interestops
! *
! * @param type
! * @return
*/
! Iterable<AsyncEvent> events(int interestop) {
! return new Iterable<AsyncEvent>() {
! @Override
! public Iterator<AsyncEvent> iterator() {
return pending.stream()
! .filter(ev -> (ev.interestOps() & interestop) != 0)
! .iterator();
! }
! };
}
/**
! * Remove any events with the given interestop, and if no events
! * remaining, cancel the associated SelectionKey.
! *
! * @param interestops
*/
void resetInterestOps(int interestop) {
- int size = pending.size();
int newops = 0;
! for (int i=0; i<size; i++) {
! AsyncEvent ev = pending.get(i);
! int evops = ev.interestOps();
if ((evops & interestop) != 0) {
! pending.remove(i);
} else {
newops |= evops;
}
}
! this.interestops = newops;
SelectionKey key = chan.keyFor(selector);
if (newops == 0) {
key.cancel();
} else {
key.interestOps(newops);
}
}
}
- }
/**
* Creates a HttpRequest associated with this group.
*
* @throws IllegalStateException if the group has been stopped
--- 342,386 ----
int interestOps() {
return interestops;
}
/**
! * Returns an Iterator<AsyncEvents> containing only events that are
! * registered with the given {@code interestop}.
*/
! Stream<AsyncEvent> events(int interestop) {
return pending.stream()
! .filter(ev -> (ev.interestOps() & interestop) != 0);
}
/**
! * Removes any events with the given {@code interestop}, and if no
! * events remaining, cancels the associated SelectionKey.
*/
void resetInterestOps(int interestop) {
int newops = 0;
!
! Iterator<AsyncEvent> itr = pending.iterator();
! while (itr.hasNext()) {
! AsyncEvent event = itr.next();
! int evops = event.interestOps();
if ((evops & interestop) != 0) {
! itr.remove();
} else {
newops |= evops;
}
}
!
! interestops = newops;
SelectionKey key = chan.keyFor(selector);
if (newops == 0) {
key.cancel();
} else {
key.interestOps(newops);
}
}
}
/**
* Creates a HttpRequest associated with this group.
*
* @throws IllegalStateException if the group has been stopped
< prev index next >