< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java

Print this page
rev 49271 : [mq]: selector-cleanup

*** 1,7 **** /* ! * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 28,38 **** import java.nio.channels.spi.AsynchronousChannelProvider; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; ! import static sun.nio.ch.KQueue.*; /** * AsynchronousChannelGroup implementation based on the BSD kqueue facility. */ --- 28,42 ---- import java.nio.channels.spi.AsynchronousChannelProvider; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; ! ! import static sun.nio.ch.KQueue.EVFILT_READ; ! import static sun.nio.ch.KQueue.EVFILT_WRITE; ! import static sun.nio.ch.KQueue.EV_ADD; ! import static sun.nio.ch.KQueue.EV_ONESHOT; /** * AsynchronousChannelGroup implementation based on the BSD kqueue facility. */
*** 43,64 **** private static final int MAX_KEVENTS_TO_POLL = 512; // kqueue file descriptor private final int kqfd; // true if kqueue closed private boolean closed; // socket pair used for wakeup private final int sp[]; // number of wakeups pending private final AtomicInteger wakeupCount = new AtomicInteger(); - // address of the poll array passed to kqueue_wait - private final long address; - // encapsulates an event for a channel static class Event { final PollableChannel channel; final int events; --- 47,68 ---- private static final int MAX_KEVENTS_TO_POLL = 512; // kqueue file descriptor private final int kqfd; + // address of the poll array passed to kqueue_wait + private final long address; + // true if kqueue closed private boolean closed; // socket pair used for wakeup private final int sp[]; // number of wakeups pending private final AtomicInteger wakeupCount = new AtomicInteger(); // encapsulates an event for a channel static class Event { final PollableChannel channel; final int events;
*** 80,111 **** KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool) throws IOException { super(provider, pool); ! // open kqueue ! this.kqfd = kqueue(); // create socket pair for wakeup mechanism - int[] sv = new int[2]; try { ! socketpair(sv); ! ! // register one end with kqueue ! keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD); ! } catch (IOException x) { ! close0(kqfd); ! throw x; } - this.sp = sv; ! // allocate the poll array ! this.address = allocatePollArray(MAX_KEVENTS_TO_POLL); // create the queue and offer the special event to ensure that the first // threads polls ! this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL); this.queue.offer(NEED_TO_POLL); } KQueuePort start() { startThreads(new EventHandlerTask()); --- 84,112 ---- KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool) throws IOException { super(provider, pool); ! this.kqfd = KQueue.create(); ! this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL); // create socket pair for wakeup mechanism try { ! long fds = IOUtil.makePipe(true); ! this.sp = new int[]{(int) (fds >>> 32), (int) fds}; ! } catch (IOException ioe) { ! KQueue.freePollArray(address); ! FileDispatcherImpl.closeIntFD(kqfd); ! throw ioe; } ! // register one end with kqueue ! KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD); // create the queue and offer the special event to ensure that the first // threads polls ! this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL); this.queue.offer(NEED_TO_POLL); } KQueuePort start() { startThreads(new EventHandlerTask());
*** 119,139 **** synchronized (this) { if (closed) return; closed = true; } ! freePollArray(address); ! close0(sp[0]); ! close0(sp[1]); ! close0(kqfd); } private void wakeup() { if (wakeupCount.incrementAndGet() == 1) { // write byte to socketpair to force wakeup try { ! interrupt(sp[1]); } catch (IOException x) { throw new AssertionError(x); } } } --- 120,141 ---- synchronized (this) { if (closed) return; closed = true; } ! ! try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { } ! try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { } ! try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { } ! KQueue.freePollArray(address); } private void wakeup() { if (wakeupCount.incrementAndGet() == 1) { // write byte to socketpair to force wakeup try { ! IOUtil.write1(sp[1], (byte)0); } catch (IOException x) { throw new AssertionError(x); } } }
*** 171,183 **** // We use a separate filter for read and write events. // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here. int err = 0; int flags = (EV_ADD|EV_ONESHOT); if ((events & Net.POLLIN) > 0) ! err = keventRegister(kqfd, fd, EVFILT_READ, flags); if (err == 0 && (events & Net.POLLOUT) > 0) ! err = keventRegister(kqfd, fd, EVFILT_WRITE, flags); if (err != 0) throw new InternalError("kevent failed: " + err); // should not happen } /* --- 173,185 ---- // We use a separate filter for read and write events. // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here. int err = 0; int flags = (EV_ADD|EV_ONESHOT); if ((events & Net.POLLIN) > 0) ! err = KQueue.register(kqfd, fd, EVFILT_READ, flags); if (err == 0 && (events & Net.POLLOUT) > 0) ! err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags); if (err != 0) throw new InternalError("kevent failed: " + err); // should not happen } /*
*** 191,218 **** */ private class EventHandlerTask implements Runnable { private Event poll() throws IOException { try { for (;;) { ! int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL); /* * 'n' events have been read. Here we map them to their * corresponding channel in batch and queue n-1 so that * they can be handled by other handler threads. The last * event is handled by this thread (and so is not queued). */ fdToChannelLock.readLock().lock(); try { while (n-- > 0) { ! long keventAddress = getEvent(address, n); ! int fd = getDescriptor(keventAddress); // wakeup if (fd == sp[0]) { if (wakeupCount.decrementAndGet() == 0) { // no more wakeups so drain pipe ! drain1(sp[0]); } // queue special event if there are more events // to handle. if (n > 0) { --- 193,224 ---- */ private class EventHandlerTask implements Runnable { private Event poll() throws IOException { try { for (;;) { ! int n; ! do { ! n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L); ! } while (n == IOStatus.INTERRUPTED); ! /* * 'n' events have been read. Here we map them to their * corresponding channel in batch and queue n-1 so that * they can be handled by other handler threads. The last * event is handled by this thread (and so is not queued). */ fdToChannelLock.readLock().lock(); try { while (n-- > 0) { ! long keventAddress = KQueue.getEvent(address, n); ! int fd = KQueue.getDescriptor(keventAddress); // wakeup if (fd == sp[0]) { if (wakeupCount.decrementAndGet() == 0) { // no more wakeups so drain pipe ! IOUtil.drain(sp[0]); } // queue special event if there are more events // to handle. if (n > 0) {
*** 222,232 **** return EXECUTE_TASK_OR_SHUTDOWN; } PollableChannel channel = fdToChannel.get(fd); if (channel != null) { ! int filter = getFilter(keventAddress); int events = 0; if (filter == EVFILT_READ) events = Net.POLLIN; else if (filter == EVFILT_WRITE) events = Net.POLLOUT; --- 228,238 ---- return EXECUTE_TASK_OR_SHUTDOWN; } PollableChannel channel = fdToChannel.get(fd); if (channel != null) { ! int filter = KQueue.getFilter(keventAddress); int events = 0; if (filter == EVFILT_READ) events = Net.POLLIN; else if (filter == EVFILT_WRITE) events = Net.POLLOUT;
*** 312,331 **** implClose(); } } } } - - // -- Native methods -- - - private static native void socketpair(int[] sv) throws IOException; - - private static native void interrupt(int fd) throws IOException; - - private static native void drain1(int fd) throws IOException; - - private static native void close0(int fd); - - static { - IOUtil.load(); - } } --- 318,323 ----
< prev index next >