1 /*
   2  * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.xml.internal.ws.api.pipe;
  27 
  28 import com.sun.istack.internal.NotNull;
  29 import com.sun.istack.internal.Nullable;
  30 import com.sun.xml.internal.ws.api.Cancelable;
  31 import com.sun.xml.internal.ws.api.Component;
  32 import com.sun.xml.internal.ws.api.ComponentRegistry;
  33 import com.sun.xml.internal.ws.api.SOAPVersion;
  34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
  35 import com.sun.xml.internal.ws.api.message.AddressingUtils;
  36 import com.sun.xml.internal.ws.api.message.Packet;
  37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
  38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
  39 import com.sun.xml.internal.ws.api.server.Adapter;
  40 import com.sun.xml.internal.ws.api.server.Container;
  41 import com.sun.xml.internal.ws.api.server.ContainerResolver;
  42 
  43 import java.util.ArrayList;
  44 import java.util.List;
  45 import java.util.Set;
  46 import java.util.concurrent.CopyOnWriteArraySet;
  47 import java.util.concurrent.atomic.AtomicInteger;
  48 import java.util.concurrent.locks.Condition;
  49 import java.util.concurrent.locks.ReentrantLock;
  50 import java.util.logging.Level;
  51 import java.util.logging.Logger;
  52 
  53 import javax.xml.ws.Holder;
  54 import javax.xml.ws.WebServiceException;
  55 
  56 /**
  57  * User-level thread. Represents the execution of one request/response processing.
  58  * <p/>
  59  * <p/>
  60  * JAX-WS RI is capable of running a large number of request/response concurrently by
  61  * using a relatively small number of threads. This is made possible by utilizing
  62  * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
  63  * processing.
  64  * <p/>
  65  * <p/>
  66  * A fiber remembers where in the pipeline the processing is at, what needs to be
  67  * executed on the way out (when processing response), and other additional information
  68  * specific to the execution of a particular request/response.
  69  * <p/>
  70  * <h2>Suspend/Resume</h2>
  71  * <p/>
  72  * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
  73  * When a fiber is suspended, it will be kept on the side until it is
  74  * {@link #resume(Packet) resumed}. This allows threads to go execute
  75  * other runnable fibers, allowing efficient utilization of smaller number of
  76  * threads.
  77  * <p/>
  78  * <h2>Context-switch Interception</h2>
  79  * <p/>
  80  * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
  81  * to perform additional processing every time a thread starts running a fiber
  82  * and stops running it.
  83  * <p/>
  84  * <h2>Context ClassLoader</h2>
  85  * <p/>
  86  * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
  87  * becomes the thread's CCL when it's executing the fiber. The original CCL
  88  * of the thread will be restored when the thread leaves the fiber execution.
  89  * <p/>
  90  * <p/>
  91  * <h2>Debugging Aid</h2>
  92  * <p/>
 * Because {@link Fiber} doesn't keep much in the call stack, and instead uses
 * {@link #conts} to store the continuation, debugging fiber-related activities
 * can be harder.
  96  * <p/>
  97  * <p/>
  98  * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
  99  * level logging. Using FINER would cause more detailed logging, which includes
 100  * what tubes are executed in what order and how they behaved.
 101  * <p/>
 102  * <p/>
 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
 * to true, so that the execution of fibers is serialized. Debugging a server
 * with more than one running thread is very tricky, and this switch will
 * prevent that. This can also be enabled by turning on the corresponding system property.
 * See the source code.
 108  *
 109  * @author Kohsuke Kawaguchi
 110  * @author Jitendra Kotamraju
 111  */
 112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
 113 
    /**
     * Callback interface for notification of suspend and resume.
     *
     * @since 2.2.6
     * @deprecated Use {@link NextAction#suspend(Runnable)}
     */
 120     public interface Listener {
 121         /**
 122          * Fiber has been suspended.  Implementations of this callback may resume the Fiber.
 123          * @param fiber Fiber
 124          */
 125         public void fiberSuspended(Fiber fiber);
 126 
 127         /**
 128          * Fiber has been resumed.  Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
 129          * @param fiber Fiber
 130          */
 131         public void fiberResumed(Fiber fiber);
 132     }
 133 
    /**
     * Registered suspend/resume listeners. The visible accessors all synchronize
     * on this list instance before reading or modifying it.
     */
    private final List<Listener> _listeners = new ArrayList<Listener>();
 135 
 136     /**
 137      * Adds suspend/resume callback listener
 138      * @param listener Listener
 139      * @since 2.2.6
 140      * @deprecated
 141      */
 142     public void addListener(Listener listener) {
 143         synchronized(_listeners) {
 144             if (!_listeners.contains(listener)) {
 145                 _listeners.add(listener);
 146             }
 147         }
 148     }
 149 
 150     /**
 151      * Removes suspend/resume callback listener
 152      * @param listener Listener
 153      * @since 2.2.6
 154      * @deprecated
 155      */
 156     public void removeListener(Listener listener) {
 157         synchronized(_listeners) {
 158             _listeners.remove(listener);
 159         }
 160     }
 161 
 162     List<Listener> getCurrentListeners() {
 163       synchronized(_listeners) {
 164          return new ArrayList<Listener>(_listeners);
 165       }
 166     }
 167 
 168     private void clearListeners() {
 169         synchronized(_listeners) {
 170             _listeners.clear();
 171         }
 172     }
 173 
    /**
     * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
     * to be invoked on the way back.
     */
    private Tube[] conts = new Tube[16];
    // NOTE(review): presumably the number of slots of conts currently in use --
    // confirm against the code that pushes and pops continuations.
    private int contsSize;

    /**
     * If this field is non-null, the next instruction to execute is
     * to call its {@link Tube#processRequest(Packet)}. Otherwise
     * the instruction is to call {@link #conts}.
     */
    private Tube next;

    /**
     * The packet this fiber currently works with. It is assigned when the fiber
     * is started and replaced by the packet passed to the resume methods.
     */
    private Packet packet;

    /**
     * The error this fiber was resumed with; assigned by
     * {@link #resume(Throwable, Packet, boolean)}.
     */
    private Throwable/*but really it's either RuntimeException or Error*/ throwable;

    /**
     * The engine passed to the constructor. Asynchronous start and resume hand
     * this fiber to it through {@code addRunnable(this)}.
     */
    public final Engine owner;
 193 
    /**
     * Is this fiber suspended? 0=not suspended, 1=suspended.
     * <p/>
     * <p/>
     * Logically this is just a boolean, but we need to prepare for the case
     * where the fiber is {@link #resume(Packet) resumed} before we get to
     * {@link #suspend(Holder, Runnable)}.
     * This happens when things happen in the following order:
     * <p/>
     * <ol>
     * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
     * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
     * <li>Tube returns with {@link NextAction#suspend()}.
     * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
     * to wake up fiber
     * <li>{@link Fiber#doRun} invokes {@link #suspend(Holder, Runnable)}.
     * </ol>
     * <p/>
     * <p/>
     * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
     * {@link #resume(Packet)} occurs before the suspend call.
     * <p/>
     * <p/>
     * The decrement in {@link #resume(Packet, boolean, CompletionCallback)} happens while
     * holding {@link #lock}. The suspend path may release {@link #lock} early, so it
     * presumably runs with the lock already held by its caller -- TODO confirm.
     */
    private volatile int suspendedCount = 0;

    /**
     * True only while the suspend path is invoking {@link Listener#fiberSuspended(Fiber)}
     * callbacks. A resume that brings {@link #suspendedCount} to zero during that window
     * does not notify listeners or reschedule the fiber itself; the suspend path
     * notices the count afterwards.
     */
    private volatile boolean isInsideSuspendCallbacks = false;

    /**
     * Is this {@link Fiber} currently running in the synchronous mode?
     */
    private boolean synchronous;

    // NOTE(review): neither read nor written in the part of the file reviewed here;
    // presumably tracks an interruption of the running thread -- confirm.
    private boolean interrupted;

    /**
     * Number assigned to this fiber in the constructor from the {@code iotaGen} counter.
     */
    private final int id;
 230 
 231     /**
 232      * Active {@link FiberContextSwitchInterceptor}s for this fiber.
 233      */
 234     private List<FiberContextSwitchInterceptor> interceptors;
 235 
 236     /**
 237      * Fiber's context {@link ClassLoader}.
 238      */
 239     private
 240     @Nullable
 241     ClassLoader contextClassLoader;
 242 
 243     private
 244     @Nullable
 245     CompletionCallback completionCallback;
 246 
 247     private boolean isDeliverThrowableInPacket = false;
 248 
 249     public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
 250         this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
 251     }
 252 
    /**
     * The thread on which this Fiber is currently executing, if applicable.
     * {@link #cancel(boolean)} reads it, and the suspend path clears it, under
     * {@code synchronized(this)} rather than under {@link #lock}.
     */
    private Thread currentThread;

    /**
     * Replace uses of synchronized(this) with this lock so that we can control
     * unlocking for resume use cases
     */
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * Condition bound to {@link #lock}; signalled when a fiber running in the
     * synchronous mode is resumed.
     */
    private final Condition condition = lock.newCondition();

    /**
     * Set to true by {@link #cancel(boolean)}.
     */
    private volatile boolean isCanceled;

    /**
     * Set to true if this fiber is started asynchronously, to avoid
     * doubly-invoking completion code.
     */
    private boolean started;

    /**
     * Set to true if this fiber is started sync but allowed to run async.
     * This property exists for use cases where the processing model is fundamentally async
     * but some requirement or feature mandates that part of the tubeline run synchronously.  For
     * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
     * asynchronously, but if in-order message delivery is used then message processing must assign
     * a message number before the remainder of the processing can be asynchronous.
     */
    private boolean startedSync;
 282 
    /**
     * Callback to be invoked when a {@link Fiber} finishes execution.
     */
    public interface CompletionCallback {
        /**
         * Indicates that the fiber has finished its execution.
         * <p/>
         * <p/>
         * Since the JAX-WS RI runs asynchronously,
         * this method may be invoked by a different thread
         * than any of the threads that started it or ran a part of the tubeline.
         *
         * @param response the response packet handed to the callback
         */
        void onCompletion(@NotNull Packet response);

        /**
         * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
         *
         * @param error the {@link Throwable} the fiber finished with
         */
        void onCompletion(@NotNull Throwable error);
    }
 302 
 303     Fiber(Engine engine) {
 304         this.owner = engine;
 305         id = iotaGen.incrementAndGet();
 306         if (isTraceEnabled()) {
 307             LOGGER.log(Level.FINE, "{0} created", getName());
 308         }
 309 
 310         // if this is run from another fiber, then we naturally inherit its context classloader,
 311         // so this code works for fiber->fiber inheritance just fine.
 312         contextClassLoader = Thread.currentThread().getContextClassLoader();
 313     }
 314 
 315     /**
 316      * Starts the execution of this fiber asynchronously.
 317      * <p/>
 318      * <p/>
 319      * This method works like {@link Thread#start()}.
 320      *
 321      * @param tubeline           The first tube of the tubeline that will act on the packet.
 322      * @param request            The request packet to be passed to {@code startPoint.processRequest()}.
 323      * @param completionCallback The callback to be invoked when the processing is finished and the
 324      *                           final response packet is available.
 325      * @see #runSync(Tube, Packet)
 326      */
 327     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
 328         start(tubeline, request, completionCallback, false);
 329     }
 330 
 331     private void dumpFiberContext(String desc) {
 332         if(isTraceEnabled()) {
 333             String action = null;
 334             String msgId = null;
 335             if (packet != null) {
 336                 for (SOAPVersion sv: SOAPVersion.values()) {
 337                     for (AddressingVersion av: AddressingVersion.values()) {
 338                         action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
 339                         msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
 340                         if (action != null || msgId != null) {
 341                            break;
 342                         }
 343                     }
 344                     if (action != null || msgId != null) {
 345                         break;
 346                     }
 347                 }
 348             }
 349             String actionAndMsgDesc;
 350             if (action == null && msgId == null) {
 351                 actionAndMsgDesc = "NO ACTION or MSG ID";
 352             } else {
 353                 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
 354             }
 355 
 356             String tubeDesc;
 357             if (next != null) {
 358                 tubeDesc = next.toString() + ".processRequest()";
 359             } else {
 360                 tubeDesc = peekCont() + ".processResponse()";
 361             }
 362 
 363             LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
 364         }
 365     }
 366 
 367     /**
 368      * Starts the execution of this fiber.
 369      *
 370      * If forceSync is true, then the fiber is started for an ostensibly async invocation,
 371      * but allows for some portion of the tubeline to run sync with the calling
 372      * client instance (Port/Dispatch instance). This allows tubes that enforce
 373      * ordering to see requests in the order they were sent at the point the
 374      * client invoked them.
 375      * <p>
 376      * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
 377      * SEIStub) knows one or more tubes need to enforce ordering and thus need
 378      * to run sync with the client. Such tubes can return
 379      * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
 380      * should be invoked async to the current thread.
 381      *
 382      * <p>
 383      * This method works like {@link Thread#start()}.
 384      *
 385      * @param tubeline
 386      *      The first tube of the tubeline that will act on the packet.
 387      * @param request
 388      *      The request packet to be passed to {@code startPoint.processRequest()}.
 389      * @param completionCallback
 390      *      The callback to be invoked when the processing is finished and the
 391      *      final response packet is available.
 392      *
 393      * @see #start(Tube,Packet,CompletionCallback)
 394      * @see #runSync(Tube,Packet)
 395      * @since 2.2.6
 396      */
 397     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
 398         next = tubeline;
 399         this.packet = request;
 400         this.completionCallback = completionCallback;
 401 
 402         if (forceSync) {
 403                 this.startedSync = true;
 404                 dumpFiberContext("starting (sync)");
 405                 run();
 406         } else {
 407                 this.started = true;
 408                 dumpFiberContext("starting (async)");
 409                 owner.addRunnable(this);
 410         }
 411     }
 412 
 413     /**
 414      * Wakes up a suspended fiber.
 415      * <p/>
 416      * <p/>
 417      * If a fiber was suspended without specifying the next {@link Tube},
 418      * then the execution will be resumed in the response processing direction,
 419      * by calling the {@link Tube#processResponse(Packet)} method on the next/first
 420      * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
 421      * packet as the parameter.
 422      * <p/>
 423      * <p/>
 424      * If a fiber was suspended with specifying the next {@link Tube},
 425      * then the execution will be resumed in the request processing direction,
 426      * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
 427      * specified resume packet as the parameter.
 428      * <p/>
 429      * <p/>
 430      * This method is implemented in a race-free way. Another thread can invoke
 431      * this method even before this fiber goes into the suspension mode. So the caller
 432      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
 433      *
 434      * @param resumePacket packet used in the resumed processing
 435      */
 436     public void resume(@NotNull Packet resumePacket) {
 437         resume(resumePacket, false);
 438     }
 439 
 440     /**
 441      * Similar to resume(Packet) but allowing the Fiber to be resumed
 442      * synchronously (in the current Thread). If you want to know when the
 443      * fiber completes (not when this method returns) then add/wrap a
 444      * CompletionCallback on this Fiber.
 445      * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
 446      * including in-order message delivery may need to resume the Fiber synchronously
 447      * until message order is confirmed prior to returning to asynchronous processing.
 448      * @since 2.2.6
 449      */
 450     public void resume(@NotNull Packet resumePacket,
 451     boolean forceSync) {
 452         resume(resumePacket, forceSync, null);
 453     }
 454 
 455     /**
 456      * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
 457      * and at the same time atomically assign a new CompletionCallback to it.
 458      * @since 2.2.6
 459      */
 460     public void resume(@NotNull Packet resumePacket,
 461                        boolean forceSync,
 462                        CompletionCallback callback) {
 463        lock.lock();
 464        try {
 465            if (callback != null) {
 466              setCompletionCallback(callback);
 467            }
 468            if(isTraceEnabled())
 469                 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
 470                 packet = resumePacket;
 471                 if( --suspendedCount == 0 ) {
 472                    if (!isInsideSuspendCallbacks) {
 473                         List<Listener> listeners = getCurrentListeners();
 474                         for (Listener listener: listeners) {
 475                             try {
 476                                 listener.fiberResumed(this);
 477                             } catch (Throwable e) {
 478                                 if (isTraceEnabled())
 479                                         LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
 480                             }
 481                         }
 482 
 483                         if(synchronous) {
 484                             condition.signalAll();
 485                         } else if (forceSync || startedSync) {
 486                             run();
 487                         } else {
 488                             dumpFiberContext("resuming (async)");
 489                             owner.addRunnable(this);
 490                         }
 491                    }
 492                 } else {
 493                     if (isTraceEnabled()) {
 494                         LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
 495                 }
 496             }
 497        } finally {
 498            lock.unlock();
 499        }
 500     }
 501 
 502     /**
 503      * Wakes up a suspended fiber and begins response processing.
 504      * @since 2.2.6
 505      */
 506     public void resumeAndReturn(@NotNull Packet resumePacket,
 507                                 boolean forceSync) {
 508         if(isTraceEnabled())
 509             LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
 510         next = null;
 511         resume(resumePacket, forceSync);
 512     }
 513 
 514     /**
 515      * Wakes up a suspended fiber with an exception.
 516      * <p/>
 517      * <p/>
 518      * The execution of the suspended fiber will be resumed in the response
 519      * processing direction, by calling the {@link Tube#processException(Throwable)} method
 520      * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
 521      * the specified exception as the parameter.
 522      * <p/>
 523      * <p/>
 524      * This method is implemented in a race-free way. Another thread can invoke
 525      * this method even before this fiber goes into the suspension mode. So the caller
 526      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
 527      *
 528      * @param throwable exception that is used in the resumed processing
 529      */
 530     public void resume(@NotNull Throwable throwable) {
 531         resume(throwable, packet, false);
 532     }
 533 
 534     /**
 535      * Wakes up a suspended fiber with an exception.
 536      * <p/>
 537      * <p/>
 538      * The execution of the suspended fiber will be resumed in the response
 539      * processing direction, by calling the {@link Tube#processException(Throwable)} method
 540      * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
 541      * the specified exception as the parameter.
 542      * <p/>
 543      * <p/>
 544      * This method is implemented in a race-free way. Another thread can invoke
 545      * this method even before this fiber goes into the suspension mode. So the caller
 546      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
 547      *
 548      * @param throwable exception that is used in the resumed processing
 549      * @param packet Packet that will be visible on the Fiber after the resume
 550      * @since 2.2.8
 551      */
 552     public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
 553         resume(throwable, packet, false);
 554     }
 555 
 556     /**
 557      * Wakes up a suspend fiber with an exception.
 558      *
 559      * If forceSync is true, then the suspended fiber will resume with
 560      * synchronous processing on the current thread.  This will continue
 561      * until some Tube indicates that it is safe to switch to asynchronous
 562      * processing.
 563      *
 564      * @param error exception that is used in the resumed processing
 565      * @param forceSync if processing begins synchronously
 566      * @since 2.2.6
 567      */
 568     public void resume(@NotNull Throwable error,
 569                        boolean forceSync) {
 570         resume(error, packet, forceSync);
 571     }
 572 
 573     /**
 574      * Wakes up a suspend fiber with an exception.
 575      *
 576      * If forceSync is true, then the suspended fiber will resume with
 577      * synchronous processing on the current thread.  This will continue
 578      * until some Tube indicates that it is safe to switch to asynchronous
 579      * processing.
 580      *
 581      * @param error exception that is used in the resumed processing
 582      * @param packet Packet that will be visible on the Fiber after the resume
 583      * @param forceSync if processing begins synchronously
 584      * @since 2.2.8
 585      */
 586     public void resume(@NotNull Throwable error,
 587                        @NotNull Packet packet,
 588                        boolean forceSync) {
 589         if(isTraceEnabled())
 590             LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
 591         next = null;
 592         throwable = error;
 593         resume(packet, forceSync);
 594     }
 595 
 596     /**
 597      * Marks this Fiber as cancelled.  A cancelled Fiber will never invoke its completion callback
 598      * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
 599      * @see java.util.concurrent.Future#cancel(boolean)
 600      * @since 2.2.6
 601      */
 602     @Override
 603     public void cancel(boolean mayInterrupt) {
 604         isCanceled = true;
 605         if (mayInterrupt) {
 606             // synchronized(this) is used as Thread running Fiber will be holding lock
 607             synchronized(this) {
 608                 if (currentThread != null)
 609                     currentThread.interrupt();
 610             }
 611         }
 612     }
 613 
 614     /**
 615      * Suspends this fiber's execution until the resume method is invoked.
 616      * <p/>
 617      * The call returns immediately, and when the fiber is resumed
 618      * the execution picks up from the last scheduled continuation.
 619      * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
 620      * @return if control loop must exit
 621      */
 622     private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
 623         if(isTraceEnabled()) {
 624             LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
 625             if (suspendedCount > 0) {
 626                 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
 627             }
 628         }
 629 
 630         List<Listener> listeners = getCurrentListeners();
 631         if (++suspendedCount == 1) {
 632             isInsideSuspendCallbacks = true;
 633             try {
 634                 for (Listener listener: listeners) {
 635                     try {
 636                         listener.fiberSuspended(this);
 637                     } catch (Throwable e) {
 638                         if(isTraceEnabled())
 639                             LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
 640                     }
 641                 }
 642             } finally {
 643                 isInsideSuspendCallbacks = false;
 644             }
 645         }
 646 
 647         if (suspendedCount <= 0) {
 648             // suspend callback caused fiber to resume
 649             for (Listener listener: listeners) {
 650                 try {
 651                     listener.fiberResumed(this);
 652                 } catch (Throwable e) {
 653                         if(isTraceEnabled())
 654                                 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
 655                 }
 656             }
 657 
 658         } else if (onExitRunnable != null) {
 659             // synchronous use cases cannot disconnect from the current thread
 660             if (!synchronous) {
 661                 /* INTENTIONALLY UNLOCKING EARLY */
 662                 synchronized(this) {
 663                     // currentThread is protected by the monitor for this fiber so
 664                     // that it is accessible to cancel() even when the lock is held
 665                     currentThread = null;
 666                 }
 667                 lock.unlock();
 668                 assert(!lock.isHeldByCurrentThread());
 669                 isRequireUnlock.value = Boolean.FALSE;
 670 
 671                 try {
 672                     onExitRunnable.run();
 673                 } catch(Throwable t) {
 674                     throw new OnExitRunnableException(t);
 675                 }
 676 
 677                 return true;
 678 
 679             } else {
 680                 // for synchronous we will stay with current thread, so do not disconnect
 681                 if (isTraceEnabled())
 682                     LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
 683                 onExitRunnable.run();
 684             }
 685         }
 686 
 687         return false;
 688     }
 689 
 690     private static final class OnExitRunnableException extends RuntimeException {
 691         private static final long serialVersionUID = 1L;
 692 
 693         Throwable target;
 694 
 695         public OnExitRunnableException(Throwable target) {
 696             super((Throwable)null); // see pattern for InvocationTargetException
 697             this.target = target;
 698         }
 699     }
 700 
 701     /**
 702      * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
 703      * <p/>
 704      * <p/>
 705      * The newly installed fiber will take effect immediately after the current
 706      * tube returns from its {@link Tube#processRequest(Packet)} or
 707      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
 708      * <p/>
 709      * <p/>
 710      * So when the tubeline consists of X and Y, and when X installs an interceptor,
 711      * the order of execution will be as follows:
 712      * <p/>
 713      * <ol>
 714      * <li>X.processRequest()
 715      * <li>interceptor gets installed
 716      * <li>interceptor.execute() is invoked
 717      * <li>Y.processRequest()
 718      * </ol>
 719      */
 720     public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
 721         if (interceptors == null) {
 722             interceptors = new ArrayList<FiberContextSwitchInterceptor>();
 723         } else {
 724             List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
 725             l.addAll(interceptors);
 726             interceptors = l;
 727         }
 728         interceptors.add(interceptor);
 729     }
 730 
 731     /**
 732      * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
 733      * <p/>
 734      * <p/>
 735      * The removal of the interceptor takes effect immediately after the current
 736      * tube returns from its {@link Tube#processRequest(Packet)} or
 737      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
 738      * <p/>
 739      * <p/>
 740      * <p/>
 741      * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
 742      * on the way out, then the order of execution will be as follows:
 743      * <p/>
 744      * <ol>
 745      * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
 746      * <li>interceptor gets uninstalled
 747      * <li>interceptor.execute() returns
 748      * <li>X.processResponse()
 749      * </ol>
 750      *
 751      * @return true if the specified interceptor was removed. False if
 752      *         the specified interceptor was not registered with this fiber to begin with.
 753      */
 754     public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
 755         if (interceptors != null) {
 756             boolean result = interceptors.remove(interceptor);
 757             if (interceptors.isEmpty())
 758                 interceptors = null;
 759             else {
 760                 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
 761                 l.addAll(interceptors);
 762                 interceptors = l;
 763             }
 764             return result;
 765         }
 766         return false;
 767     }
 768 
 769     /**
 770      * Gets the context {@link ClassLoader} of this fiber.
 771      */
 772     public
 773     @Nullable
 774     ClassLoader getContextClassLoader() {
 775         return contextClassLoader;
 776     }
 777 
 778     /**
 779      * Sets the context {@link ClassLoader} of this fiber.
 780      */
 781     public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
 782         ClassLoader r = this.contextClassLoader;
 783         this.contextClassLoader = contextClassLoader;
 784         return r;
 785     }
 786 
 787     /**
 788      * DO NOT CALL THIS METHOD. This is an implementation detail
 789      * of {@link Fiber}.
 790      */
 791     @Deprecated
 792     @Override
 793     public void run() {
 794         Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
 795         try {
 796             assert !synchronous;
 797             // doRun returns true to indicate an early exit from fiber processing
 798             if (!doRun()) {
 799                 if (startedSync && suspendedCount == 0 &&
 800                     (next != null || contsSize > 0)) {
 801                     // We bailed out of running this fiber we started as sync, and now
 802                     // want to finish running it async
 803                     startedSync = false;
 804                     // Start back up as an async fiber
 805                     dumpFiberContext("restarting (async) after startSync");
 806                     owner.addRunnable(this);
 807                 } else {
 808                     completionCheck();
 809                 }
 810             }
 811         } finally {
 812             ContainerResolver.getDefault().exitContainer(old);
 813         }
 814     }
 815 
 816     /**
 817      * Runs a given {@link Tube} (and everything thereafter) synchronously.
 818      * <p/>
 819      * <p/>
 820      * This method blocks and returns only when all the successive {@link Tube}s
 821      * complete their request/response processing. This method can be used
 822      * if a {@link Tube} needs to fallback to synchronous processing.
 823      * <p/>
 824      * <h3>Example:</h3>
 825      * <pre>
 826      * class FooTube extends {@link AbstractFilterTubeImpl} {
 827      *   NextAction processRequest(Packet request) {
 828      *     // run everything synchronously and return with the response packet
 829      *     return doReturnWith(Fiber.current().runSync(next,request));
 830      *   }
 831      *   NextAction processResponse(Packet response) {
 832      *     // never be invoked
 833      *   }
 834      * }
 835      * </pre>
 836      *
 837      * @param tubeline The first tube of the tubeline that will act on the packet.
 838      * @param request  The request packet to be passed to {@code startPoint.processRequest()}.
 839      * @return The response packet to the {@code request}.
 840      * @see #start(Tube, Packet, CompletionCallback)
 841      */
 842     public
 843     @NotNull
 844     Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
 845         lock.lock();
 846         try {
 847             // save the current continuation, so that we return runSync() without executing them.
 848             final Tube[] oldCont = conts;
 849             final int oldContSize = contsSize;
 850             final boolean oldSynchronous = synchronous;
 851             final Tube oldNext = next;
 852 
 853             if (oldContSize > 0) {
 854                 conts = new Tube[16];
 855                 contsSize = 0;
 856             }
 857 
 858             try {
 859                 synchronous = true;
 860                 this.packet = request;
 861                 next = tubeline;
 862                 doRun();
 863                 if (throwable != null) {
 864                     if (isDeliverThrowableInPacket) {
 865                         packet.addSatellite(new ThrowableContainerPropertySet(throwable));
 866                     } else {
 867                         if (throwable instanceof RuntimeException) {
 868                             throw (RuntimeException) throwable;
 869                         }
 870                         if (throwable instanceof Error) {
 871                             throw (Error) throwable;
 872                         }
 873                         // our system is supposed to only accept Error or RuntimeException
 874                         throw new AssertionError(throwable);
 875                     }
 876                 }
 877                 return this.packet;
 878             } finally {
 879                 conts = oldCont;
 880                 contsSize = oldContSize;
 881                 synchronous = oldSynchronous;
 882                 next = oldNext;
 883                 if(interrupted) {
 884                     Thread.currentThread().interrupt();
 885                     interrupted = false;
 886                 }
 887                 if(!started && !startedSync)
 888                     completionCheck();
 889             }
 890         } finally {
 891             lock.unlock();
 892         }
 893     }
 894 
 895     private void completionCheck() {
 896         lock.lock();
 897         try {
 898             // Don't trigger completion and callbacks if fiber is suspended
 899             if(!isCanceled && contsSize==0 && suspendedCount == 0) {
 900                 if(isTraceEnabled())
 901                     LOGGER.log(Level.FINE, "{0} completed", getName());
 902                 clearListeners();
 903                 condition.signalAll();
 904                 if (completionCallback != null) {
 905                     if (throwable != null) {
 906                         if (isDeliverThrowableInPacket) {
 907                             packet.addSatellite(new ThrowableContainerPropertySet(throwable));
 908                             completionCallback.onCompletion(packet);
 909                         } else
 910                             completionCallback.onCompletion(throwable);
 911                     } else
 912                         completionCallback.onCompletion(packet);
 913                 }
 914             }
 915         } finally {
 916             lock.unlock();
 917         }
 918     }
 919 
 920     /**
 921      * Invokes all registered {@link InterceptorHandler}s and then call into
 922      * {@link Fiber#__doRun()}.
 923      */
 924     private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
 925         private final Holder<Boolean> isUnlockRequired;
 926         private final List<FiberContextSwitchInterceptor> ints;
 927 
 928         /**
 929          * Index in {@link Fiber#interceptors} to invoke next.
 930          */
 931         private int idx;
 932 
 933         public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
 934             this.isUnlockRequired = isUnlockRequired;
 935             this.ints = ints;
 936         }
 937 
 938         /**
 939          * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
 940          */
 941         Tube invoke(Tube next) {
 942             idx = 0;
 943             return execute(next);
 944         }
 945 
 946         @Override
 947         public Tube execute(Tube next) {
 948             if (idx == ints.size()) {
 949                 Fiber.this.next = next;
 950                 if (__doRun(isUnlockRequired, ints))
 951                     return PLACEHOLDER;
 952             } else {
 953                 FiberContextSwitchInterceptor interceptor = ints.get(idx++);
 954                 return interceptor.execute(Fiber.this, next, this);
 955             }
 956             return Fiber.this.next;
 957         }
 958     }
 959 
 960     private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
 961 
 962     private static class PlaceholderTube extends AbstractTubeImpl {
 963 
 964         @Override
 965         public NextAction processRequest(Packet request) {
 966             throw new UnsupportedOperationException();
 967         }
 968 
 969         @Override
 970         public NextAction processResponse(Packet response) {
 971             throw new UnsupportedOperationException();
 972         }
 973 
 974         @Override
 975         public NextAction processException(Throwable t) {
 976             return doThrow(t);
 977         }
 978 
 979         @Override
 980         public void preDestroy() {
 981         }
 982 
 983         @Override
 984         public PlaceholderTube copy(TubeCloner cloner) {
 985             throw new UnsupportedOperationException();
 986         }
 987     }
 988 
 989     /**
 990      * Executes the fiber as much as possible.
 991      *
 992      */
 993     private boolean doRun() {
 994         dumpFiberContext("running");
 995 
 996         if (serializeExecution) {
 997             serializedExecutionLock.lock();
 998             try {
 999                 return _doRun(next);
1000             } finally {
1001                 serializedExecutionLock.unlock();
1002             }
1003         } else {
1004             return _doRun(next);
1005         }
1006     }
1007 
1008     private boolean _doRun(Tube next) {
1009         // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
1010         Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
1011         lock.lock();
1012         try {
1013             List<FiberContextSwitchInterceptor> ints;
1014             ClassLoader old;
1015             synchronized(this) {
1016                 ints = interceptors;
1017 
1018                 // currentThread is protected by the monitor for this fiber so
1019                 // that it is accessible to cancel() even when the lock is held
1020                 currentThread = Thread.currentThread();
1021                 if (isTraceEnabled()) {
1022                     LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
1023                 }
1024 
1025                 old = currentThread.getContextClassLoader();
1026                 currentThread.setContextClassLoader(contextClassLoader);
1027             }
1028 
1029             try {
1030                 boolean needsToReenter;
1031                 do {
1032                     // if interceptors are set, go through the interceptors.
1033                     if (ints == null) {
1034                         this.next = next;
1035                         if (__doRun(isRequireUnlock, null /*ints*/)) {
1036                             return true;
1037                         }
1038                     } else {
1039                         next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
1040                         if (next == PLACEHOLDER) {
1041                             return true;
1042                         }
1043                     }
1044 
1045                     synchronized(this) {
1046                         needsToReenter = (ints != interceptors);
1047                         if (needsToReenter)
1048                             ints = interceptors;
1049                     }
1050                 } while (needsToReenter);
1051             } catch(OnExitRunnableException o) {
1052                 // catching this exception indicates onExitRunnable in suspend() threw.
1053                 // we must still avoid double unlock
1054                 Throwable t = o.target;
1055                 if (t instanceof WebServiceException)
1056                     throw (WebServiceException) t;
1057                 throw new WebServiceException(t);
1058             } finally {
1059                 // don't reference currentThread here because fiber processing
1060                 // may already be running on a different thread (Note: isAlreadyExited
1061                 // tracks this state
1062                 Thread thread = Thread.currentThread();
1063                 thread.setContextClassLoader(old);
1064                 if (isTraceEnabled()) {
1065                     LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
1066                 }
1067             }
1068 
1069             return false;
1070         } finally {
1071             if (isRequireUnlock.value) {
1072                 synchronized(this) {
1073                     currentThread = null;
1074                 }
1075                 lock.unlock();
1076             }
1077         }
1078     }
1079 
    /**
     * Control loop of the fiber. To be invoked from {@code _doRun(Tube)}, either
     * directly or through {@link InterceptorHandler#execute(Tube)}, with
     * {@code lock} held by the calling thread.
     * <p/>
     * While {@link #isReady(List)} holds, each iteration invokes exactly one tube
     * callback and then applies the {@link NextAction} it returned:
     * <ul>
     * <li>{@code processException} on the topmost continuation if a throwable is pending,
     * <li>otherwise {@code processRequest} on {@code next} if it is set,
     * <li>otherwise {@code processResponse} on the topmost continuation.
     * </ul>
     * A {@link RuntimeException} or {@link Error} escaping an iteration is stored
     * in {@code throwable}, which makes the following iterations unwind through
     * {@code processException}. This fiber is the thread's current fiber for the
     * duration of the call; the previous value is restored afterwards.
     *
     * @param isRequireUnlock      passed through to {@code suspend}
     * @param originalInterceptors the interceptor list the caller is running with;
     *                             the loop ends as soon as the fiber's list is a different one
     * @return {@code true} only if {@code suspend} asked to leave the control loop
     *         right away; {@code false} in every other case
     * @see #doRun()
     */
    private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
        assert(lock.isHeldByCurrentThread());

        // publish this fiber as the current one, remembering what to restore
        final Fiber old = CURRENT_FIBER.get();
        CURRENT_FIBER.set(this);

        // if true, lots of debug messages to show what's being executed
        final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);

        try {
            boolean abortResponse = false;
            while(isReady(originalInterceptors)) {
                if (isCanceled) {
                    // drop all pending work
                    next = null;
                    throwable = null;
                    contsSize = 0;
                    break;
                }

                try {
                    NextAction na;
                    Tube last;
                    if(throwable!=null) {
                        if(contsSize==0 || abortResponse) {
                            contsSize = 0; // abortResponse case
                            // nothing else to execute. we are done.
                            return false;
                        }
                        last = popCont();
                        if (traceEnabled)
                            LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
                        na = last.processException(throwable);
                    } else {
                        if(next!=null) {
                            if(traceEnabled)
                                LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
                            na = next.processRequest(packet);
                            last = next;
                        } else {
                            if(contsSize==0 || abortResponse) {
                                // nothing else to execute. we are done.
                                contsSize = 0;
                                return false;
                            }
                            last = popCont();
                            if(traceEnabled)
                                LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
                            na = last.processResponse(packet);
                        }
                    }

                    if (traceEnabled)
                        LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});

                    // If resume is called before suspend, then make sure
                    // resume(Packet) is not lost
                    if (na.kind != NextAction.SUSPEND) {
                        // preserve in-flight packet so that processException may inspect
                        if (na.kind != NextAction.THROW &&
                          na.kind != NextAction.THROW_ABORT_RESPONSE)
                                packet = na.packet;
                        throwable = na.throwable;
                    }

                    switch(na.kind) {
                    case NextAction.INVOKE:
                    case NextAction.INVOKE_ASYNC:
                        // remember the tube so that it sees the response (or exception) later
                        pushCont(last);
                        // fall through next
                    case NextAction.INVOKE_AND_FORGET:
                        next = na.next;
                        if (na.kind == NextAction.INVOKE_ASYNC
                            && startedSync) {
                          // Break out here
                          return false;
                        }
                        break;
                    case NextAction.THROW_ABORT_RESPONSE:
                    case NextAction.ABORT_RESPONSE:
                        abortResponse = true;
                        if (isTraceEnabled()) {
                          LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
                        }
                        // fall through: like RETURN/THROW there is no further request to send
                    case NextAction.RETURN:
                    case NextAction.THROW:
                        next = null;
                        break;
                    case NextAction.SUSPEND:
                        if (next != null) {
                          // Only store the 'last' tube when we're processing
                          // a request, since conts array is for processResponse
                          pushCont(last);
                        }
                        next = na.next;
                        // NOTE(review): this call sits inside the try below whose handlers catch
                        // RuntimeException, so an OnExitRunnableException escaping suspend() would
                        // be recorded in 'throwable' here rather than reach the dedicated handler
                        // in _doRun -- confirm against suspend() that this is intended.
                        if(suspend(isRequireUnlock, na.onExitRunnable))
                            return true; // explicitly exiting control loop
                        break;
                    default:
                        throw new AssertionError();
                    }
                } catch (RuntimeException t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
                    throwable = t;
                } catch (Error t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
                    throwable = t;
                }

                dumpFiberContext("After tube execution");
            }

            // there's nothing we can execute right away.
            // we'll be back when this fiber is resumed.

        } finally {
            CURRENT_FIBER.set(old);
        }

        return false;
    }
