< prev index next >
src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java
Print this page
rev 49271 : [mq]: selector-cleanup
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -28,11 +28,15 @@
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
-import static sun.nio.ch.KQueue.*;
+
+import static sun.nio.ch.KQueue.EVFILT_READ;
+import static sun.nio.ch.KQueue.EVFILT_WRITE;
+import static sun.nio.ch.KQueue.EV_ADD;
+import static sun.nio.ch.KQueue.EV_ONESHOT;
/**
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
*/
@@ -43,22 +47,22 @@
private static final int MAX_KEVENTS_TO_POLL = 512;
// kqueue file descriptor
private final int kqfd;
+ // address of the poll array passed to KQueue.poll
+ private final long address;
+
// true if kqueue closed
private boolean closed;
// pipe used for wakeup
private final int sp[];
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
- // address of the poll array passed to kqueue_wait
- private final long address;
-
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
final int events;
@@ -80,32 +84,29 @@
KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
throws IOException
{
super(provider, pool);
- // open kqueue
- this.kqfd = kqueue();
+ this.kqfd = KQueue.create();
+ this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);
// create pipe for wakeup mechanism
- int[] sv = new int[2];
try {
- socketpair(sv);
-
- // register one end with kqueue
- keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
- } catch (IOException x) {
- close0(kqfd);
- throw x;
+ long fds = IOUtil.makePipe(true);
+ this.sp = new int[]{(int) (fds >>> 32), (int) fds};
+ } catch (IOException ioe) {
+ KQueue.freePollArray(address);
+ FileDispatcherImpl.closeIntFD(kqfd);
+ throw ioe;
}
- this.sp = sv;
- // allocate the poll array
- this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
+ // register one end with kqueue
+ KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);
// create the queue and offer the special event to ensure that the first
// thread polls
- this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
+ this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);
this.queue.offer(NEED_TO_POLL);
}
KQueuePort start() {
startThreads(new EventHandlerTask());
@@ -119,21 +120,22 @@
synchronized (this) {
if (closed)
return;
closed = true;
}
- freePollArray(address);
- close0(sp[0]);
- close0(sp[1]);
- close0(kqfd);
+
+ try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }
+ try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
+ try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
+ KQueue.freePollArray(address);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to pipe to force wakeup
try {
- interrupt(sp[1]);
+ IOUtil.write1(sp[1], (byte)0);
} catch (IOException x) {
throw new AssertionError(x);
}
}
}
@@ -171,13 +173,13 @@
// We use a separate filter for read and write events.
// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
int err = 0;
int flags = (EV_ADD|EV_ONESHOT);
if ((events & Net.POLLIN) > 0)
- err = keventRegister(kqfd, fd, EVFILT_READ, flags);
+ err = KQueue.register(kqfd, fd, EVFILT_READ, flags);
if (err == 0 && (events & Net.POLLOUT) > 0)
- err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
+ err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);
if (err != 0)
throw new InternalError("kevent failed: " + err); // should not happen
}
/*
@@ -191,28 +193,32 @@
*/
private class EventHandlerTask implements Runnable {
private Event poll() throws IOException {
try {
for (;;) {
- int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
+ int n;
+ do {
+ n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
+ } while (n == IOStatus.INTERRUPTED);
+
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
- long keventAddress = getEvent(address, n);
- int fd = getDescriptor(keventAddress);
+ long keventAddress = KQueue.getEvent(address, n);
+ int fd = KQueue.getDescriptor(keventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
- drain1(sp[0]);
+ IOUtil.drain(sp[0]);
}
// queue special event if there are more events
// to handle.
if (n > 0) {
@@ -222,11 +228,11 @@
return EXECUTE_TASK_OR_SHUTDOWN;
}
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
- int filter = getFilter(keventAddress);
+ int filter = KQueue.getFilter(keventAddress);
int events = 0;
if (filter == EVFILT_READ)
events = Net.POLLIN;
else if (filter == EVFILT_WRITE)
events = Net.POLLOUT;
@@ -312,20 +318,6 @@
implClose();
}
}
}
}
-
- // -- Native methods --
-
- private static native void socketpair(int[] sv) throws IOException;
-
- private static native void interrupt(int fd) throws IOException;
-
- private static native void drain1(int fd) throws IOException;
-
- private static native void close0(int fd);
-
- static {
- IOUtil.load();
- }
}
< prev index next >