1 /*
   2  * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.xml.internal.ws.api.pipe;
  27 
  28 import com.sun.istack.internal.NotNull;
  29 import com.sun.istack.internal.Nullable;
  30 import com.sun.xml.internal.ws.api.Cancelable;
  31 import com.sun.xml.internal.ws.api.Component;
  32 import com.sun.xml.internal.ws.api.ComponentRegistry;
  33 import com.sun.xml.internal.ws.api.SOAPVersion;
  34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
  35 import com.sun.xml.internal.ws.api.message.Packet;
  36 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
  37 import com.sun.xml.internal.ws.api.server.Adapter;
  38 
  39 import java.util.ArrayList;
  40 import java.util.HashSet;
  41 import java.util.List;
  42 import java.util.Set;
  43 import java.util.concurrent.CopyOnWriteArraySet;
  44 import java.util.concurrent.atomic.AtomicInteger;
  45 import java.util.concurrent.locks.ReentrantLock;
  46 import java.util.logging.Level;
  47 import java.util.logging.Logger;
  48 
  49 /**
  50  * User-level thread. Represents the execution of one request/response processing.
  51  * <p/>
  52  * <p/>
  53  * JAX-WS RI is capable of running a large number of request/response concurrently by
  54  * using a relatively small number of threads. This is made possible by utilizing
  55  * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
  56  * processing.
  57  * <p/>
  58  * <p/>
  59  * A fiber remembers where in the pipeline the processing is at, what needs to be
  60  * executed on the way out (when processing response), and other additional information
  61  * specific to the execution of a particular request/response.
  62  * <p/>
  63  * <h2>Suspend/Resume</h2>
  64  * <p/>
  65  * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
  66  * When a fiber is suspended, it will be kept on the side until it is
  67  * {@link #resume(Packet) resumed}. This allows threads to go execute
  68  * other runnable fibers, allowing efficient utilization of smaller number of
  69  * threads.
  70  * <p/>
  71  * <h2>Context-switch Interception</h2>
  72  * <p/>
  73  * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
  74  * to perform additional processing every time a thread starts running a fiber
  75  * and stops running it.
  76  * <p/>
  77  * <h2>Context ClassLoader</h2>
  78  * <p/>
  79  * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
  80  * becomes the thread's CCL when it's executing the fiber. The original CCL
  81  * of the thread will be restored when the thread leaves the fiber execution.
  82  * <p/>
  83  * <p/>
  84  * <h2>Debugging Aid</h2>
  85  * <p/>
  86  * Because {@link Fiber} doesn't keep much in the call stack, and instead use
  87  * {@link #conts} to store the continuation, debugging fiber related activities
  88  * could be harder.
  89  * <p/>
  90  * <p/>
  91  * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
  92  * level logging. Using FINER would cause more detailed logging, which includes
  93  * what tubes are executed in what order and how they behaved.
  94  * <p/>
  95  * <p/>
  96  * When you debug the server side, consider setting {@link Fiber#serializeExecution}
  97  * to true, so that execution of fibers are serialized. Debugging a server
  98  * with more than one running threads is very tricky, and this switch will
  99  * prevent that. This can be also enabled by setting the system property on.
 100  * See the source code.
 101  *
 102  * @author Kohsuke Kawaguchi
 103  * @author Jitendra Kotamraju
 104  */
 105 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
 106 
 107         /**
 108          * Callback interface for notification of suspend and resume.
 109          *
 110          * @since 2.2.6
 111          */
 112     public interface Listener {
 113         /**
 114          * Fiber has been suspended.  Implementations of this callback may resume the Fiber.
 115          * @param fiber Fiber
 116          */
 117         public void fiberSuspended(Fiber fiber);
 118 
 119         /**
 120          * Fiber has been resumed.  Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
 121          * @param fiber Fiber
 122          */
 123         public void fiberResumed(Fiber fiber);
 124     }
 125 
 126     private List<Listener> _listeners = new ArrayList<Listener>();
 127 
 128     /**
 129      * Adds suspend/resume callback listener
 130      * @param listener Listener
 131      * @since 2.2.6
 132      */
 133     public void addListener(Listener listener) {
 134         synchronized(_listeners) {
 135             if (!_listeners.contains(listener)) {
 136                 _listeners.add(listener);
 137             }
 138         }
 139     }
 140 
 141     /**
 142      * Removes suspend/resume callback listener
 143      * @param listener Listener
 144      * @since 2.2.6
 145      */
 146     public void removeListener(Listener listener) {
 147         synchronized(_listeners) {
 148             _listeners.remove(listener);
 149         }
 150     }
 151 
 152     private List<Listener> getCurrentListeners() {
 153       synchronized(_listeners) {
 154          return new ArrayList<Listener>(_listeners);
 155       }
 156     }
 157 
 158     private void clearListeners() {
 159         synchronized(_listeners) {
 160             _listeners.clear();
 161         }
 162     }
 163 
 164     /**
 165      * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
 166      * to be invoked on the way back.
 167      */
 168     private Tube[] conts = new Tube[16];
 169     private int contsSize;
 170 
 171     /**
 172      * If this field is non-null, the next instruction to execute is
 173      * to call its {@link Tube#processRequest(Packet)}. Otherwise
 174      * the instruction is to call {@link #conts}.
 175      */
 176     private Tube next;
 177 
 178     private Packet packet;
 179 
 180     private Throwable/*but really it's either RuntimeException or Error*/ throwable;
 181 
 182     public final Engine owner;
 183 
 184     /**
 185      * Is this thread suspended? 0=not suspended, 1=suspended.
 186      * <p/>
 187      * <p/>
 188      * Logically this is just a boolean, but we need to prepare for the case
 189      * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
 190      * This happens when things happen in the following order:
 191      * <p/>
 192      * <ol>
 193      * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
 194      * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
 195      * <li>Tube returns with {@link NextAction#suspend()}.
 196      * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
 197      * to wake up fiber
 198      * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
 199      * </ol>
 200      * <p/>
 201      * <p/>
 202      * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
 203      * {@link #resume(Packet)} occurs before {@link #suspend()}.
 204      * <p/>
 205      * <p/>
 206      * Increment and decrement is guarded by 'this' object.
 207      */
 208     private volatile int suspendedCount = 0;
 209 
 210     private volatile boolean isInsideSuspendCallbacks = false;
 211 
 212     /**
 213      * Is this fiber completed?
 214      */
 215     private volatile boolean completed;
 216 
 217     /**
 218      * Is this {@link Fiber} currently running in the synchronous mode?
 219      */
 220     private boolean synchronous;
 221 
 222     private boolean interrupted;
 223 
 224     private final int id;
 225 
 226     /**
 227      * Active {@link FiberContextSwitchInterceptor}s for this fiber.
 228      */
 229     private List<FiberContextSwitchInterceptor> interceptors;
 230 
 231     /**
 232      * Not null when {@link #interceptors} is not null.
 233      */
 234     private InterceptorHandler interceptorHandler;
 235 
 236     /**
 237      * This flag is set to true when a new interceptor is added.
 238      * <p/>
 239      * When that happens, we need to first exit the current interceptors
 240      * and then reenter them, so that the newly added interceptors start
 241      * taking effect. This flag is used to control that flow.
 242      */
 243     private boolean needsToReenter;
 244 
 245     /**
 246      * Fiber's context {@link ClassLoader}.
 247      */
 248     private
 249     @Nullable
 250     ClassLoader contextClassLoader;
 251 
 252     private
 253     @Nullable
 254     CompletionCallback completionCallback;
 255 
 256     /**
 257      * The thread on which this Fiber is currently executing, if applicable.
 258      */
 259     private Thread currentThread;
 260 
 261     private volatile boolean isCanceled;
 262 
 263     /**
 264      * Set to true if this fiber is started asynchronously, to avoid
 265      * doubly-invoking completion code.
 266      */
 267     private boolean started;
 268 
 269     /**
 270      * Set to true if this fiber is started sync but allowed to run async.
 271      * This property exists for use cases where the processing model is fundamentally async
 272      * but some requirement or feature mandates that part of the tubeline run synchronously.  For
 273      * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
 274      * asynchronously, but if in-order message delivery is used then message processing must assign
 275      * a message number before the remainder of the processing can be asynchronous.
 276      */
 277     private boolean startedSync;
 278 
 279     /**
 280      * Callback to be invoked when a {@link Fiber} finishs execution.
 281      */
 282     public interface CompletionCallback {
 283         /**
 284          * Indicates that the fiber has finished its execution.
 285          * <p/>
 286          * <p/>
 287          * Since the JAX-WS RI runs asynchronously,
 288          * this method maybe invoked by a different thread
 289          * than any of the threads that started it or run a part of tubeline.
 290          */
 291         void onCompletion(@NotNull Packet response);
 292 
 293         /**
 294          * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
 295          */
 296         void onCompletion(@NotNull Throwable error);
 297     }
 298 
 299     Fiber(Engine engine) {
 300         this.owner = engine;
 301         if (isTraceEnabled()) {
 302             id = iotaGen.incrementAndGet();
 303             LOGGER.fine(getName() + " created");
 304         } else {
 305             id = -1;
 306         }
 307 
 308         // if this is run from another fiber, then we naturally inherit its context classloader,
 309         // so this code works for fiber->fiber inheritance just fine.
 310         contextClassLoader = Thread.currentThread().getContextClassLoader();
 311     }
 312 
 313     /**
 314      * Starts the execution of this fiber asynchronously.
 315      * <p/>
 316      * <p/>
 317      * This method works like {@link Thread#start()}.
 318      *
 319      * @param tubeline           The first tube of the tubeline that will act on the packet.
 320      * @param request            The request packet to be passed to <tt>startPoint.processRequest()</tt>.
 321      * @param completionCallback The callback to be invoked when the processing is finished and the
 322      *                           final response packet is available.
 323      * @see #runSync(Tube, Packet)
 324      */
 325     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
 326         start(tubeline, request, completionCallback, false);
 327     }
 328 
 329     private void dumpFiberContext(String desc) {
 330         if(isTraceEnabled()) {
 331             String action = null;
 332             String msgId = null;
 333             if (packet != null) {
 334                 for (SOAPVersion sv: SOAPVersion.values()) {
 335                     for (AddressingVersion av: AddressingVersion.values()) {
 336                         action = packet.getMessage() != null ? packet.getMessage().getHeaders().getAction(av, sv) : null;
 337                         msgId = packet.getMessage() != null ? packet.getMessage().getHeaders().getMessageID(av, sv) : null;
 338                         if (action != null || msgId != null) {
 339                            break;
 340                         }
 341                     }
 342                     if (action != null || msgId != null) {
 343                         break;
 344                     }
 345                 }
 346             }
 347             String actionAndMsgDesc;
 348             if (action == null && msgId == null) {
 349                 actionAndMsgDesc = "NO ACTION or MSG ID";
 350             } else {
 351                 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
 352             }
 353 
 354             String tubeDesc;
 355             if (next != null) {
 356                 tubeDesc = next.toString() + ".processRequest()";
 357             } else {
 358                 tubeDesc = peekCont() + ".processResponse()";
 359             }
 360 
 361             LOGGER.fine(getName() + " " + desc + " with " + actionAndMsgDesc  + " and 'current' tube " + tubeDesc + " from thread " + Thread.currentThread().getName() + " with Packet: " + (packet != null ? packet.toShortString() : null));
 362         }
 363     }
 364 
 365     /**
 366      * Starts the execution of this fiber.
 367      *
 368      * If forceSync is true, then the fiber is started for an ostensibly async invocation,
 369      * but allows for some portion of the tubeline to run sync with the calling
 370      * client instance (Port/Dispatch instance). This allows tubes that enforce
 371      * ordering to see requests in the order they were sent at the point the
 372      * client invoked them.
 373      * <p>
 374      * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
 375      * SEIStub) knows one or more tubes need to enforce ordering and thus need
 376      * to run sync with the client. Such tubes can return
 377      * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
 378      * should be invoked async to the current thread.
 379      *
 380      * <p>
 381      * This method works like {@link Thread#start()}.
 382      *
 383      * @param tubeline
 384      *      The first tube of the tubeline that will act on the packet.
 385      * @param request
 386      *      The request packet to be passed to <tt>startPoint.processRequest()</tt>.
 387      * @param completionCallback
 388      *      The callback to be invoked when the processing is finished and the
 389      *      final response packet is available.
 390      *
 391      * @see #start(Tube,Packet,CompletionCallback)
 392      * @see #runSync(Tube,Packet)
 393      * @since 2.2.6
 394      */
 395     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
 396         next = tubeline;
 397         this.packet = request;
 398         this.completionCallback = completionCallback;
 399 
 400         if (forceSync) {
 401                 this.startedSync = true;
 402                 dumpFiberContext("starting (sync)");
 403                 run();
 404         } else {
 405                 this.started = true;
 406                 dumpFiberContext("starting (async)");
 407                 owner.addRunnable(this);
 408         }
 409     }
 410 
 411     /**
 412      * Wakes up a suspended fiber.
 413      * <p/>
 414      * <p/>
 415      * If a fiber was suspended without specifying the next {@link Tube},
 416      * then the execution will be resumed in the response processing direction,
 417      * by calling the {@link Tube#processResponse(Packet)} method on the next/first
 418      * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
 419      * packet as the parameter.
 420      * <p/>
 421      * <p/>
 422      * If a fiber was suspended with specifying the next {@link Tube},
 423      * then the execution will be resumed in the request processing direction,
 424      * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
 425      * specified resume packet as the parameter.
 426      * <p/>
 427      * <p/>
 428      * This method is implemented in a race-free way. Another thread can invoke
 429      * this method even before this fiber goes into the suspension mode. So the caller
 430      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
 431      *
 432      * @param resumePacket packet used in the resumed processing
 433      */
 434     public void resume(@NotNull Packet resumePacket) {
 435         resume(resumePacket, false);
 436     }
 437 
 438     /**
 439      * Similar to resume(Packet) but allowing the Fiber to be resumed
 440      * synchronously (in the current Thread). If you want to know when the
 441      * fiber completes (not when this method returns) then add/wrap a
 442      * CompletionCallback on this Fiber.
 443      * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
 444      * including in-order message delivery may need to resume the Fiber synchronously
 445      * until message order is confirmed prior to returning to asynchronous processing.
 446      * @since 2.2.6
 447      */
 448     public synchronized void resume(@NotNull Packet resumePacket,
 449                                     boolean forceSync) {
 450       resume(resumePacket, forceSync, null);
 451     }
 452 
 453     /**
 454      * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
 455      * and at the same time atomically assign a new CompletionCallback to it.
 456      * @since 2.2.6
 457      */
 458     public void resume(@NotNull Packet resumePacket,
 459                        boolean forceSync,
 460                        CompletionCallback callback) {
 461 
 462        synchronized(this) {
 463            if (callback != null) {
 464              setCompletionCallback(callback);
 465            }
 466            if(isTraceEnabled())
 467                 LOGGER.fine(getName()+" resuming. Will have suspendedCount=" + (suspendedCount-1));
 468             packet = resumePacket;
 469             if( --suspendedCount == 0 ) {
 470                 if (!isInsideSuspendCallbacks) {
 471                         List<Listener> listeners = getCurrentListeners();
 472                         for (Listener listener: listeners) {
 473                             try {
 474                                 listener.fiberResumed(this);
 475                             } catch (Throwable e) {
 476                                 if (isTraceEnabled())
 477                                         LOGGER.fine("Listener " + listener + " threw exception: "  + e.getMessage());
 478                             }
 479                         }
 480 
 481                         if(synchronous) {
 482                             notifyAll();
 483                         } else if (forceSync || startedSync) {
 484                             run();
 485                         } else {
 486                             dumpFiberContext("resuming (async)");
 487                             owner.addRunnable(this);
 488                         }
 489                 }
 490             } else {
 491                 if (isTraceEnabled()) {
 492                     LOGGER.fine(getName() + " taking no action on resume because suspendedCount != 0: " + suspendedCount);
 493                 }
 494             }
 495         }
 496     }
 497 
 498     /**
 499      * Wakes up a suspended fiber and begins response processing.
 500      * @since 2.2.6
 501      */
 502     public synchronized void resumeAndReturn(@NotNull Packet resumePacket,
 503                                              boolean forceSync) {
 504         if(isTraceEnabled())
 505             LOGGER.fine(getName()+" resumed with Return Packet");
 506         next = null;
 507         resume(resumePacket, forceSync);
 508     }
 509 
 510     /**
 511      * Wakes up a suspended fiber with an exception.
 512      * <p/>
 513      * <p/>
 514      * The execution of the suspended fiber will be resumed in the response
 515      * processing direction, by calling the {@link Tube#processException(Throwable)} method
 516      * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
 517      * the specified exception as the parameter.
 518      * <p/>
 519      * <p/>
 520      * This method is implemented in a race-free way. Another thread can invoke
 521      * this method even before this fiber goes into the suspension mode. So the caller
 522      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
 523      *
 524      * @param throwable exception that is used in the resumed processing
 525      */
 526     public synchronized void resume(@NotNull Throwable throwable) {
 527         resume(throwable, false);
 528     }
 529 
 530     /**
 531      * Wakes up a suspend fiber with an exception.
 532      *
 533      * If forceSync is true, then the suspended fiber will resume with
 534      * synchronous processing on the current thread.  This will continue
 535      * until some Tube indicates that it is safe to switch to asynchronous
 536      * processing.
 537      *
 538      * @param error exception that is used in the resumed processing
 539      * @param forceSync if processing begins synchronously
 540      * @since 2.2.6
 541      */
 542     public synchronized void resume(@NotNull Throwable error,
 543                                             boolean forceSync) {
 544         if(isTraceEnabled())
 545             LOGGER.fine(getName()+" resumed with Return Throwable");
 546         next = null;
 547         throwable = error;
 548         resume(packet, forceSync);
 549     }
 550 
 551     /**
 552      * Marks this Fiber as cancelled.  A cancelled Fiber will never invoke its completion callback
 553      * @param mayInterrupt if cancel should use {@link Thread.interrupt()}
 554      * @see java.util.Future.cancel
 555      * @since 2.2.6
 556      */
 557     public void cancel(boolean mayInterrupt) {
 558         isCanceled = true;
 559         if (mayInterrupt) {
 560                 synchronized(this) {
 561                         if (currentThread != null)
 562                                 currentThread.interrupt();
 563                 }
 564         }
 565     }
 566 
 567     /**
 568      * Suspends this fiber's execution until the resume method is invoked.
 569      * <p/>
 570      * The call returns immediately, and when the fiber is resumed
 571      * the execution picks up from the last scheduled continuation.
 572      */
 573     private boolean suspend() {
 574 
 575         synchronized(this) {
 576             if(isTraceEnabled()) {
 577                 LOGGER.fine(getName()+" suspending. Will have suspendedCount=" + (suspendedCount+1));
 578                 if (suspendedCount > 0) {
 579                     LOGGER.fine("WARNING - " + getName()+" suspended more than resumed. Will require more than one resume to actually resume this fiber.");
 580                 }
 581             }
 582 
 583             List<Listener> listeners = getCurrentListeners();
 584             if (++suspendedCount == 1) {
 585                 isInsideSuspendCallbacks = true;
 586                 try {
 587                         for (Listener listener: listeners) {
 588                             try {
 589                                 listener.fiberSuspended(this);
 590                             } catch (Throwable e) {
 591                                 if(isTraceEnabled())
 592                                         LOGGER.fine("Listener " + listener + " threw exception: "  + e.getMessage());
 593                             }
 594                         }
 595                 } finally {
 596                         isInsideSuspendCallbacks = false;
 597                 }
 598             }
 599 
 600             if (suspendedCount <= 0) {
 601                 // suspend callback caused fiber to resume
 602                 for (Listener listener: listeners) {
 603                     try {
 604                         listener.fiberResumed(this);
 605                     } catch (Throwable e) {
 606                         if(isTraceEnabled())
 607                                 LOGGER.fine("Listener " + listener + " threw exception: "  + e.getMessage());
 608                     }
 609                 }
 610 
 611                 return false;
 612             }
 613 
 614             return true;
 615         }
 616     }
 617 
 618     /**
 619      * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
 620      * <p/>
 621      * <p/>
 622      * The newly installed fiber will take effect immediately after the current
 623      * tube returns from its {@link Tube#processRequest(Packet)} or
 624      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
 625      * <p/>
 626      * <p/>
 627      * So when the tubeline consists of X and Y, and when X installs an interceptor,
 628      * the order of execution will be as follows:
 629      * <p/>
 630      * <ol>
 631      * <li>X.processRequest()
 632      * <li>interceptor gets installed
 633      * <li>interceptor.execute() is invoked
 634      * <li>Y.processRequest()
 635      * </ol>
 636      */
 637     public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
 638         if (interceptors == null) {
 639             interceptors = new ArrayList<FiberContextSwitchInterceptor>();
 640             interceptorHandler = new InterceptorHandler();
 641         }
 642         interceptors.add(interceptor);
 643         needsToReenter = true;
 644     }
 645 
 646     /**
 647      * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
 648      * <p/>
 649      * <p/>
 650      * The removal of the interceptor takes effect immediately after the current
 651      * tube returns from its {@link Tube#processRequest(Packet)} or
 652      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
 653      * <p/>
 654      * <p/>
 655      * <p/>
 656      * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
 657      * on the way out, then the order of execution will be as follows:
 658      * <p/>
 659      * <ol>
 660      * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
 661      * <li>interceptor gets uninstalled
 662      * <li>interceptor.execute() returns
 663      * <li>X.processResponse()
 664      * </ol>
 665      *
 666      * @return true if the specified interceptor was removed. False if
 667      *         the specified interceptor was not registered with this fiber to begin with.
 668      */
 669     public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
 670         if (interceptors != null && interceptors.remove(interceptor)) {
 671             needsToReenter = true;
 672             return true;
 673         }
 674         return false;
 675     }
 676 
 677     /**
 678      * Gets the context {@link ClassLoader} of this fiber.
 679      */
 680     public
 681     @Nullable
 682     ClassLoader getContextClassLoader() {
 683         return contextClassLoader;
 684     }
 685 
 686     /**
 687      * Sets the context {@link ClassLoader} of this fiber.
 688      */
 689     public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
 690         ClassLoader r = this.contextClassLoader;
 691         this.contextClassLoader = contextClassLoader;
 692         return r;
 693     }
 694 
 695     /**
 696      * DO NOT CALL THIS METHOD. This is an implementation detail
 697      * of {@link Fiber}.
 698      */
 699     @Deprecated
 700     public void run() {
 701         assert !synchronous;
 702         doRun();
 703         if (startedSync && suspendedCount == 0 &&
 704             (next != null || contsSize > 0)) {
 705             // We bailed out of running this fiber we started as sync, and now
 706             // want to finish running it async
 707             startedSync = false;
 708             // Start back up as an async fiber
 709             dumpFiberContext("restarting (async) after startSync");
 710             owner.addRunnable(this);
 711         } else {
 712             completionCheck();
 713         }
 714     }
 715 
 716     /**
 717      * Runs a given {@link Tube} (and everything thereafter) synchronously.
 718      * <p/>
 719      * <p/>
 720      * This method blocks and returns only when all the successive {@link Tube}s
 721      * complete their request/response processing. This method can be used
 722      * if a {@link Tube} needs to fallback to synchronous processing.
 723      * <p/>
 724      * <h3>Example:</h3>
 725      * <pre>
 726      * class FooTube extends {@link AbstractFilterTubeImpl} {
 727      *   NextAction processRequest(Packet request) {
 728      *     // run everything synchronously and return with the response packet
 729      *     return doReturnWith(Fiber.current().runSync(next,request));
 730      *   }
 731      *   NextAction processResponse(Packet response) {
 732      *     // never be invoked
 733      *   }
 734      * }
 735      * </pre>
 736      *
 737      * @param tubeline The first tube of the tubeline that will act on the packet.
 738      * @param request  The request packet to be passed to <tt>startPoint.processRequest()</tt>.
 739      * @return The response packet to the <tt>request</tt>.
 740      * @see #start(Tube, Packet, CompletionCallback)
 741      */
 742     public synchronized
 743     @NotNull
 744     Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
 745         // save the current continuation, so that we return runSync() without executing them.
 746         final Tube[] oldCont = conts;
 747         final int oldContSize = contsSize;
 748         final boolean oldSynchronous = synchronous;
 749         final Tube oldNext = next;
 750 
 751         if (oldContSize > 0) {
 752             conts = new Tube[16];
 753             contsSize = 0;
 754         }
 755 
 756         try {
 757             synchronous = true;
 758             this.packet = request;
 759             next = tubeline;
 760             doRun();
 761             if (throwable != null) {
 762                 if (throwable instanceof RuntimeException) {
 763                     throw (RuntimeException) throwable;
 764                 }
 765                 if (throwable instanceof Error) {
 766                     throw (Error) throwable;
 767                 }
 768                 // our system is supposed to only accept Error or RuntimeException
 769                 throw new AssertionError(throwable);
 770             }
 771             return this.packet;
 772         } finally {
 773             conts = oldCont;
 774             contsSize = oldContSize;
 775             synchronous = oldSynchronous;
 776             next = oldNext;
 777             if(interrupted) {
 778                 Thread.currentThread().interrupt();
 779                 interrupted = false;
 780             }
 781             if(!started && !startedSync)
 782                 completionCheck();
 783         }
 784     }
 785 
 786     private synchronized void completionCheck() {
 787         // Don't trigger completion and callbacks if fiber is suspended
 788         if(!isCanceled && contsSize==0 && suspendedCount == 0) {
 789             if(isTraceEnabled())
 790                 LOGGER.fine(getName()+" completed");
 791             completed = true;
 792             clearListeners();
 793             notifyAll();
 794             if (completionCallback != null) {
 795                 if (throwable != null)
 796                     completionCallback.onCompletion(throwable);
 797                 else
 798                     completionCallback.onCompletion(packet);
 799             }
 800         }
 801     }
 802 
 803     ///**
 804     // * Blocks until the fiber completes.
 805     // */
 806     //public synchronized void join() throws InterruptedException {
 807     //    while(!completed)
 808     //        wait();
 809     //}
 810 
 811     /**
 812      * Invokes all registered {@link InterceptorHandler}s and then call into
 813      * {@link Fiber#__doRun()}.
 814      */
 815     private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
 816         /**
 817          * Index in {@link Fiber#interceptors} to invoke next.
 818          */
 819         private int idx;
 820 
 821         /**
 822          * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
 823          */
 824         Tube invoke(Tube next) {
 825             idx = 0;
 826             return execute(next);
 827         }
 828 
 829         public Tube execute(Tube next) {
 830             if (idx == interceptors.size()) {
 831                 Fiber.this.next = next;
 832                 __doRun();
 833             } else {
 834                 FiberContextSwitchInterceptor interceptor = interceptors.get(idx++);
 835                 return interceptor.execute(Fiber.this, next, this);
 836             }
 837             return Fiber.this.next;
 838         }
 839     }
 840 
 841     /**
 842      * Executes the fiber as much as possible.
 843      *
 844      */
 845     @SuppressWarnings({"LoopStatementThatDoesntLoop"}) // IntelliJ reports this bogus error
 846     private void doRun() {
 847 
 848         dumpFiberContext("running");
 849 
 850         if (serializeExecution) {
 851             serializedExecutionLock.lock();
 852             try {
 853                 _doRun(next);
 854             } finally {
 855                 serializedExecutionLock.unlock();
 856             }
 857         } else {
 858             _doRun(next);
 859         }
 860     }
 861 
 862     private String currentThreadMonitor = "CurrentThreadMonitor";
 863 
 864     private void _doRun(Tube next) {
 865         Thread thread;
 866         synchronized(currentThreadMonitor) {
 867           if (currentThread != null && !synchronous) {
 868             if (LOGGER.isLoggable(Level.FINE)) {
 869               LOGGER.fine("Attempt to run Fiber ['" + this + "'] in more than one thread. Current Thread: " + currentThread + " Attempted Thread: " + Thread.currentThread());
 870             }
 871             while (currentThread != null) {
 872               try {
 873                 currentThreadMonitor.wait();
 874               } catch (Exception e) {
 875                 // ignore
 876               }
 877             }
 878           }
 879           currentThread = Thread.currentThread();
 880           thread = currentThread;
 881           if (LOGGER.isLoggable(Level.FINE)) {
 882             LOGGER.fine("Thread entering _doRun(): " + thread);
 883           }
 884         }
 885 
 886         ClassLoader old = thread.getContextClassLoader();
 887         thread.setContextClassLoader(contextClassLoader);
 888         try {
 889             do {
 890                 needsToReenter = false;
 891 
 892                 // if interceptors are set, go through the interceptors.
 893                 if(interceptorHandler ==null) {
 894                     this.next = next;
 895                     __doRun();
 896                 }
 897                 else
 898                     next = interceptorHandler.invoke(next);
 899             } while (needsToReenter);
 900 
 901         } finally {
 902             thread.setContextClassLoader(old);
 903             synchronized(currentThreadMonitor) {
 904                 currentThread = null;
 905               if (LOGGER.isLoggable(Level.FINE)) {
 906                 LOGGER.fine("Thread leaving _doRun(): " + thread);
 907               }
 908               currentThreadMonitor.notify();
 909             }
 910         }
 911     }
 912 
 913     /**
 914      * To be invoked from {@link #doRun()}.
 915      *
 916      * @see #doRun()
 917      */
 918     private void __doRun() {
 919         final Fiber old = CURRENT_FIBER.get();
 920         CURRENT_FIBER.set(this);
 921 
 922         // if true, lots of debug messages to show what's being executed
 923         final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
 924 
 925         try {
 926             boolean abortResponse = false;
 927             boolean justSuspended = false;
 928             while(!isCanceled && !isBlocking(justSuspended) && !needsToReenter) {
 929                 try {
 930                     NextAction na;
 931                     Tube last;
 932                     if(throwable!=null) {
 933                         if(contsSize==0 || abortResponse) {
 934                             contsSize = 0; // abortResponse case
 935                             // nothing else to execute. we are done.
 936                             return;
 937                         }
 938                         last = popCont();
 939                         if (traceEnabled)
 940                             LOGGER.finer(getName() + ' ' + last + ".processException(" + throwable + ')');
 941                         na = last.processException(throwable);
 942                     } else {
 943                         if(next!=null) {
 944                             if(traceEnabled)
 945                                 LOGGER.finer(getName()+' '+next+".processRequest("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')');
 946                             na = next.processRequest(packet);
 947                             last = next;
 948                         } else {
 949                             if(contsSize==0 || abortResponse) {
 950                                 // nothing else to execute. we are done.
 951                                 contsSize = 0;
 952                                 return;
 953                             }
 954                             last = popCont();
 955                             if(traceEnabled)
 956                                 LOGGER.finer(getName()+' '+last+".processResponse("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')');
 957                             na = last.processResponse(packet);
 958                         }
 959                     }
 960 
 961                     if (traceEnabled)
 962                         LOGGER.finer(getName() + ' ' + last + " returned with " + na);
 963 
 964                     // If resume is called before suspend, then make sure
 965                     // resume(Packet) is not lost
 966                     if (na.kind != NextAction.SUSPEND) {
 967                         // preserve in-flight packet so that processException may inspect
 968                         if (na.kind != NextAction.THROW &&
 969                           na.kind != NextAction.THROW_ABORT_RESPONSE)
 970                                 packet = na.packet;
 971                         throwable = na.throwable;
 972                     }
 973 
 974                     switch(na.kind) {
 975                     case NextAction.INVOKE:
 976                     case NextAction.INVOKE_ASYNC:
 977                         pushCont(last);
 978                         // fall through next
 979                     case NextAction.INVOKE_AND_FORGET:
 980                         next = na.next;
 981                         if (na.kind == NextAction.INVOKE_ASYNC
 982                             && startedSync) {
 983                           // Break out here
 984                           return;
 985                         }
 986                         break;
 987                     case NextAction.THROW_ABORT_RESPONSE:
 988                     case NextAction.ABORT_RESPONSE:
 989                         abortResponse = true;
 990                         if (LOGGER.isLoggable(Level.FINE)) {
 991                           LOGGER.fine("Fiber " + this + " is aborting a response due to exception: " + na.throwable);
 992                         }
 993                     case NextAction.RETURN:
 994                     case NextAction.THROW:
 995                         next = null;
 996                         break;
 997                     case NextAction.SUSPEND:
 998                         if (next != null) {
 999                           // Only store the 'last' tube when we're processing
1000                           // a request, since conts array is for processResponse
1001                           pushCont(last);
1002                         }
1003                         next = na.next;
1004                         justSuspended = suspend();
1005                         break;
1006                     default:
1007                         throw new AssertionError();
1008                     }
1009                 } catch (RuntimeException t) {
1010                     if (traceEnabled)
1011                         LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
1012                     throwable = t;
1013                 } catch (Error t) {
1014                     if (traceEnabled)
1015                         LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
1016                     throwable = t;
1017                 }
1018 
1019                 dumpFiberContext("After tube execution");
1020             }
1021 
1022             if (isCanceled) {
1023                 next = null;
1024                 throwable = null;
1025                 contsSize = 0;
1026             }
1027 
1028             // there's nothing we can execute right away.
1029             // we'll be back when this fiber is resumed.
1030 
1031         } finally {
1032             CURRENT_FIBER.set(old);
1033         }
1034     }
1035 
1036     private void pushCont(Tube tube) {
1037         conts[contsSize++] = tube;
1038 
1039         // expand if needed
1040         int len = conts.length;
1041         if (contsSize == len) {
1042             Tube[] newBuf = new Tube[len * 2];
1043             System.arraycopy(conts, 0, newBuf, 0, len);
1044             conts = newBuf;
1045         }
1046     }
1047 
1048     private Tube popCont() {
1049         return conts[--contsSize];
1050     }
1051 
1052     private Tube peekCont() {
1053         int index = contsSize - 1;
1054         if (index >= 0 && index < conts.length) {
1055           return conts[index];
1056         } else {
1057           return null;
1058         }
1059     }
1060 
1061     /**
1062      * Only to be used by Tubes that manipulate the Fiber to create alternate flows
1063      * @since 2.2.6
1064      */
1065     public void resetCont(Tube[] conts, int contsSize) {
1066         this.conts = conts;
1067         this.contsSize = contsSize;
1068     }
1069 
1070     /**
1071      * Returns true if the fiber needs to block its execution.
1072      */
1073     private boolean isBlocking(boolean justSuspended) {
1074         if (synchronous) {
1075             while (suspendedCount == 1)
1076                 try {
1077                     if (isTraceEnabled()) {
1078                         LOGGER.fine(getName() + " is blocking thread " + Thread.currentThread().getName());
1079                     }
1080                     wait(); // the synchronized block is the whole runSync method.
1081                 } catch (InterruptedException e) {
1082                     // remember that we are interrupted, but don't respond to it
1083                     // right away. This behavior is in line with what happens
1084                     // when you are actually running the whole thing synchronously.
1085                     interrupted = true;
1086                 }
1087             return false;
1088         }
1089         else
1090             return justSuspended || suspendedCount==1;
1091     }
1092 
1093     private String getName() {
1094         return "engine-" + owner.id + "fiber-" + id;
1095     }
1096 
1097     @Override
1098     public String toString() {
1099         return getName();
1100     }
1101 
1102     /**
1103      * Gets the current {@link Packet} associated with this fiber.
1104      * <p/>
1105      * <p/>
1106      * This method returns null if no packet has been associated with the fiber yet.
1107      */
1108     public
1109     @Nullable
1110     Packet getPacket() {
1111         return packet;
1112     }
1113 
1114     /**
1115      * Returns completion callback associated with this Fiber
1116      * @return Completion callback
1117      * @since 2.2.6
1118      */
1119     public CompletionCallback getCompletionCallback() {
1120         return completionCallback;
1121     }
1122 
1123     /**
1124      * Updates completion callback associated with this Fiber
1125      * @param completionCallback Completion callback
1126      * @since 2.2.6
1127      */
1128     public void setCompletionCallback(CompletionCallback completionCallback) {
1129         this.completionCallback = completionCallback;
1130     }
1131 
1132     /**
1133      * (ADVANCED) Returns true if the current fiber is being executed synchronously.
1134      * <p/>
1135      * <p/>
1136      * Fiber may run synchronously for various reasons. Perhaps this is
1137      * on client side and application has invoked a synchronous method call.
1138      * Perhaps this is on server side and we have deployed on a synchronous
1139      * transport (like servlet.)
1140      * <p/>
1141      * <p/>
1142      * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
1143      * further invocations to {@link #runSync(Tube, Packet)} can be done
1144      * without degrading the performance.
1145      * <p/>
1146      * <p/>
1147      * So this value can be used as a further optimization hint for
1148      * advanced {@link Tube}s to choose the best strategy to invoke
1149      * the next {@link Tube}. For example, a tube may want to install
1150      * a {@link FiberContextSwitchInterceptor} if running async, yet
1151      * it might find it faster to do {@link #runSync(Tube, Packet)}
1152      * if it's already running synchronously.
1153      */
1154     public static boolean isSynchronous() {
1155         return current().synchronous;
1156     }
1157 
1158     /**
1159      * Returns true if the current Fiber on the current thread was started
1160      * synchronously. Note, this is not strictly the same as being synchronous
1161      * because the assumption is that the Fiber will ultimately be dispatched
1162      * asynchronously, possibly have a completion callback associated with it, etc.
1163      * Note, the 'startedSync' flag is cleared once the current Fiber is
1164      * converted to running asynchronously.
1165      * @since 2.2.6
1166      */
1167     public boolean isStartedSync() {
1168         return startedSync;
1169     }
1170 
1171     /**
1172      * Gets the current fiber that's running.
1173      * <p/>
1174      * <p/>
1175      * This works like {@link Thread#currentThread()}.
1176      * This method only works when invoked from {@link Tube}.
1177      */
1178     public static
1179     @NotNull
1180     Fiber current() {
1181         Fiber fiber = CURRENT_FIBER.get();
1182         if (fiber == null)
1183             throw new IllegalStateException("Can be only used from fibers");
1184         return fiber;
1185     }
1186 
1187     /**
1188      * Gets the current fiber that's running, if set.
1189      */
1190     public static Fiber getCurrentIfSet() {
1191         return CURRENT_FIBER.get();
1192     }
1193 
1194     private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
1195 
1196     /**
1197      * Used to allocate unique number for each fiber.
1198      */
1199     private static final AtomicInteger iotaGen = new AtomicInteger();
1200 
1201     private static boolean isTraceEnabled() {
1202         return LOGGER.isLoggable(Level.FINE);
1203     }
1204 
1205     private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
1206 
1207 
1208     private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
1209 
1210     /**
1211      * Set this boolean to true to execute fibers sequentially one by one.
1212      * See class javadoc.
1213      */
1214     public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
1215 
1216     private final Set<Component> components = new CopyOnWriteArraySet<Component>();
1217 
1218         public <S> S getSPI(Class<S> spiType) {
1219                 for(Component c : components) {
1220                         S spi = c.getSPI(spiType);
1221                         if (spi != null)
1222                                 return spi;
1223                 }
1224                 return null;
1225         }
1226 
1227         public Set<Component> getComponents() {
1228                 return components;
1229         }
1230 }