1207 
1208     private void pushCont(Tube tube) {
1209         conts[contsSize++] = tube;
1210 
1211         // expand if needed
1212         int len = conts.length;
1213         if (contsSize == len) {
1214             Tube[] newBuf = new Tube[len * 2];
1215             System.arraycopy(conts, 0, newBuf, 0, len);
1216             conts = newBuf;
1217         }
1218     }
1219 
1220     private Tube popCont() {
1221         return conts[--contsSize];
1222     }
1223 
1224     private Tube peekCont() {
1225         int index = contsSize - 1;
1226         if (index >= 0 && index < conts.length) {
1227           return conts[index];
1228         } else {
1229           return null;
1230         }
1231     }
1232 
1233     /**
1234      * Only to be used by Tubes that manipulate the Fiber to create alternate flows
1235      * @since 2.2.6
1236      */
1237     public void resetCont(Tube[] conts, int contsSize) {
1238         this.conts = conts;
1239         this.contsSize = contsSize;
1240     }
1241 
1242     /**
1243      * Returns true if the fiber is ready to execute.
1244      */
1245     private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
1246         if (synchronous) {
1247             while (suspendedCount == 1)
1248                 try {
1249                     if (isTraceEnabled()) {
1250                         LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
1251                     }
1252                     condition.await(); // the synchronized block is the whole runSync method.
1253                 } catch (InterruptedException e) {
1254                     // remember that we are interrupted, but don't respond to it
1255                     // right away. This behavior is in line with what happens
1256                     // when you are actually running the whole thing synchronously.
1257                     interrupted = true;
1258                 }
1259 
1260             synchronized(this) {
1261                 return interceptors == originalInterceptors;
1262             }
1263         }
1264         else {
1265             if (suspendedCount>0)
1266                 return false;
1267             synchronized(this) {
1268                 return interceptors == originalInterceptors;
1269             }
1270         }
1271     }
1272 
1273     private String getName() {
1274         return "engine-" + owner.id + "fiber-" + id;
1275     }
1276 
1277     @Override
1278     public String toString() {
1279         return getName();
1280     }
1281 
1282     /**
1283      * Gets the current {@link Packet} associated with this fiber.
1284      * <p/>
1285      * <p/>
1286      * This method returns null if no packet has been associated with the fiber yet.
1287      */
1288     public
1289     @Nullable
1290     Packet getPacket() {
1291         return packet;
1292     }
1293 
1294     /**
1295      * Returns completion callback associated with this Fiber
1296      * @return Completion callback
1297      * @since 2.2.6
1298      */
1299     public CompletionCallback getCompletionCallback() {
1300         return completionCallback;
1301     }
1302 
1303     /**
1304      * Updates completion callback associated with this Fiber
1305      * @param completionCallback Completion callback
1306      * @since 2.2.6
1307      */
1308     public void setCompletionCallback(CompletionCallback completionCallback) {
1309         this.completionCallback = completionCallback;
1310     }
1311 
1312     /**
1313      * (ADVANCED) Returns true if the current fiber is being executed synchronously.
1314      * <p/>
1315      * <p/>
1316      * Fiber may run synchronously for various reasons. Perhaps this is
1317      * on client side and application has invoked a synchronous method call.
1318      * Perhaps this is on server side and we have deployed on a synchronous
1319      * transport (like servlet.)
1320      * <p/>
1321      * <p/>
1322      * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
1323      * further invocations to {@link #runSync(Tube, Packet)} can be done
1324      * without degrading the performance.
1325      * <p/>
1326      * <p/>
1327      * So this value can be used as a further optimization hint for
1328      * advanced {@link Tube}s to choose the best strategy to invoke
1329      * the next {@link Tube}. For example, a tube may want to install
1330      * a {@link FiberContextSwitchInterceptor} if running async, yet
1331      * it might find it faster to do {@link #runSync(Tube, Packet)}
1332      * if it's already running synchronously.
1333      */
1334     public static boolean isSynchronous() {
1335         return current().synchronous;
1336     }
1337 
1338     /**
1339      * Returns true if the current Fiber on the current thread was started
1340      * synchronously. Note, this is not strictly the same as being synchronous
1341      * because the assumption is that the Fiber will ultimately be dispatched
1342      * asynchronously, possibly have a completion callback associated with it, etc.
1343      * Note, the 'startedSync' flag is cleared once the current Fiber is
1344      * converted to running asynchronously.
1345      * @since 2.2.6
1346      */
1347     public boolean isStartedSync() {
1348         return startedSync;
1349     }
1350 
1351     /**
1352      * Gets the current fiber that's running.
1353      * <p/>
1354      * <p/>
1355      * This works like {@link Thread#currentThread()}.
1356      * This method only works when invoked from {@link Tube}.
1357      */
1358     public static
1359     @NotNull
1360     @SuppressWarnings({"null", "ConstantConditions"})
1361     Fiber current() {
1362         Fiber fiber = CURRENT_FIBER.get();
1363         if (fiber == null)
1364             throw new IllegalStateException("Can be only used from fibers");
1365         return fiber;
1366     }
1367 
1368     /**
1369      * Gets the current fiber that's running, if set.
1370      */
1371     public static Fiber getCurrentIfSet() {
1372         return CURRENT_FIBER.get();
1373     }
1374 
    /**
     * The fiber that is executing on the current thread. Set for the duration of
     * the control loop and read by {@link #current()} and {@link #getCurrentIfSet()}.
     */
    private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();

    /**
     * Used to allocate unique number for each fiber.
     */
    private static final AtomicInteger iotaGen = new AtomicInteger();
1381 
1382     private static boolean isTraceEnabled() {
1383         return LOGGER.isLoggable(Level.FINE);
1384     }
1385 
    /** Logger named after the {@link Fiber} class. */
    private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());


    /** Class-wide lock taken around fiber execution when {@link #serializeExecution} is set. */
    private static final ReentrantLock serializedExecutionLock = new ReentrantLock();

    /**
     * Set this boolean to true to execute fibers sequentially one by one.
     * See class javadoc.
     * <p/>
     * The initial value is taken from the boolean system property whose name is
     * the fully qualified name of this class followed by {@code .serialize}.
     */
    public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");

    /** Components registered with this fiber; consulted by {@link #getSPI(Class)}. */
    private final Set<Component> components = new CopyOnWriteArraySet<Component>();
1398 
1399     @Override
1400     public <S> S getSPI(Class<S> spiType) {
1401         for (Component c : components) {
1402             S spi = c.getSPI(spiType);
1403             if (spi != null) {
1404                 return spi;
1405             }
1406         }
1407         return null;
1408     }
1409 
1410     @Override
1411     public Set<Component> getComponents() {
1412         return components;
1413     }
1414 }