< prev index next >
src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java
Print this page
rev 49271 : [mq]: selector-cleanup
*** 1,7 ****
/*
! * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 28,38 ****
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
! import static sun.nio.ch.KQueue.*;
/**
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
*/
--- 28,42 ----
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
!
! import static sun.nio.ch.KQueue.EVFILT_READ;
! import static sun.nio.ch.KQueue.EVFILT_WRITE;
! import static sun.nio.ch.KQueue.EV_ADD;
! import static sun.nio.ch.KQueue.EV_ONESHOT;
/**
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
*/
*** 43,64 ****
private static final int MAX_KEVENTS_TO_POLL = 512;
// kqueue file descriptor
private final int kqfd;
// true if kqueue closed
private boolean closed;
// socket pair used for wakeup
private final int sp[];
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
- // address of the poll array passed to kqueue_wait
- private final long address;
-
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
final int events;
--- 47,68 ----
private static final int MAX_KEVENTS_TO_POLL = 512;
// kqueue file descriptor
private final int kqfd;
+ // address of the poll array passed to kqueue_wait
+ private final long address;
+
// true if kqueue closed
private boolean closed;
// socket pair used for wakeup
private final int sp[];
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
final int events;
*** 80,111 ****
KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
throws IOException
{
super(provider, pool);
! // open kqueue
! this.kqfd = kqueue();
// create socket pair for wakeup mechanism
- int[] sv = new int[2];
try {
! socketpair(sv);
!
! // register one end with kqueue
! keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
! } catch (IOException x) {
! close0(kqfd);
! throw x;
}
- this.sp = sv;
! // allocate the poll array
! this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
// create the queue and offer the special event to ensure that the first
// threads polls
! this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
this.queue.offer(NEED_TO_POLL);
}
KQueuePort start() {
startThreads(new EventHandlerTask());
--- 84,112 ----
KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
throws IOException
{
super(provider, pool);
! this.kqfd = KQueue.create();
! this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);
// create socket pair for wakeup mechanism
try {
! long fds = IOUtil.makePipe(true);
! this.sp = new int[]{(int) (fds >>> 32), (int) fds};
! } catch (IOException ioe) {
! KQueue.freePollArray(address);
! FileDispatcherImpl.closeIntFD(kqfd);
! throw ioe;
}
! // register one end with kqueue
! KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);
// create the queue and offer the special event to ensure that the first
// threads polls
! this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);
this.queue.offer(NEED_TO_POLL);
}
KQueuePort start() {
startThreads(new EventHandlerTask());
*** 119,139 ****
synchronized (this) {
if (closed)
return;
closed = true;
}
! freePollArray(address);
! close0(sp[0]);
! close0(sp[1]);
! close0(kqfd);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to socketpair to force wakeup
try {
! interrupt(sp[1]);
} catch (IOException x) {
throw new AssertionError(x);
}
}
}
--- 120,141 ----
synchronized (this) {
if (closed)
return;
closed = true;
}
!
! try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }
! try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
! try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
! KQueue.freePollArray(address);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to socketpair to force wakeup
try {
! IOUtil.write1(sp[1], (byte)0);
} catch (IOException x) {
throw new AssertionError(x);
}
}
}
*** 171,183 ****
// We use a separate filter for read and write events.
// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
int err = 0;
int flags = (EV_ADD|EV_ONESHOT);
if ((events & Net.POLLIN) > 0)
! err = keventRegister(kqfd, fd, EVFILT_READ, flags);
if (err == 0 && (events & Net.POLLOUT) > 0)
! err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
if (err != 0)
throw new InternalError("kevent failed: " + err); // should not happen
}
/*
--- 173,185 ----
// We use a separate filter for read and write events.
// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
int err = 0;
int flags = (EV_ADD|EV_ONESHOT);
if ((events & Net.POLLIN) > 0)
! err = KQueue.register(kqfd, fd, EVFILT_READ, flags);
if (err == 0 && (events & Net.POLLOUT) > 0)
! err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);
if (err != 0)
throw new InternalError("kevent failed: " + err); // should not happen
}
/*
*** 191,218 ****
*/
private class EventHandlerTask implements Runnable {
private Event poll() throws IOException {
try {
for (;;) {
! int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
! long keventAddress = getEvent(address, n);
! int fd = getDescriptor(keventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
! drain1(sp[0]);
}
// queue special event if there are more events
// to handle.
if (n > 0) {
--- 193,224 ----
*/
private class EventHandlerTask implements Runnable {
private Event poll() throws IOException {
try {
for (;;) {
! int n;
! do {
! n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
! } while (n == IOStatus.INTERRUPTED);
!
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
! long keventAddress = KQueue.getEvent(address, n);
! int fd = KQueue.getDescriptor(keventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
! IOUtil.drain(sp[0]);
}
// queue special event if there are more events
// to handle.
if (n > 0) {
*** 222,232 ****
return EXECUTE_TASK_OR_SHUTDOWN;
}
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
! int filter = getFilter(keventAddress);
int events = 0;
if (filter == EVFILT_READ)
events = Net.POLLIN;
else if (filter == EVFILT_WRITE)
events = Net.POLLOUT;
--- 228,238 ----
return EXECUTE_TASK_OR_SHUTDOWN;
}
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
! int filter = KQueue.getFilter(keventAddress);
int events = 0;
if (filter == EVFILT_READ)
events = Net.POLLIN;
else if (filter == EVFILT_WRITE)
events = Net.POLLOUT;
*** 312,331 ****
implClose();
}
}
}
}
-
- // -- Native methods --
-
- private static native void socketpair(int[] sv) throws IOException;
-
- private static native void interrupt(int fd) throws IOException;
-
- private static native void drain1(int fd) throws IOException;
-
- private static native void close0(int fd);
-
- static {
- IOUtil.load();
- }
}
--- 318,323 ----
< prev index next >