1 /* 2 * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.xml.internal.ws.api.pipe; 27 28 import com.sun.istack.internal.NotNull; 29 import com.sun.istack.internal.Nullable; 30 import com.sun.xml.internal.ws.api.Cancelable; 31 import com.sun.xml.internal.ws.api.Component; 32 import com.sun.xml.internal.ws.api.ComponentRegistry; 33 import com.sun.xml.internal.ws.api.SOAPVersion; 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion; 35 import com.sun.xml.internal.ws.api.message.Packet; 36 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; 37 import com.sun.xml.internal.ws.api.server.Adapter; 38 39 import java.util.ArrayList; 40 import java.util.HashSet; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.concurrent.CopyOnWriteArraySet; 44 import java.util.concurrent.atomic.AtomicInteger; 45 import java.util.concurrent.locks.ReentrantLock; 46 import java.util.logging.Level; 47 import java.util.logging.Logger; 48 49 /** 50 * User-level thread. Represents the execution of one request/response processing. 51 * <p/> 52 * <p/> 53 * JAX-WS RI is capable of running a large number of request/response concurrently by 54 * using a relatively small number of threads. This is made possible by utilizing 55 * a {@link Fiber} — a user-level thread that gets created for each request/response 56 * processing. 57 * <p/> 58 * <p/> 59 * A fiber remembers where in the pipeline the processing is at, what needs to be 60 * executed on the way out (when processing response), and other additional information 61 * specific to the execution of a particular request/response. 62 * <p/> 63 * <h2>Suspend/Resume</h2> 64 * <p/> 65 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}. 66 * When a fiber is suspended, it will be kept on the side until it is 67 * {@link #resume(Packet) resumed}. This allows threads to go execute 68 * other runnable fibers, allowing efficient utilization of smaller number of 69 * threads. 70 * <p/> 71 * <h2>Context-switch Interception</h2> 72 * <p/> 73 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s 74 * to perform additional processing every time a thread starts running a fiber 75 * and stops running it. 76 * <p/> 77 * <h2>Context ClassLoader</h2> 78 * <p/> 79 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL 80 * becomes the thread's CCL when it's executing the fiber. The original CCL 81 * of the thread will be restored when the thread leaves the fiber execution. 82 * <p/> 83 * <p/> 84 * <h2>Debugging Aid</h2> 85 * <p/> 86 * Because {@link Fiber} doesn't keep much in the call stack, and instead use 87 * {@link #conts} to store the continuation, debugging fiber related activities 88 * could be harder. 89 * <p/> 90 * <p/> 91 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend 92 * level logging. Using FINER would cause more detailed logging, which includes 93 * what tubes are executed in what order and how they behaved. 94 * <p/> 95 * <p/> 96 * When you debug the server side, consider setting {@link Fiber#serializeExecution} 97 * to true, so that execution of fibers are serialized. Debugging a server 98 * with more than one running threads is very tricky, and this switch will 99 * prevent that. This can be also enabled by setting the system property on. 100 * See the source code. 101 * 102 * @author Kohsuke Kawaguchi 103 * @author Jitendra Kotamraju 104 */ 105 public final class Fiber implements Runnable, Cancelable, ComponentRegistry { 106 107 /** 108 * Callback interface for notification of suspend and resume. 109 * 110 * @since 2.2.6 111 */ 112 public interface Listener { 113 /** 114 * Fiber has been suspended. Implementations of this callback may resume the Fiber. 115 * @param fiber Fiber 116 */ 117 public void fiberSuspended(Fiber fiber); 118 119 /** 120 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber. 121 * @param fiber Fiber 122 */ 123 public void fiberResumed(Fiber fiber); 124 } 125 126 private List<Listener> _listeners = new ArrayList<Listener>(); 127 128 /** 129 * Adds suspend/resume callback listener 130 * @param listener Listener 131 * @since 2.2.6 132 */ 133 public void addListener(Listener listener) { 134 synchronized(_listeners) { 135 if (!_listeners.contains(listener)) { 136 _listeners.add(listener); 137 } 138 } 139 } 140 141 /** 142 * Removes suspend/resume callback listener 143 * @param listener Listener 144 * @since 2.2.6 145 */ 146 public void removeListener(Listener listener) { 147 synchronized(_listeners) { 148 _listeners.remove(listener); 149 } 150 } 151 152 private List<Listener> getCurrentListeners() { 153 synchronized(_listeners) { 154 return new ArrayList<Listener>(_listeners); 155 } 156 } 157 158 private void clearListeners() { 159 synchronized(_listeners) { 160 _listeners.clear(); 161 } 162 } 163 164 /** 165 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs 166 * to be invoked on the way back. 167 */ 168 private Tube[] conts = new Tube[16]; 169 private int contsSize; 170 171 /** 172 * If this field is non-null, the next instruction to execute is 173 * to call its {@link Tube#processRequest(Packet)}. Otherwise 174 * the instruction is to call {@link #conts}. 175 */ 176 private Tube next; 177 178 private Packet packet; 179 180 private Throwable/*but really it's either RuntimeException or Error*/ throwable; 181 182 public final Engine owner; 183 184 /** 185 * Is this thread suspended? 0=not suspended, 1=suspended. 186 * <p/> 187 * <p/> 188 * Logically this is just a boolean, but we need to prepare for the case 189 * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}. 190 * This happens when things happen in the following order: 191 * <p/> 192 * <ol> 193 * <li>Tube decides that the fiber needs to be suspended to wait for the external event. 194 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector) 195 * <li>Tube returns with {@link NextAction#suspend()}. 196 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)} 197 * to wake up fiber 198 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}. 199 * </ol> 200 * <p/> 201 * <p/> 202 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when 203 * {@link #resume(Packet)} occurs before {@link #suspend()}. 204 * <p/> 205 * <p/> 206 * Increment and decrement is guarded by 'this' object. 207 */ 208 private volatile int suspendedCount = 0; 209 210 private volatile boolean isInsideSuspendCallbacks = false; 211 212 /** 213 * Is this fiber completed? 214 */ 215 private volatile boolean completed; 216 217 /** 218 * Is this {@link Fiber} currently running in the synchronous mode? 219 */ 220 private boolean synchronous; 221 222 private boolean interrupted; 223 224 private final int id; 225 226 /** 227 * Active {@link FiberContextSwitchInterceptor}s for this fiber. 228 */ 229 private List<FiberContextSwitchInterceptor> interceptors; 230 231 /** 232 * Not null when {@link #interceptors} is not null. 233 */ 234 private InterceptorHandler interceptorHandler; 235 236 /** 237 * This flag is set to true when a new interceptor is added. 238 * <p/> 239 * When that happens, we need to first exit the current interceptors 240 * and then reenter them, so that the newly added interceptors start 241 * taking effect. This flag is used to control that flow. 242 */ 243 private boolean needsToReenter; 244 245 /** 246 * Fiber's context {@link ClassLoader}. 247 */ 248 private 249 @Nullable 250 ClassLoader contextClassLoader; 251 252 private 253 @Nullable 254 CompletionCallback completionCallback; 255 256 /** 257 * The thread on which this Fiber is currently executing, if applicable. 258 */ 259 private Thread currentThread; 260 261 private volatile boolean isCanceled; 262 263 /** 264 * Set to true if this fiber is started asynchronously, to avoid 265 * doubly-invoking completion code. 266 */ 267 private boolean started; 268 269 /** 270 * Set to true if this fiber is started sync but allowed to run async. 271 * This property exists for use cases where the processing model is fundamentally async 272 * but some requirement or feature mandates that part of the tubeline run synchronously. For 273 * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running 274 * asynchronously, but if in-order message delivery is used then message processing must assign 275 * a message number before the remainder of the processing can be asynchronous. 276 */ 277 private boolean startedSync; 278 279 /** 280 * Callback to be invoked when a {@link Fiber} finishs execution. 281 */ 282 public interface CompletionCallback { 283 /** 284 * Indicates that the fiber has finished its execution. 285 * <p/> 286 * <p/> 287 * Since the JAX-WS RI runs asynchronously, 288 * this method maybe invoked by a different thread 289 * than any of the threads that started it or run a part of tubeline. 290 */ 291 void onCompletion(@NotNull Packet response); 292 293 /** 294 * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}. 295 */ 296 void onCompletion(@NotNull Throwable error); 297 } 298 299 Fiber(Engine engine) { 300 this.owner = engine; 301 if (isTraceEnabled()) { 302 id = iotaGen.incrementAndGet(); 303 LOGGER.fine(getName() + " created"); 304 } else { 305 id = -1; 306 } 307 308 // if this is run from another fiber, then we naturally inherit its context classloader, 309 // so this code works for fiber->fiber inheritance just fine. 310 contextClassLoader = Thread.currentThread().getContextClassLoader(); 311 } 312 313 /** 314 * Starts the execution of this fiber asynchronously. 315 * <p/> 316 * <p/> 317 * This method works like {@link Thread#start()}. 318 * 319 * @param tubeline The first tube of the tubeline that will act on the packet. 320 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 321 * @param completionCallback The callback to be invoked when the processing is finished and the 322 * final response packet is available. 323 * @see #runSync(Tube, Packet) 324 */ 325 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) { 326 start(tubeline, request, completionCallback, false); 327 } 328 329 private void dumpFiberContext(String desc) { 330 if(isTraceEnabled()) { 331 String action = null; 332 String msgId = null; 333 if (packet != null) { 334 for (SOAPVersion sv: SOAPVersion.values()) { 335 for (AddressingVersion av: AddressingVersion.values()) { 336 action = packet.getMessage() != null ? packet.getMessage().getHeaders().getAction(av, sv) : null; 337 msgId = packet.getMessage() != null ? packet.getMessage().getHeaders().getMessageID(av, sv) : null; 338 if (action != null || msgId != null) { 339 break; 340 } 341 } 342 if (action != null || msgId != null) { 343 break; 344 } 345 } 346 } 347 String actionAndMsgDesc; 348 if (action == null && msgId == null) { 349 actionAndMsgDesc = "NO ACTION or MSG ID"; 350 } else { 351 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'"; 352 } 353 354 String tubeDesc; 355 if (next != null) { 356 tubeDesc = next.toString() + ".processRequest()"; 357 } else { 358 tubeDesc = peekCont() + ".processResponse()"; 359 } 360 361 LOGGER.fine(getName() + " " + desc + " with " + actionAndMsgDesc + " and 'current' tube " + tubeDesc + " from thread " + Thread.currentThread().getName() + " with Packet: " + (packet != null ? packet.toShortString() : null)); 362 } 363 } 364 365 /** 366 * Starts the execution of this fiber. 367 * 368 * If forceSync is true, then the fiber is started for an ostensibly async invocation, 369 * but allows for some portion of the tubeline to run sync with the calling 370 * client instance (Port/Dispatch instance). This allows tubes that enforce 371 * ordering to see requests in the order they were sent at the point the 372 * client invoked them. 373 * <p> 374 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or 375 * SEIStub) knows one or more tubes need to enforce ordering and thus need 376 * to run sync with the client. Such tubes can return 377 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline 378 * should be invoked async to the current thread. 379 * 380 * <p> 381 * This method works like {@link Thread#start()}. 382 * 383 * @param tubeline 384 * The first tube of the tubeline that will act on the packet. 385 * @param request 386 * The request packet to be passed to <tt>startPoint.processRequest()</tt>. 387 * @param completionCallback 388 * The callback to be invoked when the processing is finished and the 389 * final response packet is available. 390 * 391 * @see #start(Tube,Packet,CompletionCallback) 392 * @see #runSync(Tube,Packet) 393 * @since 2.2.6 394 */ 395 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) { 396 next = tubeline; 397 this.packet = request; 398 this.completionCallback = completionCallback; 399 400 if (forceSync) { 401 this.startedSync = true; 402 dumpFiberContext("starting (sync)"); 403 run(); 404 } else { 405 this.started = true; 406 dumpFiberContext("starting (async)"); 407 owner.addRunnable(this); 408 } 409 } 410 411 /** 412 * Wakes up a suspended fiber. 413 * <p/> 414 * <p/> 415 * If a fiber was suspended without specifying the next {@link Tube}, 416 * then the execution will be resumed in the response processing direction, 417 * by calling the {@link Tube#processResponse(Packet)} method on the next/first 418 * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume 419 * packet as the parameter. 420 * <p/> 421 * <p/> 422 * If a fiber was suspended with specifying the next {@link Tube}, 423 * then the execution will be resumed in the request processing direction, 424 * by calling the next tube's {@link Tube#processRequest(Packet)} method with the 425 * specified resume packet as the parameter. 426 * <p/> 427 * <p/> 428 * This method is implemented in a race-free way. Another thread can invoke 429 * this method even before this fiber goes into the suspension mode. So the caller 430 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 431 * 432 * @param resumePacket packet used in the resumed processing 433 */ 434 public void resume(@NotNull Packet resumePacket) { 435 resume(resumePacket, false); 436 } 437 438 /** 439 * Similar to resume(Packet) but allowing the Fiber to be resumed 440 * synchronously (in the current Thread). If you want to know when the 441 * fiber completes (not when this method returns) then add/wrap a 442 * CompletionCallback on this Fiber. 443 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging 444 * including in-order message delivery may need to resume the Fiber synchronously 445 * until message order is confirmed prior to returning to asynchronous processing. 446 * @since 2.2.6 447 */ 448 public synchronized void resume(@NotNull Packet resumePacket, 449 boolean forceSync) { 450 resume(resumePacket, forceSync, null); 451 } 452 453 /** 454 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed 455 * and at the same time atomically assign a new CompletionCallback to it. 456 * @since 2.2.6 457 */ 458 public void resume(@NotNull Packet resumePacket, 459 boolean forceSync, 460 CompletionCallback callback) { 461 462 synchronized(this) { 463 if (callback != null) { 464 setCompletionCallback(callback); 465 } 466 if(isTraceEnabled()) 467 LOGGER.fine(getName()+" resuming. Will have suspendedCount=" + (suspendedCount-1)); 468 packet = resumePacket; 469 if( --suspendedCount == 0 ) { 470 if (!isInsideSuspendCallbacks) { 471 List<Listener> listeners = getCurrentListeners(); 472 for (Listener listener: listeners) { 473 try { 474 listener.fiberResumed(this); 475 } catch (Throwable e) { 476 if (isTraceEnabled()) 477 LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); 478 } 479 } 480 481 if(synchronous) { 482 notifyAll(); 483 } else if (forceSync || startedSync) { 484 run(); 485 } else { 486 dumpFiberContext("resuming (async)"); 487 owner.addRunnable(this); 488 } 489 } 490 } else { 491 if (isTraceEnabled()) { 492 LOGGER.fine(getName() + " taking no action on resume because suspendedCount != 0: " + suspendedCount); 493 } 494 } 495 } 496 } 497 498 /** 499 * Wakes up a suspended fiber and begins response processing. 500 * @since 2.2.6 501 */ 502 public synchronized void resumeAndReturn(@NotNull Packet resumePacket, 503 boolean forceSync) { 504 if(isTraceEnabled()) 505 LOGGER.fine(getName()+" resumed with Return Packet"); 506 next = null; 507 resume(resumePacket, forceSync); 508 } 509 510 /** 511 * Wakes up a suspended fiber with an exception. 512 * <p/> 513 * <p/> 514 * The execution of the suspended fiber will be resumed in the response 515 * processing direction, by calling the {@link Tube#processException(Throwable)} method 516 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with 517 * the specified exception as the parameter. 518 * <p/> 519 * <p/> 520 * This method is implemented in a race-free way. Another thread can invoke 521 * this method even before this fiber goes into the suspension mode. So the caller 522 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 523 * 524 * @param throwable exception that is used in the resumed processing 525 */ 526 public synchronized void resume(@NotNull Throwable throwable) { 527 resume(throwable, false); 528 } 529 530 /** 531 * Wakes up a suspend fiber with an exception. 532 * 533 * If forceSync is true, then the suspended fiber will resume with 534 * synchronous processing on the current thread. This will continue 535 * until some Tube indicates that it is safe to switch to asynchronous 536 * processing. 537 * 538 * @param error exception that is used in the resumed processing 539 * @param forceSync if processing begins synchronously 540 * @since 2.2.6 541 */ 542 public synchronized void resume(@NotNull Throwable error, 543 boolean forceSync) { 544 if(isTraceEnabled()) 545 LOGGER.fine(getName()+" resumed with Return Throwable"); 546 next = null; 547 throwable = error; 548 resume(packet, forceSync); 549 } 550 551 /** 552 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback 553 * @param mayInterrupt if cancel should use {@link Thread.interrupt()} 554 * @see java.util.Future.cancel 555 * @since 2.2.6 556 */ 557 public void cancel(boolean mayInterrupt) { 558 isCanceled = true; 559 if (mayInterrupt) { 560 synchronized(this) { 561 if (currentThread != null) 562 currentThread.interrupt(); 563 } 564 } 565 } 566 567 /** 568 * Suspends this fiber's execution until the resume method is invoked. 569 * <p/> 570 * The call returns immediately, and when the fiber is resumed 571 * the execution picks up from the last scheduled continuation. 572 */ 573 private boolean suspend() { 574 575 synchronized(this) { 576 if(isTraceEnabled()) { 577 LOGGER.fine(getName()+" suspending. Will have suspendedCount=" + (suspendedCount+1)); 578 if (suspendedCount > 0) { 579 LOGGER.fine("WARNING - " + getName()+" suspended more than resumed. Will require more than one resume to actually resume this fiber."); 580 } 581 } 582 583 List<Listener> listeners = getCurrentListeners(); 584 if (++suspendedCount == 1) { 585 isInsideSuspendCallbacks = true; 586 try { 587 for (Listener listener: listeners) { 588 try { 589 listener.fiberSuspended(this); 590 } catch (Throwable e) { 591 if(isTraceEnabled()) 592 LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); 593 } 594 } 595 } finally { 596 isInsideSuspendCallbacks = false; 597 } 598 } 599 600 if (suspendedCount <= 0) { 601 // suspend callback caused fiber to resume 602 for (Listener listener: listeners) { 603 try { 604 listener.fiberResumed(this); 605 } catch (Throwable e) { 606 if(isTraceEnabled()) 607 LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); 608 } 609 } 610 611 return false; 612 } 613 614 return true; 615 } 616 } 617 618 /** 619 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber. 620 * <p/> 621 * <p/> 622 * The newly installed fiber will take effect immediately after the current 623 * tube returns from its {@link Tube#processRequest(Packet)} or 624 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 625 * <p/> 626 * <p/> 627 * So when the tubeline consists of X and Y, and when X installs an interceptor, 628 * the order of execution will be as follows: 629 * <p/> 630 * <ol> 631 * <li>X.processRequest() 632 * <li>interceptor gets installed 633 * <li>interceptor.execute() is invoked 634 * <li>Y.processRequest() 635 * </ol> 636 */ 637 public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 638 if (interceptors == null) { 639 interceptors = new ArrayList<FiberContextSwitchInterceptor>(); 640 interceptorHandler = new InterceptorHandler(); 641 } 642 interceptors.add(interceptor); 643 needsToReenter = true; 644 } 645 646 /** 647 * Removes a {@link FiberContextSwitchInterceptor} from this fiber. 648 * <p/> 649 * <p/> 650 * The removal of the interceptor takes effect immediately after the current 651 * tube returns from its {@link Tube#processRequest(Packet)} or 652 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 653 * <p/> 654 * <p/> 655 * <p/> 656 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor 657 * on the way out, then the order of execution will be as follows: 658 * <p/> 659 * <ol> 660 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack) 661 * <li>interceptor gets uninstalled 662 * <li>interceptor.execute() returns 663 * <li>X.processResponse() 664 * </ol> 665 * 666 * @return true if the specified interceptor was removed. False if 667 * the specified interceptor was not registered with this fiber to begin with. 668 */ 669 public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 670 if (interceptors != null && interceptors.remove(interceptor)) { 671 needsToReenter = true; 672 return true; 673 } 674 return false; 675 } 676 677 /** 678 * Gets the context {@link ClassLoader} of this fiber. 679 */ 680 public 681 @Nullable 682 ClassLoader getContextClassLoader() { 683 return contextClassLoader; 684 } 685 686 /** 687 * Sets the context {@link ClassLoader} of this fiber. 688 */ 689 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) { 690 ClassLoader r = this.contextClassLoader; 691 this.contextClassLoader = contextClassLoader; 692 return r; 693 } 694 695 /** 696 * DO NOT CALL THIS METHOD. This is an implementation detail 697 * of {@link Fiber}. 698 */ 699 @Deprecated 700 public void run() { 701 assert !synchronous; 702 doRun(); 703 if (startedSync && suspendedCount == 0 && 704 (next != null || contsSize > 0)) { 705 // We bailed out of running this fiber we started as sync, and now 706 // want to finish running it async 707 startedSync = false; 708 // Start back up as an async fiber 709 dumpFiberContext("restarting (async) after startSync"); 710 owner.addRunnable(this); 711 } else { 712 completionCheck(); 713 } 714 } 715 716 /** 717 * Runs a given {@link Tube} (and everything thereafter) synchronously. 718 * <p/> 719 * <p/> 720 * This method blocks and returns only when all the successive {@link Tube}s 721 * complete their request/response processing. This method can be used 722 * if a {@link Tube} needs to fallback to synchronous processing. 723 * <p/> 724 * <h3>Example:</h3> 725 * <pre> 726 * class FooTube extends {@link AbstractFilterTubeImpl} { 727 * NextAction processRequest(Packet request) { 728 * // run everything synchronously and return with the response packet 729 * return doReturnWith(Fiber.current().runSync(next,request)); 730 * } 731 * NextAction processResponse(Packet response) { 732 * // never be invoked 733 * } 734 * } 735 * </pre> 736 * 737 * @param tubeline The first tube of the tubeline that will act on the packet. 738 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 739 * @return The response packet to the <tt>request</tt>. 740 * @see #start(Tube, Packet, CompletionCallback) 741 */ 742 public synchronized 743 @NotNull 744 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) { 745 // save the current continuation, so that we return runSync() without executing them. 746 final Tube[] oldCont = conts; 747 final int oldContSize = contsSize; 748 final boolean oldSynchronous = synchronous; 749 final Tube oldNext = next; 750 751 if (oldContSize > 0) { 752 conts = new Tube[16]; 753 contsSize = 0; 754 } 755 756 try { 757 synchronous = true; 758 this.packet = request; 759 next = tubeline; 760 doRun(); 761 if (throwable != null) { 762 if (throwable instanceof RuntimeException) { 763 throw (RuntimeException) throwable; 764 } 765 if (throwable instanceof Error) { 766 throw (Error) throwable; 767 } 768 // our system is supposed to only accept Error or RuntimeException 769 throw new AssertionError(throwable); 770 } 771 return this.packet; 772 } finally { 773 conts = oldCont; 774 contsSize = oldContSize; 775 synchronous = oldSynchronous; 776 next = oldNext; 777 if(interrupted) { 778 Thread.currentThread().interrupt(); 779 interrupted = false; 780 } 781 if(!started && !startedSync) 782 completionCheck(); 783 } 784 } 785 786 private synchronized void completionCheck() { 787 // Don't trigger completion and callbacks if fiber is suspended 788 if(!isCanceled && contsSize==0 && suspendedCount == 0) { 789 if(isTraceEnabled()) 790 LOGGER.fine(getName()+" completed"); 791 completed = true; 792 clearListeners(); 793 notifyAll(); 794 if (completionCallback != null) { 795 if (throwable != null) 796 completionCallback.onCompletion(throwable); 797 else 798 completionCallback.onCompletion(packet); 799 } 800 } 801 } 802 803 ///** 804 // * Blocks until the fiber completes. 805 // */ 806 //public synchronized void join() throws InterruptedException { 807 // while(!completed) 808 // wait(); 809 //} 810 811 /** 812 * Invokes all registered {@link InterceptorHandler}s and then call into 813 * {@link Fiber#__doRun()}. 814 */ 815 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> { 816 /** 817 * Index in {@link Fiber#interceptors} to invoke next. 818 */ 819 private int idx; 820 821 /** 822 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}. 823 */ 824 Tube invoke(Tube next) { 825 idx = 0; 826 return execute(next); 827 } 828 829 public Tube execute(Tube next) { 830 if (idx == interceptors.size()) { 831 Fiber.this.next = next; 832 __doRun(); 833 } else { 834 FiberContextSwitchInterceptor interceptor = interceptors.get(idx++); 835 return interceptor.execute(Fiber.this, next, this); 836 } 837 return Fiber.this.next; 838 } 839 } 840 841 /** 842 * Executes the fiber as much as possible. 843 * 844 */ 845 @SuppressWarnings({"LoopStatementThatDoesntLoop"}) // IntelliJ reports this bogus error 846 private void doRun() { 847 848 dumpFiberContext("running"); 849 850 if (serializeExecution) { 851 serializedExecutionLock.lock(); 852 try { 853 _doRun(next); 854 } finally { 855 serializedExecutionLock.unlock(); 856 } 857 } else { 858 _doRun(next); 859 } 860 } 861 862 private String currentThreadMonitor = "CurrentThreadMonitor"; 863 864 private void _doRun(Tube next) { 865 Thread thread; 866 synchronized(currentThreadMonitor) { 867 if (currentThread != null && !synchronous) { 868 if (LOGGER.isLoggable(Level.FINE)) { 869 LOGGER.fine("Attempt to run Fiber ['" + this + "'] in more than one thread. Current Thread: " + currentThread + " Attempted Thread: " + Thread.currentThread()); 870 } 871 while (currentThread != null) { 872 try { 873 currentThreadMonitor.wait(); 874 } catch (Exception e) { 875 // ignore 876 } 877 } 878 } 879 currentThread = Thread.currentThread(); 880 thread = currentThread; 881 if (LOGGER.isLoggable(Level.FINE)) { 882 LOGGER.fine("Thread entering _doRun(): " + thread); 883 } 884 } 885 886 ClassLoader old = thread.getContextClassLoader(); 887 thread.setContextClassLoader(contextClassLoader); 888 try { 889 do { 890 needsToReenter = false; 891 892 // if interceptors are set, go through the interceptors. 893 if(interceptorHandler ==null) { 894 this.next = next; 895 __doRun(); 896 } 897 else 898 next = interceptorHandler.invoke(next); 899 } while (needsToReenter); 900 901 } finally { 902 thread.setContextClassLoader(old); 903 synchronized(currentThreadMonitor) { 904 currentThread = null; 905 if (LOGGER.isLoggable(Level.FINE)) { 906 LOGGER.fine("Thread leaving _doRun(): " + thread); 907 } 908 currentThreadMonitor.notify(); 909 } 910 } 911 } 912 913 /** 914 * To be invoked from {@link #doRun()}. 915 * 916 * @see #doRun() 917 */ 918 private void __doRun() { 919 final Fiber old = CURRENT_FIBER.get(); 920 CURRENT_FIBER.set(this); 921 922 // if true, lots of debug messages to show what's being executed 923 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); 924 925 try { 926 boolean abortResponse = false; 927 boolean justSuspended = false; 928 while(!isCanceled && !isBlocking(justSuspended) && !needsToReenter) { 929 try { 930 NextAction na; 931 Tube last; 932 if(throwable!=null) { 933 if(contsSize==0 || abortResponse) { 934 contsSize = 0; // abortResponse case 935 // nothing else to execute. we are done. 936 return; 937 } 938 last = popCont(); 939 if (traceEnabled) 940 LOGGER.finer(getName() + ' ' + last + ".processException(" + throwable + ')'); 941 na = last.processException(throwable); 942 } else { 943 if(next!=null) { 944 if(traceEnabled) 945 LOGGER.finer(getName()+' '+next+".processRequest("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); 946 na = next.processRequest(packet); 947 last = next; 948 } else { 949 if(contsSize==0 || abortResponse) { 950 // nothing else to execute. we are done. 951 contsSize = 0; 952 return; 953 } 954 last = popCont(); 955 if(traceEnabled) 956 LOGGER.finer(getName()+' '+last+".processResponse("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); 957 na = last.processResponse(packet); 958 } 959 } 960 961 if (traceEnabled) 962 LOGGER.finer(getName() + ' ' + last + " returned with " + na); 963 964 // If resume is called before suspend, then make sure 965 // resume(Packet) is not lost 966 if (na.kind != NextAction.SUSPEND) { 967 // preserve in-flight packet so that processException may inspect 968 if (na.kind != NextAction.THROW && 969 na.kind != NextAction.THROW_ABORT_RESPONSE) 970 packet = na.packet; 971 throwable = na.throwable; 972 } 973 974 switch(na.kind) { 975 case NextAction.INVOKE: 976 case NextAction.INVOKE_ASYNC: 977 pushCont(last); 978 // fall through next 979 case NextAction.INVOKE_AND_FORGET: 980 next = na.next; 981 if (na.kind == NextAction.INVOKE_ASYNC 982 && startedSync) { 983 // Break out here 984 return; 985 } 986 break; 987 case NextAction.THROW_ABORT_RESPONSE: 988 case NextAction.ABORT_RESPONSE: 989 abortResponse = true; 990 if (LOGGER.isLoggable(Level.FINE)) { 991 LOGGER.fine("Fiber " + this + " is aborting a response due to exception: " + na.throwable); 992 } 993 case NextAction.RETURN: 994 case NextAction.THROW: 995 next = null; 996 break; 997 case NextAction.SUSPEND: 998 if (next != null) { 999 // Only store the 'last' tube when we're processing 1000 // a request, since conts array is for processResponse 1001 pushCont(last); 1002 } 1003 next = na.next; 1004 justSuspended = suspend(); 1005 break; 1006 default: 1007 throw new AssertionError(); 1008 } 1009 } catch (RuntimeException t) { 1010 if (traceEnabled) 1011 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1012 throwable = t; 1013 } catch (Error t) { 1014 if (traceEnabled) 1015 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1016 throwable = t; 1017 } 1018 1019 dumpFiberContext("After tube execution"); 1020 } 1021 1022 if (isCanceled) { 1023 next = null; 1024 throwable = null; 1025 contsSize = 0; 1026 } 1027 1028 // there's nothing we can execute right away. 1029 // we'll be back when this fiber is resumed. 1030 1031 } finally { 1032 CURRENT_FIBER.set(old); 1033 } 1034 } 1035 1036 private void pushCont(Tube tube) { 1037 conts[contsSize++] = tube; 1038 1039 // expand if needed 1040 int len = conts.length; 1041 if (contsSize == len) { 1042 Tube[] newBuf = new Tube[len * 2]; 1043 System.arraycopy(conts, 0, newBuf, 0, len); 1044 conts = newBuf; 1045 } 1046 } 1047 1048 private Tube popCont() { 1049 return conts[--contsSize]; 1050 } 1051 1052 private Tube peekCont() { 1053 int index = contsSize - 1; 1054 if (index >= 0 && index < conts.length) { 1055 return conts[index]; 1056 } else { 1057 return null; 1058 } 1059 } 1060 1061 /** 1062 * Only to be used by Tubes that manipulate the Fiber to create alternate flows 1063 * @since 2.2.6 1064 */ 1065 public void resetCont(Tube[] conts, int contsSize) { 1066 this.conts = conts; 1067 this.contsSize = contsSize; 1068 } 1069 1070 /** 1071 * Returns true if the fiber needs to block its execution. 1072 */ 1073 private boolean isBlocking(boolean justSuspended) { 1074 if (synchronous) { 1075 while (suspendedCount == 1) 1076 try { 1077 if (isTraceEnabled()) { 1078 LOGGER.fine(getName() + " is blocking thread " + Thread.currentThread().getName()); 1079 } 1080 wait(); // the synchronized block is the whole runSync method. 1081 } catch (InterruptedException e) { 1082 // remember that we are interrupted, but don't respond to it 1083 // right away. This behavior is in line with what happens 1084 // when you are actually running the whole thing synchronously. 1085 interrupted = true; 1086 } 1087 return false; 1088 } 1089 else 1090 return justSuspended || suspendedCount==1; 1091 } 1092 1093 private String getName() { 1094 return "engine-" + owner.id + "fiber-" + id; 1095 } 1096 1097 @Override 1098 public String toString() { 1099 return getName(); 1100 } 1101 1102 /** 1103 * Gets the current {@link Packet} associated with this fiber. 1104 * <p/> 1105 * <p/> 1106 * This method returns null if no packet has been associated with the fiber yet. 1107 */ 1108 public 1109 @Nullable 1110 Packet getPacket() { 1111 return packet; 1112 } 1113 1114 /** 1115 * Returns completion callback associated with this Fiber 1116 * @return Completion callback 1117 * @since 2.2.6 1118 */ 1119 public CompletionCallback getCompletionCallback() { 1120 return completionCallback; 1121 } 1122 1123 /** 1124 * Updates completion callback associated with this Fiber 1125 * @param completionCallback Completion callback 1126 * @since 2.2.6 1127 */ 1128 public void setCompletionCallback(CompletionCallback completionCallback) { 1129 this.completionCallback = completionCallback; 1130 } 1131 1132 /** 1133 * (ADVANCED) Returns true if the current fiber is being executed synchronously. 1134 * <p/> 1135 * <p/> 1136 * Fiber may run synchronously for various reasons. Perhaps this is 1137 * on client side and application has invoked a synchronous method call. 1138 * Perhaps this is on server side and we have deployed on a synchronous 1139 * transport (like servlet.) 1140 * <p/> 1141 * <p/> 1142 * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}), 1143 * further invocations to {@link #runSync(Tube, Packet)} can be done 1144 * without degrading the performance. 1145 * <p/> 1146 * <p/> 1147 * So this value can be used as a further optimization hint for 1148 * advanced {@link Tube}s to choose the best strategy to invoke 1149 * the next {@link Tube}. For example, a tube may want to install 1150 * a {@link FiberContextSwitchInterceptor} if running async, yet 1151 * it might find it faster to do {@link #runSync(Tube, Packet)} 1152 * if it's already running synchronously. 1153 */ 1154 public static boolean isSynchronous() { 1155 return current().synchronous; 1156 } 1157 1158 /** 1159 * Returns true if the current Fiber on the current thread was started 1160 * synchronously. Note, this is not strictly the same as being synchronous 1161 * because the assumption is that the Fiber will ultimately be dispatched 1162 * asynchronously, possibly have a completion callback associated with it, etc. 1163 * Note, the 'startedSync' flag is cleared once the current Fiber is 1164 * converted to running asynchronously. 1165 * @since 2.2.6 1166 */ 1167 public boolean isStartedSync() { 1168 return startedSync; 1169 } 1170 1171 /** 1172 * Gets the current fiber that's running. 1173 * <p/> 1174 * <p/> 1175 * This works like {@link Thread#currentThread()}. 1176 * This method only works when invoked from {@link Tube}. 1177 */ 1178 public static 1179 @NotNull 1180 Fiber current() { 1181 Fiber fiber = CURRENT_FIBER.get(); 1182 if (fiber == null) 1183 throw new IllegalStateException("Can be only used from fibers"); 1184 return fiber; 1185 } 1186 1187 /** 1188 * Gets the current fiber that's running, if set. 1189 */ 1190 public static Fiber getCurrentIfSet() { 1191 return CURRENT_FIBER.get(); 1192 } 1193 1194 private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>(); 1195 1196 /** 1197 * Used to allocate unique number for each fiber. 1198 */ 1199 private static final AtomicInteger iotaGen = new AtomicInteger(); 1200 1201 private static boolean isTraceEnabled() { 1202 return LOGGER.isLoggable(Level.FINE); 1203 } 1204 1205 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName()); 1206 1207 1208 private static final ReentrantLock serializedExecutionLock = new ReentrantLock(); 1209 1210 /** 1211 * Set this boolean to true to execute fibers sequentially one by one. 1212 * See class javadoc. 1213 */ 1214 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize"); 1215 1216 private final Set<Component> components = new CopyOnWriteArraySet<Component>(); 1217 1218 public <S> S getSPI(Class<S> spiType) { 1219 for(Component c : components) { 1220 S spi = c.getSPI(spiType); 1221 if (spi != null) 1222 return spi; 1223 } 1224 return null; 1225 } 1226 1227 public Set<Component> getComponents() { 1228 return components; 1229 } 1230 }