1 /*
   2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.util.concurrent.*;
  30 import java.security.AccessController;
  31 import sun.security.action.GetIntegerAction;
  32 import jdk.internal.misc.InnocuousThread;
  33 
  34 /**
  35  * Defines static methods to invoke a completion handler or arbitrary task.
  36  */
  37 
  38 class Invoker {
  39     private Invoker() { }
  40 
  41     // maximum number of completion handlers that may be invoked on the current
  42     // thread before it re-directs invocations to the thread pool. This helps
  43     // avoid stack overflow and lessens the risk of starvation.
  44     private static final int maxHandlerInvokeCount = AccessController.doPrivileged(
  45         new GetIntegerAction("sun.nio.ch.maxCompletionHandlersOnStack", 16));
  46 
  47     // Per-thread object with reference to channel group and a counter for
  48     // the number of completion handlers invoked. This should be reset to 0
  49     // when all completion handlers have completed.
  50     static class GroupAndInvokeCount {
  51         private final AsynchronousChannelGroupImpl group;
  52         private int handlerInvokeCount;
  53         GroupAndInvokeCount(AsynchronousChannelGroupImpl group) {
  54             this.group = group;
  55         }
  56         AsynchronousChannelGroupImpl group() {
  57             return group;
  58         }
  59         int invokeCount() {
  60             return handlerInvokeCount;
  61         }
  62         void setInvokeCount(int value) {
  63             handlerInvokeCount = value;
  64         }
  65         void resetInvokeCount() {
  66             handlerInvokeCount = 0;
  67         }
  68         void incrementInvokeCount() {
  69             handlerInvokeCount++;
  70         }
  71     }
  72     private static final ThreadLocal<GroupAndInvokeCount> myGroupAndInvokeCount =
  73         new ThreadLocal<GroupAndInvokeCount>() {
  74             @Override protected GroupAndInvokeCount initialValue() {
  75                 return null;
  76             }
  77         };
  78 
  79     /**
  80      * Binds this thread to the given group
  81      */
  82     static void bindToGroup(AsynchronousChannelGroupImpl group) {
  83         myGroupAndInvokeCount.set(new GroupAndInvokeCount(group));
  84     }
  85 
  86     /**
  87      * Returns the GroupAndInvokeCount object for this thread.
  88      */
  89     static GroupAndInvokeCount getGroupAndInvokeCount() {
  90         return myGroupAndInvokeCount.get();
  91     }
  92 
  93     /**
  94      * Returns true if the current thread is in a channel group's thread pool
  95      */
  96     static boolean isBoundToAnyGroup() {
  97         return myGroupAndInvokeCount.get() != null;
  98     }
  99 
 100     /**
 101      * Returns true if the current thread is in the given channel's thread
 102      * pool and we haven't exceeded the maximum number of handler frames on
 103      * the stack.
 104      */
 105     static boolean mayInvokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
 106                                    AsynchronousChannelGroupImpl group)
 107     {
 108         if ((myGroupAndInvokeCount != null) &&
 109             (myGroupAndInvokeCount.group() == group) &&
 110             (myGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
 111         {
 112             return true;
 113         }
 114         return false;
 115     }
 116 
 117     /**
 118      * Invoke handler without checking the thread identity or number of handlers
 119      * on the thread stack.
 120      */
 121     static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
 122                                       A attachment,
 123                                       V value,
 124                                       Throwable exc)
 125     {
 126         if (exc == null) {
 127             handler.completed(value, attachment);
 128         } else {
 129             handler.failed(exc, attachment);
 130         }
 131 
 132         // clear interrupt
 133         Thread.interrupted();
 134 
 135         // clear thread locals when in default thread pool
 136         if (System.getSecurityManager() != null) {
 137             Thread me = Thread.currentThread();
 138             if (me instanceof InnocuousThread) {
 139                 GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
 140                 ((InnocuousThread)me).eraseThreadLocals();
 141                 if (thisGroupAndInvokeCount != null) {
 142                     myGroupAndInvokeCount.set(thisGroupAndInvokeCount);
 143                 }
 144             }
 145         }
 146     }
 147 
 148     /**
 149      * Invoke handler assuming thread identity already checked
 150      */
 151     static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
 152                                    CompletionHandler<V,? super A> handler,
 153                                    A attachment,
 154                                    V result,
 155                                    Throwable exc)
 156     {
 157         myGroupAndInvokeCount.incrementInvokeCount();
 158         Invoker.invokeUnchecked(handler, attachment, result, exc);
 159     }
 160 
 161     /**
 162      * Invokes the handler. If the current thread is in the channel group's
 163      * thread pool then the handler is invoked directly, otherwise it is
 164      * invoked indirectly.
 165      */
 166     static <V,A> void invoke(AsynchronousChannel channel,
 167                              CompletionHandler<V,? super A> handler,
 168                              A attachment,
 169                              V result,
 170                              Throwable exc)
 171     {
 172         boolean invokeDirect = false;
 173         boolean identityOkay = false;
 174         GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
 175         if (thisGroupAndInvokeCount != null) {
 176             if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
 177                 identityOkay = true;
 178             if (identityOkay &&
 179                 (thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
 180             {
 181                 // group match
 182                 invokeDirect = true;
 183             }
 184         }
 185         if (invokeDirect) {
 186             invokeDirect(thisGroupAndInvokeCount, handler, attachment, result, exc);
 187         } else {
 188             try {
 189                 invokeIndirectly(channel, handler, attachment, result, exc);
 190             } catch (RejectedExecutionException ree) {
 191                 // channel group shutdown; fallback to invoking directly
 192                 // if the current thread has the right identity.
 193                 if (identityOkay) {
 194                     invokeDirect(thisGroupAndInvokeCount,
 195                                  handler, attachment, result, exc);
 196                 } else {
 197                     throw new ShutdownChannelGroupException();
 198                 }
 199             }
 200         }
 201     }
 202 
 203     /**
 204      * Invokes the handler indirectly via the channel group's thread pool.
 205      */
 206     static <V,A> void invokeIndirectly(AsynchronousChannel channel,
 207                                        final CompletionHandler<V,? super A> handler,
 208                                        final A attachment,
 209                                        final V result,
 210                                        final Throwable exc)
 211     {
 212         try {
 213             ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
 214                 public void run() {
 215                     GroupAndInvokeCount thisGroupAndInvokeCount =
 216                         myGroupAndInvokeCount.get();
 217                     if (thisGroupAndInvokeCount != null)
 218                         thisGroupAndInvokeCount.setInvokeCount(1);
 219                     invokeUnchecked(handler, attachment, result, exc);
 220                 }
 221             });
 222         } catch (RejectedExecutionException ree) {
 223             throw new ShutdownChannelGroupException();
 224         }
 225     }
 226 
 227     /**
 228      * Invokes the handler "indirectly" in the given Executor
 229      */
 230     static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
 231                                        final A attachment,
 232                                        final V value,
 233                                        final Throwable exc,
 234                                        Executor executor)
 235     {
 236          try {
 237             executor.execute(new Runnable() {
 238                 public void run() {
 239                     invokeUnchecked(handler, attachment, value, exc);
 240                 }
 241             });
 242         } catch (RejectedExecutionException ree) {
 243             throw new ShutdownChannelGroupException();
 244         }
 245     }
 246 
 247     /**
 248      * Invokes the given task on the thread pool associated with the given
 249      * channel. If the current thread is in the thread pool then the task is
 250      * invoked directly.
 251      */
 252     static void invokeOnThreadInThreadPool(Groupable channel,
 253                                            Runnable task)
 254     {
 255         boolean invokeDirect;
 256         GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
 257         AsynchronousChannelGroupImpl targetGroup = channel.group();
 258         if (thisGroupAndInvokeCount == null) {
 259             invokeDirect = false;
 260         } else {
 261             invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
 262         }
 263         try {
 264             if (invokeDirect) {
 265                 task.run();
 266             } else {
 267                 targetGroup.executeOnPooledThread(task);
 268             }
 269         } catch (RejectedExecutionException ree) {
 270             throw new ShutdownChannelGroupException();
 271         }
 272     }
 273 
 274     /**
 275      * Invoke handler with completed result. This method does not check the
 276      * thread identity or the number of handlers on the thread stack.
 277      */
 278     static <V,A> void invokeUnchecked(PendingFuture<V,A> future) {
 279         assert future.isDone();
 280         CompletionHandler<V,? super A> handler = future.handler();
 281         if (handler != null) {
 282             invokeUnchecked(handler,
 283                             future.attachment(),
 284                             future.value(),
 285                             future.exception());
 286         }
 287     }
 288 
 289     /**
 290      * Invoke handler with completed result. If the current thread is in the
 291      * channel group's thread pool then the handler is invoked directly,
 292      * otherwise it is invoked indirectly.
 293      */
 294     static <V,A> void invoke(PendingFuture<V,A> future) {
 295         assert future.isDone();
 296         CompletionHandler<V,? super A> handler = future.handler();
 297         if (handler != null) {
 298             invoke(future.channel(),
 299                    handler,
 300                    future.attachment(),
 301                    future.value(),
 302                    future.exception());
 303         }
 304     }
 305 
 306     /**
 307      * Invoke handler with completed result. The handler is invoked indirectly,
 308      * via the channel group's thread pool.
 309      */
 310     static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
 311         assert future.isDone();
 312         CompletionHandler<V,? super A> handler = future.handler();
 313         if (handler != null) {
 314             invokeIndirectly(future.channel(),
 315                              handler,
 316                              future.attachment(),
 317                              future.value(),
 318                              future.exception());
 319         }
 320     }
 321 }