< prev index next >

src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java

Print this page

        

*** 35,52 **** import java.nio.channels.SelectionKey; import static java.nio.channels.SelectionKey.OP_CONNECT; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; import java.nio.channels.Selector; ! import java.util.LinkedList; ! import java.util.List; ! import java.util.Set; import java.util.concurrent.ExecutorService; import java.security.NoSuchAlgorithmException; - import java.util.Iterator; - import java.util.ListIterator; - import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; --- 35,48 ---- import java.nio.channels.SelectionKey; import static java.nio.channels.SelectionKey.OP_CONNECT; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; import java.nio.channels.Selector; ! import java.util.*; ! import java.util.stream.Stream; import java.util.concurrent.ExecutorService; import java.security.NoSuchAlgorithmException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters;
*** 246,256 **** try { c.configureBlocking(false); SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null) { ! sa = new SelectorAttachment(c); } else { sa = (SelectorAttachment)key.attachment(); } sa.register(exchange); } catch (IOException e) { --- 242,252 ---- try { c.configureBlocking(false); SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null) { ! sa = new SelectorAttachment(c, selector); } else { sa = (SelectorAttachment)key.attachment(); } sa.register(exchange); } catch (IOException e) {
*** 272,284 **** Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment)key.attachment(); int eventsOccurred = key.readyOps(); ! for (AsyncEvent ev : sa.events(eventsOccurred)) { ! readyList.add(ev); ! } sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); --- 268,278 ---- Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment)key.attachment(); int eventsOccurred = key.readyOps(); ! sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear();
*** 308,342 **** e.abort(); } else { e.handle(); } } /** ! * Tracks multiple user level registrations associated with one ! * NIO registration (SelectionKey). In this implementation, ! * registrations are one-off and when an event is posted ! * the registration is cancelled until explicitly registered ! * again. ! * no external synchronization required as this class is only used ! * by the SelectorManager thread. One of these objects ! * required per connection. */ ! private class SelectorAttachment { ! int interestops; ! final SelectableChannel chan; ! final LinkedList<AsyncEvent> pending; ! SelectorAttachment(SelectableChannel chan) { ! this.pending = new LinkedList<>(); this.chan = chan; } void register(AsyncEvent e) throws ClosedChannelException { int newops = e.interestOps(); boolean reRegister = (interestops & newops) != newops; ! this.interestops |= newops; pending.add(e); if (reRegister) { // first time registration happens here also chan.register(selector, interestops, this); } --- 302,339 ---- e.abort(); } else { e.handle(); } } + } /** ! * Tracks multiple user level registrations associated with one NIO ! * registration (SelectionKey). In this implementation, registrations ! * are one-off and when an event is posted the registration is cancelled ! * until explicitly registered again. ! * ! * <p> No external synchronization required as this class is only used ! * by the SelectorManager thread. One of these objects required per ! * connection. */ ! private static class SelectorAttachment { ! private final SelectableChannel chan; ! private final Selector selector; ! private final ArrayList<AsyncEvent> pending; ! private int interestops; ! SelectorAttachment(SelectableChannel chan, Selector selector) { ! this.pending = new ArrayList<>(); this.chan = chan; + this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { int newops = e.interestOps(); boolean reRegister = (interestops & newops) != newops; ! interestops |= newops; pending.add(e); if (reRegister) { // first time registration happens here also chan.register(selector, interestops, this); }
*** 345,399 **** int interestOps() { return interestops; } /** ! * Returns Iterator<AsyncEvents> containing only events that are ! * registered with the given interestops ! * ! * @param type ! * @return */ ! Iterable<AsyncEvent> events(int interestop) { ! return new Iterable<AsyncEvent>() { ! @Override ! public Iterator<AsyncEvent> iterator() { return pending.stream() ! .filter(ev -> (ev.interestOps() & interestop) != 0) ! .iterator(); ! } ! }; } /** ! * Remove any events with the given interestop, and if no events ! * remaining, cancel the associated SelectionKey. ! * ! * @param interestops */ void resetInterestOps(int interestop) { - int size = pending.size(); int newops = 0; ! for (int i=0; i<size; i++) { ! AsyncEvent ev = pending.get(i); ! int evops = ev.interestOps(); if ((evops & interestop) != 0) { ! pending.remove(i); } else { newops |= evops; } } ! this.interestops = newops; SelectionKey key = chan.keyFor(selector); if (newops == 0) { key.cancel(); } else { key.interestOps(newops); } } } - } /** * Creates a HttpRequest associated with this group. * * @throws IllegalStateException if the group has been stopped --- 342,386 ---- int interestOps() { return interestops; } /** ! * Returns an Iterator<AsyncEvents> containing only events that are ! * registered with the given {@code interestop}. */ ! Stream<AsyncEvent> events(int interestop) { return pending.stream() ! .filter(ev -> (ev.interestOps() & interestop) != 0); } /** ! * Removes any events with the given {@code interestop}, and if no ! * events remaining, cancels the associated SelectionKey. */ void resetInterestOps(int interestop) { int newops = 0; ! ! Iterator<AsyncEvent> itr = pending.iterator(); ! while (itr.hasNext()) { ! AsyncEvent event = itr.next(); ! int evops = event.interestOps(); if ((evops & interestop) != 0) { ! itr.remove(); } else { newops |= evops; } } ! ! interestops = newops; SelectionKey key = chan.keyFor(selector); if (newops == 0) { key.cancel(); } else { key.interestOps(newops); } } } /** * Creates a HttpRequest associated with this group. * * @throws IllegalStateException if the group has been stopped
< prev index next >