< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java

Print this page
rev 49271 : [mq]: selector-cleanup

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -28,11 +28,15 @@
 import java.nio.channels.spi.AsynchronousChannelProvider;
 import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
-import static sun.nio.ch.KQueue.*;
+
+import static sun.nio.ch.KQueue.EVFILT_READ;
+import static sun.nio.ch.KQueue.EVFILT_WRITE;
+import static sun.nio.ch.KQueue.EV_ADD;
+import static sun.nio.ch.KQueue.EV_ONESHOT;
 
 /**
  * AsynchronousChannelGroup implementation based on the BSD kqueue facility.
  */
 

@@ -43,22 +47,22 @@
     private static final int MAX_KEVENTS_TO_POLL = 512;
 
     // kqueue file descriptor
     private final int kqfd;
 
+    // address of the poll array passed to kqueue_wait
+    private final long address;
+
     // true if kqueue closed
     private boolean closed;
 
     // descriptor pair used for wakeup (sp[0] is polled and drained, sp[1] is written to)
     private final int sp[];
 
     // number of wakeups pending
     private final AtomicInteger wakeupCount = new AtomicInteger();
 
-    // address of the poll array passed to kqueue_wait
-    private final long address;
-
     // encapsulates an event for a channel
     static class Event {
         final PollableChannel channel;
         final int events;
 

@@ -80,32 +84,29 @@
     /**
      * Creates the port: opens the kqueue, allocates the poll array, creates
      * the pair of descriptors used for wakeup, registers sp[0] with the
      * kqueue for read events, and seeds the event queue with NEED_TO_POLL.
      *
      * @param provider passed through to the superclass constructor
      * @param pool     passed through to the superclass constructor
      * @throws IOException if creating the wakeup pipe fails; the poll array
      *         and the kqueue descriptor are released first (other callees
      *         may also throw -- not visible from this code)
      */
     KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
         throws IOException
     {
         super(provider, pool);
 
-        // open kqueue
-        this.kqfd = kqueue();
+        this.kqfd = KQueue.create();
+        this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);
 
         // create the descriptor pair used by the wakeup mechanism
-        int[] sv = new int[2];
         try {
-            socketpair(sv);
-
-            // register one end with kqueue
-            keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
-        } catch (IOException x) {
-            close0(kqfd);
-            throw x;
             // makePipe returns both descriptors packed in one long:
             // the high 32 bits become sp[0], the low 32 bits become sp[1]
+            long fds = IOUtil.makePipe(true);
+            this.sp = new int[]{(int) (fds >>> 32), (int) fds};
+        } catch (IOException ioe) {
             // release what was acquired above before propagating the failure
+            KQueue.freePollArray(address);
+            FileDispatcherImpl.closeIntFD(kqfd);
+            throw ioe;
         }
-        this.sp = sv;
 
-        // allocate the poll array
-        this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
         // NOTE(review): the int result of KQueue.register is not checked here,
         // whereas the channel registration code in this file treats a non-zero
         // result as an error -- confirm whether a failure here needs cleanup
+        // register one end with kqueue
+        KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);
 
         // create the queue and offer the special event to ensure that the first
         // thread polls
-        this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
+        this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);
         this.queue.offer(NEED_TO_POLL);
     }
 
     KQueuePort start() {
         startThreads(new EventHandlerTask());

@@ -119,21 +120,22 @@
         synchronized (this) {
             if (closed)
                 return;
             closed = true;
         }
-        freePollArray(address);
-        close0(sp[0]);
-        close0(sp[1]);
-        close0(kqfd);
+
+        try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }
+        try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
+        try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
+        KQueue.freePollArray(address);
     }
 
     /**
      * Forces a wakeup by writing a single zero byte to sp[1]. The write is
      * performed only by the call that raises wakeupCount from 0 to 1; calls
      * made while a wakeup is already pending just increment the counter.
      * An IOException from the write is rethrown as an AssertionError.
      */
     private void wakeup() {
         if (wakeupCount.incrementAndGet() == 1) {
             // write a byte to the wakeup descriptor to force wakeup
             try {
-                interrupt(sp[1]);
+                IOUtil.write1(sp[1], (byte)0);
             } catch (IOException x) {
                 throw new AssertionError(x);
             }
         }
     }

@@ -171,13 +173,13 @@
         // We use a separate filter for read and write events.
         // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
         int err = 0;
         int flags = (EV_ADD|EV_ONESHOT);
         if ((events & Net.POLLIN) > 0)
-            err = keventRegister(kqfd, fd, EVFILT_READ, flags);
+            err = KQueue.register(kqfd, fd, EVFILT_READ, flags);
         if (err == 0 && (events & Net.POLLOUT) > 0)
-            err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
+            err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);
         if (err != 0)
             throw new InternalError("kevent failed: " + err);  // should not happen
     }
 
     /*

@@ -191,28 +193,32 @@
      */
     private class EventHandlerTask implements Runnable {
         private Event poll() throws IOException {
             try {
                 for (;;) {
-                    int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
+                    int n;
+                    do {
+                        n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
+                    } while (n == IOStatus.INTERRUPTED);
+
                     /*
                      * 'n' events have been read. Here we map them to their
                      * corresponding channel in batch and queue n-1 so that
                      * they can be handled by other handler threads. The last
                      * event is handled by this thread (and so is not queued).
                      */
                     fdToChannelLock.readLock().lock();
                     try {
                         while (n-- > 0) {
-                            long keventAddress = getEvent(address, n);
-                            int fd = getDescriptor(keventAddress);
+                            long keventAddress = KQueue.getEvent(address, n);
+                            int fd = KQueue.getDescriptor(keventAddress);
 
                             // wakeup
                             if (fd == sp[0]) {
                                 if (wakeupCount.decrementAndGet() == 0) {
                                     // no more wakeups so drain pipe
-                                    drain1(sp[0]);
+                                    IOUtil.drain(sp[0]);
                                 }
 
                                 // queue special event if there are more events
                                 // to handle.
                                 if (n > 0) {

@@ -222,11 +228,11 @@
                                 return EXECUTE_TASK_OR_SHUTDOWN;
                             }
 
                             PollableChannel channel = fdToChannel.get(fd);
                             if (channel != null) {
-                                int filter = getFilter(keventAddress);
+                                int filter = KQueue.getFilter(keventAddress);
                                 int events = 0;
                                 if (filter == EVFILT_READ)
                                     events = Net.POLLIN;
                                 else if (filter == EVFILT_WRITE)
                                     events = Net.POLLOUT;

@@ -312,20 +318,6 @@
                     implClose();
                 }
             }
         }
     }
-
-    // -- Native methods --
-
-    private static native void socketpair(int[] sv) throws IOException;
-
-    private static native void interrupt(int fd) throws IOException;
-
-    private static native void drain1(int fd) throws IOException;
-
-    private static native void close0(int fd);
-
-    static {
-        IOUtil.load();
-    }
 }
< prev index next >