1 /*
   2  * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.ClosedSelectorException;
  30 import java.nio.channels.SelectionKey;
  31 import java.nio.channels.Selector;
  32 import java.nio.channels.spi.SelectorProvider;
  33 import java.util.ArrayDeque;
  34 import java.util.Deque;
  35 import java.util.HashMap;
  36 import java.util.Map;
  37 import java.util.concurrent.TimeUnit;
  38 import java.util.function.Consumer;
  39 
  40 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
  41 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
  42 import static sun.nio.ch.SolarisEventPort.SIZEOF_PORT_EVENT;
  43 import static sun.nio.ch.SolarisEventPort.OFFSETOF_EVENTS;
  44 import static sun.nio.ch.SolarisEventPort.OFFSETOF_SOURCE;
  45 import static sun.nio.ch.SolarisEventPort.OFFSETOF_OBJECT;
  46 import static sun.nio.ch.SolarisEventPort.port_create;
  47 import static sun.nio.ch.SolarisEventPort.port_close;
  48 import static sun.nio.ch.SolarisEventPort.port_associate;
  49 import static sun.nio.ch.SolarisEventPort.port_dissociate;
  50 import static sun.nio.ch.SolarisEventPort.port_getn;
  51 import static sun.nio.ch.SolarisEventPort.port_send;
  52 
  53 /**
  54  * Selector implementation based on the Solaris event port mechanism.
  55  */
  56 
  57 class EventPortSelectorImpl
  58     extends SelectorImpl
  59 {
  60     // maximum number of events to retrive in one call to port_getn
  61     static final int MAX_EVENTS = Math.min(IOUtil.fdLimit()-1, 1024);
  62 
  63     // port file descriptor
  64     private final int pfd;
  65 
  66     // the poll array (populated by port_getn)
  67     private final long pollArrayAddress;
  68     private final AllocatedNativeObject pollArray;
  69 
  70     // maps file descriptor to selection key, synchronize on selector
  71     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
  72 
  73     // the last update operation, incremented by processUpdateQueue
  74     private int lastUpdate;
  75 
  76     // pending new registrations/updates, queued by setEventOps and
  77     // updateSelectedKeys
  78     private final Object updateLock = new Object();
  79     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
  80 
  81     // interrupt triggering and clearing
  82     private final Object interruptLock = new Object();
  83     private boolean interruptTriggered;
  84 
  85     EventPortSelectorImpl(SelectorProvider sp) throws IOException {
  86         super(sp);
  87 
  88         this.pfd = port_create();
  89 
  90         int allocationSize = MAX_EVENTS * SIZEOF_PORT_EVENT;
  91         this.pollArray = new AllocatedNativeObject(allocationSize, false);
  92         this.pollArrayAddress = pollArray.address();
  93     }
  94 
  95     private void ensureOpen() {
  96         if (!isOpen())
  97             throw new ClosedSelectorException();
  98     }
  99 
 100     @Override
 101     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 102         throws IOException
 103     {
 104         assert Thread.holdsLock(this);
 105 
 106         long to = timeout;
 107         boolean blocking = (to != 0);
 108         boolean timedPoll = (to > 0);
 109 
 110         int numEvents;
 111         processUpdateQueue();
 112         processDeregisterQueue();
 113         try {
 114             begin(blocking);
 115 
 116             do {
 117                 long startTime = timedPoll ? System.nanoTime() : 0;
 118                 numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to);
 119                 if (numEvents == IOStatus.INTERRUPTED && timedPoll) {
 120                     // timed poll interrupted so need to adjust timeout
 121                     long adjust = System.nanoTime() - startTime;
 122                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 123                     if (to <= 0) {
 124                         // timeout also expired so no retry
 125                         numEvents = 0;
 126                     }
 127                 }
 128             } while (numEvents == IOStatus.INTERRUPTED);
 129             assert IOStatus.check(numEvents);
 130 
 131         } finally {
 132             end(blocking);
 133         }
 134         processDeregisterQueue();
 135         return processPortEvents(numEvents, action);
 136     }
 137 
 138     /**
 139      * Process new registrations and changes to the interest ops.
 140      */
 141     private void processUpdateQueue() throws IOException {
 142         assert Thread.holdsLock(this);
 143 
 144         // bump lastUpdate to ensure that the interest ops are changed at most
 145         // once per bulk update
 146         lastUpdate++;
 147 
 148         synchronized (updateLock) {
 149             SelectionKeyImpl ski;
 150             while ((ski = updateKeys.pollFirst()) != null) {
 151                 if (ski.isValid()) {
 152                     int fd = ski.getFDVal();
 153                     // add to fdToKey if needed
 154                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
 155                     assert (previous == null) || (previous == ski);
 156 
 157                     int newEvents = ski.translateInterestOps();
 158                     if (newEvents != ski.registeredEvents()) {
 159                         if (newEvents == 0) {
 160                             port_dissociate(pfd, PORT_SOURCE_FD, fd);
 161                         } else {
 162                             port_associate(pfd, PORT_SOURCE_FD, fd, newEvents);
 163                         }
 164                         ski.registeredEvents(newEvents);
 165                     }
 166                 }
 167             }
 168         }
 169     }
 170 
 171     /**
 172      * Process the polled events and re-queue the selected keys so the file
 173      * descriptors are re-associated at the next select operation.
 174      */
 175     private int processPortEvents(int numEvents, Consumer<SelectionKey> action)
 176         throws IOException
 177     {
 178         assert Thread.holdsLock(this);
 179 
 180         int numKeysUpdated = 0;
 181         boolean interrupted = false;
 182 
 183         // Process the polled events while holding the update lock. This allows
 184         // keys to be queued for ready file descriptors so they can be
 185         // re-associated at the next select. The selected-key can be updated
 186         // in this pass.
 187         synchronized (updateLock) {
 188             for (int i = 0; i < numEvents; i++) {
 189                 short source = getSource(i);
 190                 if (source == PORT_SOURCE_FD) {
 191                     int fd = getDescriptor(i);
 192                     SelectionKeyImpl ski = fdToKey.get(fd);
 193                     if (ski != null) {
 194                         ski.registeredEvents(0);
 195                         updateKeys.addLast(ski);
 196 
 197                         // update selected-key set if no action specified
 198                         if (action == null) {
 199                             int rOps = getEventOps(i);
 200                             numKeysUpdated += processReadyEvents(rOps, ski, null);
 201                         }
 202 
 203                     }
 204                 } else if (source == PORT_SOURCE_USER) {
 205                     interrupted = true;
 206                 } else {
 207                     assert false;
 208                 }
 209             }
 210         }
 211 
 212         // if an action specified then iterate over the polled events again so
 213         // that the action is performed without holding the update lock.
 214         if (action != null) {
 215             for (int i = 0; i < numEvents; i++) {
 216                 short source = getSource(i);
 217                 if (source == PORT_SOURCE_FD) {
 218                     int fd = getDescriptor(i);
 219                     SelectionKeyImpl ski = fdToKey.get(fd);
 220                     if (ski != null) {
 221                         int rOps = getEventOps(i);
 222                         numKeysUpdated += processReadyEvents(rOps, ski, action);
 223                     }
 224                 }
 225             }
 226         }
 227 
 228         if (interrupted) {
 229             clearInterrupt();
 230         }
 231         return numKeysUpdated;
 232     }
 233 
 234     @Override
 235     protected void implClose() throws IOException {
 236         assert !isOpen();
 237         assert Thread.holdsLock(this);
 238 
 239         // prevent further wakeup
 240         synchronized (interruptLock) {
 241             interruptTriggered = true;
 242         }
 243 
 244         port_close(pfd);
 245         pollArray.free();
 246     }
 247 
 248     @Override
 249     protected void implDereg(SelectionKeyImpl ski) throws IOException {
 250         assert !ski.isValid();
 251         assert Thread.holdsLock(this);
 252 
 253         int fd = ski.getFDVal();
 254         if (fdToKey.remove(fd) != null) {
 255             if (ski.registeredEvents() != 0) {
 256                 port_dissociate(pfd, PORT_SOURCE_FD, fd);
 257                 ski.registeredEvents(0);
 258             }
 259         } else {
 260             assert ski.registeredEvents() == 0;
 261         }
 262     }
 263 
 264     @Override
 265     public void setEventOps(SelectionKeyImpl ski) {
 266         ensureOpen();
 267         synchronized (updateLock) {
 268             updateKeys.addLast(ski);
 269         }
 270     }
 271 
 272     @Override
 273     public Selector wakeup() {
 274         synchronized (interruptLock) {
 275             if (!interruptTriggered) {
 276                 try {
 277                     port_send(pfd, 0);
 278                 } catch (IOException ioe) {
 279                     throw new InternalError(ioe);
 280                 }
 281                 interruptTriggered = true;
 282             }
 283         }
 284         return this;
 285     }
 286 
 287     private void clearInterrupt() throws IOException {
 288         synchronized (interruptLock) {
 289             interruptTriggered = false;
 290         }
 291     }
 292 
 293     private short getSource(int i) {
 294         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_SOURCE;
 295         return pollArray.getShort(offset);
 296     }
 297 
 298     private int getEventOps(int i) {
 299         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_EVENTS;
 300         return pollArray.getInt(offset);
 301     }
 302 
 303     private int getDescriptor(int i) {
 304         //assert Unsafe.getUnsafe().addressSize() == 8;
 305         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_OBJECT;
 306         return (int) pollArray.getLong(offset);
 307     }
 308 }