/* * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package sun.nio.ch; import java.io.IOException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import jdk.internal.misc.Unsafe; /** * Selector implementation based on poll */ class PollSelectorImpl extends SelectorImpl { // initial capacity of poll array private static final int INITIAL_CAPACITY = 16; // poll array, grows as needed private int pollArrayCapacity = INITIAL_CAPACITY; private int pollArraySize; private AllocatedNativeObject pollArray; // file descriptors used for interrupt private final int fd0; private final int fd1; // keys for file descriptors in poll array, synchronize on selector private final List pollKeys = new ArrayList<>(); // pending updates, queued by putEventOps private final Object updateLock = new Object(); private final Deque updateKeys = new ArrayDeque<>(); // interrupt triggering and clearing private final Object interruptLock = new Object(); private boolean interruptTriggered; PollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); int size = pollArrayCapacity * SIZE_POLLFD; this.pollArray = new AllocatedNativeObject(size, false); try { long fds = IOUtil.makePipe(false); this.fd0 = (int) (fds >>> 32); this.fd1 = (int) fds; } catch (IOException ioe) { pollArray.free(); throw ioe; } // wakeup support synchronized (this) { setFirst(fd0, Net.POLLIN); } } private void ensureOpen() { if (!isOpen()) throw new ClosedSelectorException(); } @Override protected int doSelect(Consumer action, long timeout) throws IOException { assert Thread.holdsLock(this); int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout boolean blocking = (to != 0); boolean timedPoll = (to > 0); processUpdateQueue(); processDeregisterQueue(); try { begin(blocking); int numPolled; do { long startTime = timedPoll ? System.nanoTime() : 0; numPolled = poll(pollArray.address(), pollArraySize, to); if (numPolled == IOStatus.INTERRUPTED && timedPoll) { // timed poll interrupted so need to adjust timeout long adjust = System.nanoTime() - startTime; to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); if (to <= 0) { // timeout expired so no retry numPolled = 0; } } } while (numPolled == IOStatus.INTERRUPTED); assert numPolled <= pollArraySize; } finally { end(blocking); } processDeregisterQueue(); return processEvents(action); } /** * Process changes to the interest ops. */ private void processUpdateQueue() { assert Thread.holdsLock(this); synchronized (updateLock) { SelectionKeyImpl ski; while ((ski = updateKeys.pollFirst()) != null) { int newEvents = ski.translateInterestOps(); if (ski.isValid()) { int index = ski.getIndex(); assert index >= 0 && index < pollArraySize; if (index > 0) { assert pollKeys.get(index) == ski; if (newEvents == 0) { remove(ski); } else { update(ski, newEvents); } } else if (newEvents != 0) { add(ski, newEvents); } } } } } /** * Process the polled events. * If the interrupt fd has been selected, drain it and clear the interrupt. */ private int processEvents(Consumer action) throws IOException { assert Thread.holdsLock(this); assert pollArraySize > 0 && pollArraySize == pollKeys.size(); int numKeysUpdated = 0; for (int i = 1; i < pollArraySize; i++) { int rOps = getReventOps(i); if (rOps != 0) { SelectionKeyImpl ski = pollKeys.get(i); assert ski.getFDVal() == getDescriptor(i); if (ski.isValid()) { numKeysUpdated += processReadyEvents(rOps, ski, action); } } } // check for interrupt if (getReventOps(0) != 0) { assert getDescriptor(0) == fd0; clearInterrupt(); } return numKeysUpdated; } @Override protected void implClose() throws IOException { assert !isOpen(); assert Thread.holdsLock(this); // prevent further wakeup synchronized (interruptLock) { interruptTriggered = true; } pollArray.free(); FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd1); } @Override protected void implRegister(SelectionKeyImpl ski) { assert ski.getIndex() == 0; ensureOpen(); } @Override protected void implDereg(SelectionKeyImpl ski) throws IOException { assert !ski.isValid(); assert Thread.holdsLock(this); // remove from poll array int index = ski.getIndex(); if (index > 0) { remove(ski); } } @Override public void setEventOps(SelectionKeyImpl ski) { ensureOpen(); synchronized (updateLock) { updateKeys.addLast(ski); } } @Override public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { try { IOUtil.write1(fd1, (byte)0); } catch (IOException ioe) { throw new InternalError(ioe); } interruptTriggered = true; } } return this; } private void clearInterrupt() throws IOException { synchronized (interruptLock) { IOUtil.drain(fd0); interruptTriggered = false; } } /** * Sets the first pollfd enty in the poll array to the given fd */ private void setFirst(int fd, int ops) { assert pollArraySize == 0; assert pollKeys.isEmpty(); putDescriptor(0, fd); putEventOps(0, ops); pollArraySize = 1; pollKeys.add(null); // dummy element } /** * Adds a pollfd entry to the poll array, expanding the poll array if needed. */ private void add(SelectionKeyImpl ski, int ops) { expandIfNeeded(); int index = pollArraySize; assert index > 0; putDescriptor(index, ski.getFDVal()); putEventOps(index, ops); putReventOps(index, 0); ski.setIndex(index); pollArraySize++; pollKeys.add(ski); assert pollKeys.size() == pollArraySize; } /** * Update the events of pollfd entry. */ private void update(SelectionKeyImpl ski, int ops) { int index = ski.getIndex(); assert index > 0 && index < pollArraySize; assert getDescriptor(index) == ski.getFDVal(); putEventOps(index, ops); } /** * Removes a pollfd entry from the poll array */ private void remove(SelectionKeyImpl ski) { int index = ski.getIndex(); assert index > 0 && index < pollArraySize; assert getDescriptor(index) == ski.getFDVal(); // replace pollfd at index with the last pollfd in array int lastIndex = pollArraySize - 1; if (lastIndex != index) { SelectionKeyImpl lastKey = pollKeys.get(lastIndex); assert lastKey.getIndex() == lastIndex; int lastFd = getDescriptor(lastIndex); int lastOps = getEventOps(lastIndex); int lastRevents = getReventOps(lastIndex); assert lastKey.getFDVal() == lastFd; putDescriptor(index, lastFd); putEventOps(index, lastOps); putReventOps(index, lastRevents); pollKeys.set(index, lastKey); lastKey.setIndex(index); } pollKeys.remove(lastIndex); pollArraySize--; assert pollKeys.size() == pollArraySize; ski.setIndex(0); } /** * Expand poll array if at capacity */ private void expandIfNeeded() { if (pollArraySize == pollArrayCapacity) { int oldSize = pollArrayCapacity * SIZE_POLLFD; int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; int newSize = newCapacity * SIZE_POLLFD; AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); pollArray.free(); pollArray = newPollArray; pollArrayCapacity = newCapacity; } } private static final short SIZE_POLLFD = 8; private static final short FD_OFFSET = 0; private static final short EVENT_OFFSET = 4; private static final short REVENT_OFFSET = 6; private void putDescriptor(int i, int fd) { int offset = SIZE_POLLFD * i + FD_OFFSET; pollArray.putInt(offset, fd); } private int getDescriptor(int i) { int offset = SIZE_POLLFD * i + FD_OFFSET; return pollArray.getInt(offset); } private void putEventOps(int i, int event) { int offset = SIZE_POLLFD * i + EVENT_OFFSET; pollArray.putShort(offset, (short)event); } private int getEventOps(int i) { int offset = SIZE_POLLFD * i + EVENT_OFFSET; return pollArray.getShort(offset); } private void putReventOps(int i, int revent) { int offset = SIZE_POLLFD * i + REVENT_OFFSET; pollArray.putShort(offset, (short)revent); } private int getReventOps(int i) { int offset = SIZE_POLLFD * i + REVENT_OFFSET; return pollArray.getShort(offset); } private static native int poll(long pollAddress, int numfds, int timeout); static { IOUtil.load(); } }