1 /* 2 * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package com.sun.xml.internal.ws.api.pipe; 27 28 import com.sun.istack.internal.NotNull; 29 import com.sun.istack.internal.Nullable; 30 import com.sun.xml.internal.ws.api.Cancelable; 31 import com.sun.xml.internal.ws.api.Component; 32 import com.sun.xml.internal.ws.api.ComponentRegistry; 33 import com.sun.xml.internal.ws.api.SOAPVersion; 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion; 35 import com.sun.xml.internal.ws.api.message.Packet; 36 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; 37 import com.sun.xml.internal.ws.api.server.Adapter; 38 39 import java.util.ArrayList; 40 import java.util.HashSet; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.concurrent.CopyOnWriteArraySet; 44 import java.util.concurrent.atomic.AtomicInteger; 45 import java.util.concurrent.locks.ReentrantLock; 46 import java.util.logging.Level; 47 import java.util.logging.Logger; 48 49 /** 50 * User-level thread. Represents the execution of one request/response processing. 51 * <p/> 52 * <p/> 53 * JAX-WS RI is capable of running a large number of request/response concurrently by 54 * using a relatively small number of threads. This is made possible by utilizing 55 * a {@link Fiber} — a user-level thread that gets created for each request/response 56 * processing. 57 * <p/> 58 * <p/> 59 * A fiber remembers where in the pipeline the processing is at, what needs to be 60 * executed on the way out (when processing response), and other additional information 61 * specific to the execution of a particular request/response. 62 * <p/> 63 * <h2>Suspend/Resume</h2> 64 * <p/> 65 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}. 66 * When a fiber is suspended, it will be kept on the side until it is 67 * {@link #resume(Packet) resumed}. 
This allows threads to go execute 68 * other runnable fibers, allowing efficient utilization of smaller number of 91 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend 92 * level logging. Using FINER would cause more detailed logging, which includes 93 * what tubes are executed in what order and how they behaved. 94 * <p/> 95 * <p/> 96 * When you debug the server side, consider setting {@link Fiber#serializeExecution} 97 * to true, so that execution of fibers are serialized. Debugging a server 98 * with more than one running threads is very tricky, and this switch will 99 * prevent that. This can be also enabled by setting the system property on. 100 * See the source code. 101 * 102 * @author Kohsuke Kawaguchi 103 * @author Jitendra Kotamraju 104 */ 105 public final class Fiber implements Runnable, Cancelable, ComponentRegistry { 106 107 /** 108 * Callback interface for notification of suspend and resume. 109 * 110 * @since 2.2.6 111 */ 112 public interface Listener { 113 /** 114 * Fiber has been suspended. Implementations of this callback may resume the Fiber. 115 * @param fiber Fiber 116 */ 117 public void fiberSuspended(Fiber fiber); 118 119 /** 120 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber. 
121 * @param fiber Fiber 122 */ 123 public void fiberResumed(Fiber fiber); 124 } 125 126 private List<Listener> _listeners = new ArrayList<Listener>(); 127 128 /** 129 * Adds suspend/resume callback listener 130 * @param listener Listener 131 * @since 2.2.6 132 */ 133 public void addListener(Listener listener) { 134 synchronized(_listeners) { 135 if (!_listeners.contains(listener)) { 136 _listeners.add(listener); 137 } 138 } 139 } 140 141 /** 142 * Removes suspend/resume callback listener 143 * @param listener Listener 144 * @since 2.2.6 145 */ 146 public void removeListener(Listener listener) { 147 synchronized(_listeners) { 148 _listeners.remove(listener); 149 } 150 } 151 152 private List<Listener> getCurrentListeners() { 153 synchronized(_listeners) { 154 return new ArrayList<Listener>(_listeners); 155 } 156 } 157 158 private void clearListeners() { 159 synchronized(_listeners) { 160 _listeners.clear(); 161 } 162 } 163 164 /** 165 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs 166 * to be invoked on the way back. 167 */ 168 private Tube[] conts = new Tube[16]; 169 private int contsSize; 170 171 /** 172 * If this field is non-null, the next instruction to execute is 193 * <li>Tube decides that the fiber needs to be suspended to wait for the external event. 194 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector) 195 * <li>Tube returns with {@link NextAction#suspend()}. 196 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)} 197 * to wake up fiber 198 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}. 199 * </ol> 200 * <p/> 201 * <p/> 202 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when 203 * {@link #resume(Packet)} occurs before {@link #suspend()}. 204 * <p/> 205 * <p/> 206 * Increment and decrement is guarded by 'this' object. 
207 */ 208 private volatile int suspendedCount = 0; 209 210 private volatile boolean isInsideSuspendCallbacks = false; 211 212 /** 213 * Is this fiber completed? 214 */ 215 private volatile boolean completed; 216 217 /** 218 * Is this {@link Fiber} currently running in the synchronous mode? 219 */ 220 private boolean synchronous; 221 222 private boolean interrupted; 223 224 private final int id; 225 226 /** 227 * Active {@link FiberContextSwitchInterceptor}s for this fiber. 228 */ 229 private List<FiberContextSwitchInterceptor> interceptors; 230 231 /** 232 * Not null when {@link #interceptors} is not null. 233 */ 234 private InterceptorHandler interceptorHandler; 235 236 /** 237 * This flag is set to true when a new interceptor is added. 238 * <p/> 239 * When that happens, we need to first exit the current interceptors 240 * and then reenter them, so that the newly added interceptors start 241 * taking effect. This flag is used to control that flow. 242 */ 243 private boolean needsToReenter; 244 245 /** 246 * Fiber's context {@link ClassLoader}. 247 */ 248 private 249 @Nullable 250 ClassLoader contextClassLoader; 251 252 private 253 @Nullable 254 CompletionCallback completionCallback; 255 256 /** 257 * The thread on which this Fiber is currently executing, if applicable. 258 */ 259 private Thread currentThread; 260 261 private volatile boolean isCanceled; 262 263 /** 264 * Set to true if this fiber is started asynchronously, to avoid 265 * doubly-invoking completion code. 266 */ 267 private boolean started; 268 269 /** 270 * Set to true if this fiber is started sync but allowed to run async. 271 * This property exists for use cases where the processing model is fundamentally async 272 * but some requirement or feature mandates that part of the tubeline run synchronously. 
/**
 * Callback to be invoked when a {@link Fiber} finishes execution.
 * <p/>
 * NOTE(review): judging by {@code completionCheck()}, one completion reports
 * either the throwable (when one is set) or the packet, not both - confirm
 * against any other call sites.
 */
public interface CompletionCallback {
    /**
     * Indicates that the fiber has finished its execution.
     * <p/>
     * <p/>
     * Since the JAX-WS RI runs asynchronously,
     * this method may be invoked by a different thread
     * than any of the threads that started it or ran a part of the tubeline.
     *
     * @param response the packet the fiber holds when it completes
     */
    void onCompletion(@NotNull Packet response);

    /**
     * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
     *
     * @param error the throwable the fiber holds when it completes
     */
    void onCompletion(@NotNull Throwable error);
}
323 * @see #runSync(Tube, Packet) 324 */ 325 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) { 326 start(tubeline, request, completionCallback, false); 327 } 328 329 private void dumpFiberContext(String desc) { 330 if(isTraceEnabled()) { 331 String action = null; 332 String msgId = null; 333 if (packet != null) { 334 for (SOAPVersion sv: SOAPVersion.values()) { 335 for (AddressingVersion av: AddressingVersion.values()) { 336 action = packet.getMessage() != null ? packet.getMessage().getHeaders().getAction(av, sv) : null; 337 msgId = packet.getMessage() != null ? packet.getMessage().getHeaders().getMessageID(av, sv) : null; 338 if (action != null || msgId != null) { 339 break; 340 } 341 } 342 if (action != null || msgId != null) { 343 break; 344 } 345 } 346 } 347 String actionAndMsgDesc; 348 if (action == null && msgId == null) { 349 actionAndMsgDesc = "NO ACTION or MSG ID"; 350 } else { 351 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'"; 352 } 353 354 String tubeDesc; 355 if (next != null) { 356 tubeDesc = next.toString() + ".processRequest()"; 357 } else { 358 tubeDesc = peekCont() + ".processResponse()"; 359 } 360 361 LOGGER.fine(getName() + " " + desc + " with " + actionAndMsgDesc + " and 'current' tube " + tubeDesc + " from thread " + Thread.currentThread().getName() + " with Packet: " + (packet != null ? packet.toShortString() : null)); 362 } 363 } 364 365 /** 366 * Starts the execution of this fiber. 367 * 368 * If forceSync is true, then the fiber is started for an ostensibly async invocation, 369 * but allows for some portion of the tubeline to run sync with the calling 370 * client instance (Port/Dispatch instance). This allows tubes that enforce 371 * ordering to see requests in the order they were sent at the point the 372 * client invoked them. 373 * <p> 374 * The forceSync parameter will be true only when the caller (e.g. 
/**
 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
 * and at the same time atomically assign a new CompletionCallback to it.
 * <p/>
 * The whole operation runs while holding this fiber's monitor. The suspend
 * counter is decremented on every call; the fiber is actually woken up only
 * when the counter reaches zero and the call does not originate from inside
 * a {@code fiberSuspended} listener callback.
 *
 * @param resumePacket packet used in the resumed processing; it replaces the
 *                     fiber's current packet even when no wake-up happens
 * @param forceSync    if true (or if the fiber was started sync), the fiber is
 *                     run on the calling thread instead of being handed to the owner
 * @param callback     new completion callback, or null to keep the current one
 * @since 2.2.6
 */
public void resume(@NotNull Packet resumePacket,
                   boolean forceSync,
                   CompletionCallback callback) {

    synchronized(this) {
        if (callback != null) {
            setCompletionCallback(callback);
        }
        if(isTraceEnabled())
            LOGGER.fine(getName()+" resuming. Will have suspendedCount=" + (suspendedCount-1));
        // Stored unconditionally, i.e. also when the counter does not reach zero below.
        packet = resumePacket;
        // If resume arrives before suspend(), the counter goes to -1 here and
        // nothing is done; suspend() then brings it back to 0 and reports that
        // the fiber is not actually suspended.
        if( --suspendedCount == 0 ) {
            // While suspend() is still calling fiberSuspended listeners it
            // handles a resume itself (it fires fiberResumed and returns false),
            // so nothing must be dispatched from here in that case.
            if (!isInsideSuspendCallbacks) {
                // Iterate over a snapshot of the registered listeners.
                List<Listener> listeners = getCurrentListeners();
                for (Listener listener: listeners) {
                    try {
                        listener.fiberResumed(this);
                    } catch (Throwable e) {
                        // A failing listener must not prevent the resume; it is only logged.
                        if (isTraceEnabled())
                            LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage());
                    }
                }

                if(synchronous) {
                    // Wakes the thread blocked in wait() inside isBlocking().
                    notifyAll();
                } else if (forceSync || startedSync) {
                    // Runs the fiber on the calling thread, still holding this monitor.
                    run();
                } else {
                    dumpFiberContext("resuming (async)");
                    // NOTE(review): presumably queues the fiber for execution by the owning engine - confirm.
                    owner.addRunnable(this);
                }
            }
        } else {
            if (isTraceEnabled()) {
                LOGGER.fine(getName() + " taking no action on resume because suspendedCount != 0: " + suspendedCount);
            }
        }
    }
}
523 * 524 * @param throwable exception that is used in the resumed processing 525 */ 526 public synchronized void resume(@NotNull Throwable throwable) { 527 resume(throwable, false); 528 } 529 530 /** 531 * Wakes up a suspend fiber with an exception. 532 * 533 * If forceSync is true, then the suspended fiber will resume with 534 * synchronous processing on the current thread. This will continue 535 * until some Tube indicates that it is safe to switch to asynchronous 536 * processing. 537 * 538 * @param error exception that is used in the resumed processing 539 * @param forceSync if processing begins synchronously 540 * @since 2.2.6 541 */ 542 public synchronized void resume(@NotNull Throwable error, 543 boolean forceSync) { 544 if(isTraceEnabled()) 545 LOGGER.fine(getName()+" resumed with Return Throwable"); 546 next = null; 547 throwable = error; 548 resume(packet, forceSync); 549 } 550 551 /** 552 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback 553 * @param mayInterrupt if cancel should use {@link Thread.interrupt()} 554 * @see java.util.Future.cancel 555 * @since 2.2.6 556 */ 557 public void cancel(boolean mayInterrupt) { 558 isCanceled = true; 559 if (mayInterrupt) { 560 synchronized(this) { 561 if (currentThread != null) 562 currentThread.interrupt(); 563 } 564 } 565 } 566 567 /** 568 * Suspends this fiber's execution until the resume method is invoked. 569 * <p/> 570 * The call returns immediately, and when the fiber is resumed 571 * the execution picks up from the last scheduled continuation. 572 */ 573 private boolean suspend() { 574 575 synchronized(this) { 576 if(isTraceEnabled()) { 577 LOGGER.fine(getName()+" suspending. Will have suspendedCount=" + (suspendedCount+1)); 578 if (suspendedCount > 0) { 579 LOGGER.fine("WARNING - " + getName()+" suspended more than resumed. 
Will require more than one resume to actually resume this fiber."); 580 } 581 } 582 583 List<Listener> listeners = getCurrentListeners(); 584 if (++suspendedCount == 1) { 585 isInsideSuspendCallbacks = true; 586 try { 587 for (Listener listener: listeners) { 588 try { 589 listener.fiberSuspended(this); 590 } catch (Throwable e) { 591 if(isTraceEnabled()) 592 LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); 593 } 594 } 595 } finally { 596 isInsideSuspendCallbacks = false; 597 } 598 } 599 600 if (suspendedCount <= 0) { 601 // suspend callback caused fiber to resume 602 for (Listener listener: listeners) { 603 try { 604 listener.fiberResumed(this); 605 } catch (Throwable e) { 606 if(isTraceEnabled()) 607 LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); 608 } 609 } 610 611 return false; 612 } 613 614 return true; 615 } 616 } 617 618 /** 619 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber. 620 * <p/> 621 * <p/> 622 * The newly installed fiber will take effect immediately after the current 623 * tube returns from its {@link Tube#processRequest(Packet)} or 624 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 625 * <p/> 626 * <p/> 627 * So when the tubeline consists of X and Y, and when X installs an interceptor, 628 * the order of execution will be as follows: 629 * <p/> 630 * <ol> 631 * <li>X.processRequest() 632 * <li>interceptor gets installed 633 * <li>interceptor.execute() is invoked 634 * <li>Y.processRequest() 635 * </ol> 636 */ 637 public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 638 if (interceptors == null) { 639 interceptors = new ArrayList<FiberContextSwitchInterceptor>(); 640 interceptorHandler = new InterceptorHandler(); 641 } 642 interceptors.add(interceptor); 643 needsToReenter = true; 644 } 645 646 /** 647 * Removes a {@link FiberContextSwitchInterceptor} from this fiber. 
648 * <p/> 649 * <p/> 650 * The removal of the interceptor takes effect immediately after the current 651 * tube returns from its {@link Tube#processRequest(Packet)} or 652 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 653 * <p/> 654 * <p/> 655 * <p/> 656 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor 657 * on the way out, then the order of execution will be as follows: 658 * <p/> 659 * <ol> 660 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack) 661 * <li>interceptor gets uninstalled 662 * <li>interceptor.execute() returns 663 * <li>X.processResponse() 664 * </ol> 665 * 666 * @return true if the specified interceptor was removed. False if 667 * the specified interceptor was not registered with this fiber to begin with. 668 */ 669 public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 670 if (interceptors != null && interceptors.remove(interceptor)) { 671 needsToReenter = true; 672 return true; 673 } 674 return false; 675 } 676 677 /** 678 * Gets the context {@link ClassLoader} of this fiber. 679 */ 680 public 681 @Nullable 682 ClassLoader getContextClassLoader() { 683 return contextClassLoader; 684 } 685 686 /** 687 * Sets the context {@link ClassLoader} of this fiber. 688 */ 689 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) { 690 ClassLoader r = this.contextClassLoader; 691 this.contextClassLoader = contextClassLoader; 692 return r; 693 } 694 695 /** 696 * DO NOT CALL THIS METHOD. This is an implementation detail 697 * of {@link Fiber}. 
698 */ 699 @Deprecated 700 public void run() { 701 assert !synchronous; 702 doRun(); 703 if (startedSync && suspendedCount == 0 && 704 (next != null || contsSize > 0)) { 705 // We bailed out of running this fiber we started as sync, and now 706 // want to finish running it async 707 startedSync = false; 708 // Start back up as an async fiber 709 dumpFiberContext("restarting (async) after startSync"); 710 owner.addRunnable(this); 711 } else { 712 completionCheck(); 713 } 714 } 715 716 /** 717 * Runs a given {@link Tube} (and everything thereafter) synchronously. 718 * <p/> 719 * <p/> 720 * This method blocks and returns only when all the successive {@link Tube}s 721 * complete their request/response processing. This method can be used 722 * if a {@link Tube} needs to fallback to synchronous processing. 723 * <p/> 724 * <h3>Example:</h3> 725 * <pre> 726 * class FooTube extends {@link AbstractFilterTubeImpl} { 727 * NextAction processRequest(Packet request) { 728 * // run everything synchronously and return with the response packet 729 * return doReturnWith(Fiber.current().runSync(next,request)); 730 * } 731 * NextAction processResponse(Packet response) { 732 * // never be invoked 733 * } 734 * } 735 * </pre> 736 * 737 * @param tubeline The first tube of the tubeline that will act on the packet. 738 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 739 * @return The response packet to the <tt>request</tt>. 740 * @see #start(Tube, Packet, CompletionCallback) 741 */ 742 public synchronized 743 @NotNull 744 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) { 745 // save the current continuation, so that we return runSync() without executing them. 
746 final Tube[] oldCont = conts; 747 final int oldContSize = contsSize; 748 final boolean oldSynchronous = synchronous; 749 final Tube oldNext = next; 750 751 if (oldContSize > 0) { 752 conts = new Tube[16]; 753 contsSize = 0; 754 } 755 756 try { 757 synchronous = true; 758 this.packet = request; 759 next = tubeline; 760 doRun(); 761 if (throwable != null) { 762 if (throwable instanceof RuntimeException) { 763 throw (RuntimeException) throwable; 764 } 765 if (throwable instanceof Error) { 766 throw (Error) throwable; 767 } 768 // our system is supposed to only accept Error or RuntimeException 769 throw new AssertionError(throwable); 770 } 771 return this.packet; 772 } finally { 773 conts = oldCont; 774 contsSize = oldContSize; 775 synchronous = oldSynchronous; 776 next = oldNext; 777 if(interrupted) { 778 Thread.currentThread().interrupt(); 779 interrupted = false; 780 } 781 if(!started && !startedSync) 782 completionCheck(); 783 } 784 } 785 786 private synchronized void completionCheck() { 787 // Don't trigger completion and callbacks if fiber is suspended 788 if(!isCanceled && contsSize==0 && suspendedCount == 0) { 789 if(isTraceEnabled()) 790 LOGGER.fine(getName()+" completed"); 791 completed = true; 792 clearListeners(); 793 notifyAll(); 794 if (completionCallback != null) { 795 if (throwable != null) 796 completionCallback.onCompletion(throwable); 797 else 798 completionCallback.onCompletion(packet); 799 } 800 } 801 } 802 803 ///** 804 // * Blocks until the fiber completes. 805 // */ 806 //public synchronized void join() throws InterruptedException { 807 // while(!completed) 808 // wait(); 809 //} 810 811 /** 812 * Invokes all registered {@link InterceptorHandler}s and then call into 813 * {@link Fiber#__doRun()}. 814 */ 815 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> { 816 /** 817 * Index in {@link Fiber#interceptors} to invoke next. 
818 */ 819 private int idx; 820 821 /** 822 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}. 823 */ 824 Tube invoke(Tube next) { 825 idx = 0; 826 return execute(next); 827 } 828 829 public Tube execute(Tube next) { 830 if (idx == interceptors.size()) { 831 Fiber.this.next = next; 832 __doRun(); 833 } else { 834 FiberContextSwitchInterceptor interceptor = interceptors.get(idx++); 835 return interceptor.execute(Fiber.this, next, this); 836 } 837 return Fiber.this.next; 838 } 839 } 840 841 /** 842 * Executes the fiber as much as possible. 843 * 844 */ 845 @SuppressWarnings({"LoopStatementThatDoesntLoop"}) // IntelliJ reports this bogus error 846 private void doRun() { 847 848 dumpFiberContext("running"); 849 850 if (serializeExecution) { 851 serializedExecutionLock.lock(); 852 try { 853 _doRun(next); 854 } finally { 855 serializedExecutionLock.unlock(); 856 } 857 } else { 858 _doRun(next); 859 } 860 } 861 862 private String currentThreadMonitor = "CurrentThreadMonitor"; 863 864 private void _doRun(Tube next) { 865 Thread thread; 866 synchronized(currentThreadMonitor) { 867 if (currentThread != null && !synchronous) { 868 if (LOGGER.isLoggable(Level.FINE)) { 869 LOGGER.fine("Attempt to run Fiber ['" + this + "'] in more than one thread. Current Thread: " + currentThread + " Attempted Thread: " + Thread.currentThread()); 870 } 871 while (currentThread != null) { 872 try { 873 currentThreadMonitor.wait(); 874 } catch (Exception e) { 875 // ignore 876 } 877 } 878 } 879 currentThread = Thread.currentThread(); 880 thread = currentThread; 881 if (LOGGER.isLoggable(Level.FINE)) { 882 LOGGER.fine("Thread entering _doRun(): " + thread); 883 } 884 } 885 886 ClassLoader old = thread.getContextClassLoader(); 887 thread.setContextClassLoader(contextClassLoader); 888 try { 889 do { 890 needsToReenter = false; 891 892 // if interceptors are set, go through the interceptors. 
893 if(interceptorHandler ==null) { 894 this.next = next; 895 __doRun(); 896 } 897 else 898 next = interceptorHandler.invoke(next); 899 } while (needsToReenter); 900 901 } finally { 902 thread.setContextClassLoader(old); 903 synchronized(currentThreadMonitor) { 904 currentThread = null; 905 if (LOGGER.isLoggable(Level.FINE)) { 906 LOGGER.fine("Thread leaving _doRun(): " + thread); 907 } 908 currentThreadMonitor.notify(); 909 } 910 } 911 } 912 913 /** 914 * To be invoked from {@link #doRun()}. 915 * 916 * @see #doRun() 917 */ 918 private void __doRun() { 919 final Fiber old = CURRENT_FIBER.get(); 920 CURRENT_FIBER.set(this); 921 922 // if true, lots of debug messages to show what's being executed 923 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); 924 925 try { 926 boolean abortResponse = false; 927 boolean justSuspended = false; 928 while(!isCanceled && !isBlocking(justSuspended) && !needsToReenter) { 929 try { 930 NextAction na; 931 Tube last; 932 if(throwable!=null) { 933 if(contsSize==0 || abortResponse) { 934 contsSize = 0; // abortResponse case 935 // nothing else to execute. we are done. 936 return; 937 } 938 last = popCont(); 939 if (traceEnabled) 940 LOGGER.finer(getName() + ' ' + last + ".processException(" + throwable + ')'); 941 na = last.processException(throwable); 942 } else { 943 if(next!=null) { 944 if(traceEnabled) 945 LOGGER.finer(getName()+' '+next+".processRequest("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); 946 na = next.processRequest(packet); 947 last = next; 948 } else { 949 if(contsSize==0 || abortResponse) { 950 // nothing else to execute. we are done. 951 contsSize = 0; 952 return; 953 } 954 last = popCont(); 955 if(traceEnabled) 956 LOGGER.finer(getName()+' '+last+".processResponse("+(packet != null ? 
"Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); 957 na = last.processResponse(packet); 958 } 959 } 960 961 if (traceEnabled) 962 LOGGER.finer(getName() + ' ' + last + " returned with " + na); 963 964 // If resume is called before suspend, then make sure 965 // resume(Packet) is not lost 966 if (na.kind != NextAction.SUSPEND) { 967 // preserve in-flight packet so that processException may inspect 968 if (na.kind != NextAction.THROW && 969 na.kind != NextAction.THROW_ABORT_RESPONSE) 970 packet = na.packet; 971 throwable = na.throwable; 972 } 973 974 switch(na.kind) { 975 case NextAction.INVOKE: 976 case NextAction.INVOKE_ASYNC: 977 pushCont(last); 978 // fall through next 979 case NextAction.INVOKE_AND_FORGET: 980 next = na.next; 981 if (na.kind == NextAction.INVOKE_ASYNC 982 && startedSync) { 983 // Break out here 984 return; 985 } 986 break; 987 case NextAction.THROW_ABORT_RESPONSE: 988 case NextAction.ABORT_RESPONSE: 989 abortResponse = true; 990 if (LOGGER.isLoggable(Level.FINE)) { 991 LOGGER.fine("Fiber " + this + " is aborting a response due to exception: " + na.throwable); 992 } 993 case NextAction.RETURN: 994 case NextAction.THROW: 995 next = null; 996 break; 997 case NextAction.SUSPEND: 998 if (next != null) { 999 // Only store the 'last' tube when we're processing 1000 // a request, since conts array is for processResponse 1001 pushCont(last); 1002 } 1003 next = na.next; 1004 justSuspended = suspend(); 1005 break; 1006 default: 1007 throw new AssertionError(); 1008 } 1009 } catch (RuntimeException t) { 1010 if (traceEnabled) 1011 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1012 throwable = t; 1013 } catch (Error t) { 1014 if (traceEnabled) 1015 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". 
Start stack unwinding", t); 1016 throwable = t; 1017 } 1018 1019 dumpFiberContext("After tube execution"); 1020 } 1021 1022 if (isCanceled) { 1023 next = null; 1024 throwable = null; 1025 contsSize = 0; 1026 } 1027 1028 // there's nothing we can execute right away. 1029 // we'll be back when this fiber is resumed. 1030 1031 } finally { 1032 CURRENT_FIBER.set(old); 1033 } 1034 } 1035 1036 private void pushCont(Tube tube) { 1037 conts[contsSize++] = tube; 1038 1039 // expand if needed 1040 int len = conts.length; 1041 if (contsSize == len) { 1042 Tube[] newBuf = new Tube[len * 2]; 1043 System.arraycopy(conts, 0, newBuf, 0, len); 1044 conts = newBuf; 1045 } 1046 } 1047 1048 private Tube popCont() { 1049 return conts[--contsSize]; 1050 } 1051 1052 private Tube peekCont() { 1053 int index = contsSize - 1; 1054 if (index >= 0 && index < conts.length) { 1055 return conts[index]; 1056 } else { 1057 return null; 1058 } 1059 } 1060 1061 /** 1062 * Only to be used by Tubes that manipulate the Fiber to create alternate flows 1063 * @since 2.2.6 1064 */ 1065 public void resetCont(Tube[] conts, int contsSize) { 1066 this.conts = conts; 1067 this.contsSize = contsSize; 1068 } 1069 1070 /** 1071 * Returns true if the fiber needs to block its execution. 1072 */ 1073 private boolean isBlocking(boolean justSuspended) { 1074 if (synchronous) { 1075 while (suspendedCount == 1) 1076 try { 1077 if (isTraceEnabled()) { 1078 LOGGER.fine(getName() + " is blocking thread " + Thread.currentThread().getName()); 1079 } 1080 wait(); // the synchronized block is the whole runSync method. 1081 } catch (InterruptedException e) { 1082 // remember that we are interrupted, but don't respond to it 1083 // right away. This behavior is in line with what happens 1084 // when you are actually running the whole thing synchronously. 
1085 interrupted = true; 1086 } 1087 return false; 1088 } 1089 else 1090 return justSuspended || suspendedCount==1; 1091 } 1092 1093 private String getName() { 1094 return "engine-" + owner.id + "fiber-" + id; 1095 } 1096 1097 @Override 1098 public String toString() { 1099 return getName(); 1100 } 1101 1102 /** 1103 * Gets the current {@link Packet} associated with this fiber. 1104 * <p/> 1105 * <p/> 1106 * This method returns null if no packet has been associated with the fiber yet. 1107 */ 1108 public 1109 @Nullable 1110 Packet getPacket() { 1198 */ 1199 private static final AtomicInteger iotaGen = new AtomicInteger(); 1200 1201 private static boolean isTraceEnabled() { 1202 return LOGGER.isLoggable(Level.FINE); 1203 } 1204 1205 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName()); 1206 1207 1208 private static final ReentrantLock serializedExecutionLock = new ReentrantLock(); 1209 1210 /** 1211 * Set this boolean to true to execute fibers sequentially one by one. 1212 * See class javadoc. 1213 */ 1214 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize"); 1215 1216 private final Set<Component> components = new CopyOnWriteArraySet<Component>(); 1217 1218 public <S> S getSPI(Class<S> spiType) { 1219 for(Component c : components) { 1220 S spi = c.getSPI(spiType); 1221 if (spi != null) 1222 return spi; 1223 } 1224 return null; 1225 } 1226 1227 public Set<Component> getComponents() { 1228 return components; 1229 } 1230 } | 1 /* 2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 
Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.xml.internal.ws.api.pipe; 27 28 import com.sun.istack.internal.NotNull; 29 import com.sun.istack.internal.Nullable; 30 import com.sun.xml.internal.ws.api.Cancelable; 31 import com.sun.xml.internal.ws.api.Component; 32 import com.sun.xml.internal.ws.api.ComponentRegistry; 33 import com.sun.xml.internal.ws.api.SOAPVersion; 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion; 35 import com.sun.xml.internal.ws.api.message.AddressingUtils; 36 import com.sun.xml.internal.ws.api.message.Packet; 37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; 38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl; 39 import com.sun.xml.internal.ws.api.server.Adapter; 40 import com.sun.xml.internal.ws.api.server.Container; 41 import com.sun.xml.internal.ws.api.server.ContainerResolver; 42 43 import java.util.ArrayList; 44 import java.util.List; 45 import java.util.Set; 46 import java.util.concurrent.CopyOnWriteArraySet; 47 import java.util.concurrent.atomic.AtomicInteger; 48 import java.util.concurrent.locks.Condition; 49 import 
java.util.concurrent.locks.ReentrantLock; 50 import java.util.logging.Level; 51 import java.util.logging.Logger; 52 53 import javax.xml.ws.Holder; 54 import javax.xml.ws.WebServiceException; 55 56 /** 57 * User-level thread. Represents the execution of one request/response processing. 58 * <p/> 59 * <p/> 60 * JAX-WS RI is capable of running a large number of request/response concurrently by 61 * using a relatively small number of threads. This is made possible by utilizing 62 * a {@link Fiber} — a user-level thread that gets created for each request/response 63 * processing. 64 * <p/> 65 * <p/> 66 * A fiber remembers where in the pipeline the processing is at, what needs to be 67 * executed on the way out (when processing response), and other additional information 68 * specific to the execution of a particular request/response. 69 * <p/> 70 * <h2>Suspend/Resume</h2> 71 * <p/> 72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}. 73 * When a fiber is suspended, it will be kept on the side until it is 74 * {@link #resume(Packet) resumed}. This allows threads to go execute 75 * other runnable fibers, allowing efficient utilization of smaller number of 98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend 99 * level logging. Using FINER would cause more detailed logging, which includes 100 * what tubes are executed in what order and how they behaved. 101 * <p/> 102 * <p/> 103 * When you debug the server side, consider setting {@link Fiber#serializeExecution} 104 * to true, so that execution of fibers are serialized. Debugging a server 105 * with more than one running threads is very tricky, and this switch will 106 * prevent that. This can be also enabled by setting the system property on. 107 * See the source code. 
*
 * @author Kohsuke Kawaguchi
 * @author Jitendra Kotamraju
 */
public final class Fiber implements Runnable, Cancelable, ComponentRegistry {

    /**
     * Callback interface for notification of suspend and resume.
     *
     * @since 2.2.6
     * @deprecated Use {@link NextAction#suspend(Runnable)}
     */
    public interface Listener {
        /**
         * Fiber has been suspended. Implementations of this callback may resume the Fiber.
         * @param fiber Fiber
         */
        public void fiberSuspended(Fiber fiber);

        /**
         * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
         * @param fiber Fiber
         */
        public void fiberResumed(Fiber fiber);
    }

    // Registered suspend/resume listeners; every access synchronizes on the list itself.
    private final List<Listener> _listeners = new ArrayList<Listener>();

    /**
     * Adds a suspend/resume callback listener. A listener that is already
     * registered is not added a second time.
     * @param listener Listener
     * @since 2.2.6
     * @deprecated {@link Listener} itself is deprecated; see its documentation for the replacement.
     */
    public void addListener(Listener listener) {
        synchronized(_listeners) {
            if (!_listeners.contains(listener)) {
                _listeners.add(listener);
            }
        }
    }

    /**
     * Removes a suspend/resume callback listener.
     * @param listener Listener
     * @since 2.2.6
     * @deprecated {@link Listener} itself is deprecated; see its documentation for the replacement.
     */
    public void removeListener(Listener listener) {
        synchronized(_listeners) {
            _listeners.remove(listener);
        }
    }

    /**
     * Returns a snapshot copy of the registered listeners, so callers can
     * iterate and invoke callbacks without holding the list's monitor.
     */
    List<Listener> getCurrentListeners() {
        synchronized(_listeners) {
            return new ArrayList<Listener>(_listeners);
        }
    }

    /**
     * Removes every registered listener.
     */
    private void clearListeners() {
        synchronized(_listeners) {
            _listeners.clear();
        }
    }

    /**
     * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
     * to be invoked on the way back.
     */
    private Tube[] conts = new Tube[16];
    // number of valid entries in conts
    private int contsSize;

    /**
     * If this field is non-null, the next instruction to execute is
     * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
* <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
     * <li>Tube returns with {@link NextAction#suspend()}.
     * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
     * to wake up fiber
     * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
     * </ol>
     * <p/>
     * <p/>
     * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
     * {@link #resume(Packet)} occurs before {@link #suspend()}.
     * <p/>
     * <p/>
     * Increment and decrement is guarded by 'this' object.
     * <p/>
     * NOTE(review): in this version resume() decrements the counter while holding
     * the fiber's ReentrantLock rather than the monitor of 'this'; the sentence
     * above presumably predates that lock -- confirm and update.
     */
    private volatile int suspendedCount = 0;

    /**
     * True only while suspend() is invoking {@link Listener#fiberSuspended(Fiber)}
     * callbacks. resume() checks this flag and, when it is set, neither notifies
     * listeners nor reschedules the fiber.
     */
    private volatile boolean isInsideSuspendCallbacks = false;

    /**
     * Is this {@link Fiber} currently running in the synchronous mode?
     */
    private boolean synchronous;

    /**
     * Remembered interrupt. runSync() re-asserts the calling thread's interrupt
     * status from this flag before returning, and then clears it.
     */
    private boolean interrupted;

    /**
     * Identifier of this fiber, taken from the shared counter in the constructor.
     */
    private final int id;

    /**
     * Active {@link FiberContextSwitchInterceptor}s for this fiber.
     */
    private List<FiberContextSwitchInterceptor> interceptors;

    /**
     * Fiber's context {@link ClassLoader}.
     */
    private
    @Nullable
    ClassLoader contextClassLoader;

    /**
     * Callback notified by completionCheck() when the fiber finishes; may be null.
     */
    private
    @Nullable
    CompletionCallback completionCallback;

    /**
     * When true, a pending throwable is attached to the packet as a
     * ThrowableContainerPropertySet satellite instead of being thrown by
     * runSync() or passed to {@link CompletionCallback#onCompletion(Throwable)}.
     */
    private boolean isDeliverThrowableInPacket = false;

    /**
     * Selects how a pending throwable is reported; see the field of the same name.
     *
     * @param isDeliverThrowableInPacket true to deliver the throwable inside the packet
     */
    public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
        this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
    }

    /**
     * The thread on which this Fiber is currently executing, if applicable.
*/
    private Thread currentThread;

    /**
     * Replace uses of synchronized(this) with this lock so that we can control
     * unlocking for resume use cases
     */
    private final ReentrantLock lock = new ReentrantLock();
    // Condition on 'lock'; signalled by resume() for a synchronous fiber and by completionCheck().
    private final Condition condition = lock.newCondition();

    // Set by cancel(boolean); a canceled fiber does not run its completion callback.
    private volatile boolean isCanceled;

    /**
     * Set to true if this fiber is started asynchronously, to avoid
     * doubly-invoking completion code.
     */
    private boolean started;

    /**
     * Set to true if this fiber is started sync but allowed to run async.
     * This property exists for use cases where the processing model is fundamentally async
     * but some requirement or feature mandates that part of the tubeline run synchronously. For
     * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
     * asynchronously, but if in-order message delivery is used then message processing must assign
     * a message number before the remainder of the processing can be asynchronous.
     */
    private boolean startedSync;

    /**
     * Callback to be invoked when a {@link Fiber} finishes execution.
     */
    public interface CompletionCallback {
        /**
         * Indicates that the fiber has finished its execution.
         * <p/>
         * <p/>
         * Since the JAX-WS RI runs asynchronously,
         * this method may be invoked by a different thread
         * than any of the threads that started it or run a part of tubeline.
         */
        void onCompletion(@NotNull Packet response);

        /**
         * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
299 */ 300 void onCompletion(@NotNull Throwable error); 301 } 302 303 Fiber(Engine engine) { 304 this.owner = engine; 305 id = iotaGen.incrementAndGet(); 306 if (isTraceEnabled()) { 307 LOGGER.log(Level.FINE, "{0} created", getName()); 308 } 309 310 // if this is run from another fiber, then we naturally inherit its context classloader, 311 // so this code works for fiber->fiber inheritance just fine. 312 contextClassLoader = Thread.currentThread().getContextClassLoader(); 313 } 314 315 /** 316 * Starts the execution of this fiber asynchronously. 317 * <p/> 318 * <p/> 319 * This method works like {@link Thread#start()}. 320 * 321 * @param tubeline The first tube of the tubeline that will act on the packet. 322 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 323 * @param completionCallback The callback to be invoked when the processing is finished and the 324 * final response packet is available. 325 * @see #runSync(Tube, Packet) 326 */ 327 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) { 328 start(tubeline, request, completionCallback, false); 329 } 330 331 private void dumpFiberContext(String desc) { 332 if(isTraceEnabled()) { 333 String action = null; 334 String msgId = null; 335 if (packet != null) { 336 for (SOAPVersion sv: SOAPVersion.values()) { 337 for (AddressingVersion av: AddressingVersion.values()) { 338 action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null; 339 msgId = packet.getMessage() != null ? 
AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null; 340 if (action != null || msgId != null) { 341 break; 342 } 343 } 344 if (action != null || msgId != null) { 345 break; 346 } 347 } 348 } 349 String actionAndMsgDesc; 350 if (action == null && msgId == null) { 351 actionAndMsgDesc = "NO ACTION or MSG ID"; 352 } else { 353 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'"; 354 } 355 356 String tubeDesc; 357 if (next != null) { 358 tubeDesc = next.toString() + ".processRequest()"; 359 } else { 360 tubeDesc = peekCont() + ".processResponse()"; 361 } 362 363 LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null}); 364 } 365 } 366 367 /** 368 * Starts the execution of this fiber. 369 * 370 * If forceSync is true, then the fiber is started for an ostensibly async invocation, 371 * but allows for some portion of the tubeline to run sync with the calling 372 * client instance (Port/Dispatch instance). This allows tubes that enforce 373 * ordering to see requests in the order they were sent at the point the 374 * client invoked them. 375 * <p> 376 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or 377 * SEIStub) knows one or more tubes need to enforce ordering and thus need 378 * to run sync with the client. Such tubes can return 379 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline 380 * should be invoked async to the current thread. 381 * 382 * <p> 383 * This method works like {@link Thread#start()}. 430 * This method is implemented in a race-free way. Another thread can invoke 431 * this method even before this fiber goes into the suspension mode. So the caller 432 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 
433 * 434 * @param resumePacket packet used in the resumed processing 435 */ 436 public void resume(@NotNull Packet resumePacket) { 437 resume(resumePacket, false); 438 } 439 440 /** 441 * Similar to resume(Packet) but allowing the Fiber to be resumed 442 * synchronously (in the current Thread). If you want to know when the 443 * fiber completes (not when this method returns) then add/wrap a 444 * CompletionCallback on this Fiber. 445 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging 446 * including in-order message delivery may need to resume the Fiber synchronously 447 * until message order is confirmed prior to returning to asynchronous processing. 448 * @since 2.2.6 449 */ 450 public void resume(@NotNull Packet resumePacket, 451 boolean forceSync) { 452 resume(resumePacket, forceSync, null); 453 } 454 455 /** 456 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed 457 * and at the same time atomically assign a new CompletionCallback to it. 458 * @since 2.2.6 459 */ 460 public void resume(@NotNull Packet resumePacket, 461 boolean forceSync, 462 CompletionCallback callback) { 463 lock.lock(); 464 try { 465 if (callback != null) { 466 setCompletionCallback(callback); 467 } 468 if(isTraceEnabled()) 469 LOGGER.log(Level.FINE, "{0} resuming. 
Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1}); 470 packet = resumePacket; 471 if( --suspendedCount == 0 ) { 472 if (!isInsideSuspendCallbacks) { 473 List<Listener> listeners = getCurrentListeners(); 474 for (Listener listener: listeners) { 475 try { 476 listener.fiberResumed(this); 477 } catch (Throwable e) { 478 if (isTraceEnabled()) 479 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 480 } 481 } 482 483 if(synchronous) { 484 condition.signalAll(); 485 } else if (forceSync || startedSync) { 486 run(); 487 } else { 488 dumpFiberContext("resuming (async)"); 489 owner.addRunnable(this); 490 } 491 } 492 } else { 493 if (isTraceEnabled()) { 494 LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount}); 495 } 496 } 497 } finally { 498 lock.unlock(); 499 } 500 } 501 502 /** 503 * Wakes up a suspended fiber and begins response processing. 504 * @since 2.2.6 505 */ 506 public void resumeAndReturn(@NotNull Packet resumePacket, 507 boolean forceSync) { 508 if(isTraceEnabled()) 509 LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName()); 510 next = null; 511 resume(resumePacket, forceSync); 512 } 513 514 /** 515 * Wakes up a suspended fiber with an exception. 516 * <p/> 517 * <p/> 518 * The execution of the suspended fiber will be resumed in the response 519 * processing direction, by calling the {@link Tube#processException(Throwable)} method 520 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with 521 * the specified exception as the parameter. 522 * <p/> 523 * <p/> 524 * This method is implemented in a race-free way. Another thread can invoke 525 * this method even before this fiber goes into the suspension mode. So the caller 526 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 
527 * 528 * @param throwable exception that is used in the resumed processing 529 */ 530 public void resume(@NotNull Throwable throwable) { 531 resume(throwable, packet, false); 532 } 533 534 /** 535 * Wakes up a suspended fiber with an exception. 536 * <p/> 537 * <p/> 538 * The execution of the suspended fiber will be resumed in the response 539 * processing direction, by calling the {@link Tube#processException(Throwable)} method 540 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with 541 * the specified exception as the parameter. 542 * <p/> 543 * <p/> 544 * This method is implemented in a race-free way. Another thread can invoke 545 * this method even before this fiber goes into the suspension mode. So the caller 546 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 547 * 548 * @param throwable exception that is used in the resumed processing 549 * @param packet Packet that will be visible on the Fiber after the resume 550 * @since 2.2.8 551 */ 552 public void resume(@NotNull Throwable throwable, @NotNull Packet packet) { 553 resume(throwable, packet, false); 554 } 555 556 /** 557 * Wakes up a suspend fiber with an exception. 558 * 559 * If forceSync is true, then the suspended fiber will resume with 560 * synchronous processing on the current thread. This will continue 561 * until some Tube indicates that it is safe to switch to asynchronous 562 * processing. 563 * 564 * @param error exception that is used in the resumed processing 565 * @param forceSync if processing begins synchronously 566 * @since 2.2.6 567 */ 568 public void resume(@NotNull Throwable error, 569 boolean forceSync) { 570 resume(error, packet, forceSync); 571 } 572 573 /** 574 * Wakes up a suspend fiber with an exception. 575 * 576 * If forceSync is true, then the suspended fiber will resume with 577 * synchronous processing on the current thread. 
This will continue 578 * until some Tube indicates that it is safe to switch to asynchronous 579 * processing. 580 * 581 * @param error exception that is used in the resumed processing 582 * @param packet Packet that will be visible on the Fiber after the resume 583 * @param forceSync if processing begins synchronously 584 * @since 2.2.8 585 */ 586 public void resume(@NotNull Throwable error, 587 @NotNull Packet packet, 588 boolean forceSync) { 589 if(isTraceEnabled()) 590 LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName()); 591 next = null; 592 throwable = error; 593 resume(packet, forceSync); 594 } 595 596 /** 597 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback 598 * @param mayInterrupt if cancel should use {@link Thread#interrupt()} 599 * @see java.util.concurrent.Future#cancel(boolean) 600 * @since 2.2.6 601 */ 602 @Override 603 public void cancel(boolean mayInterrupt) { 604 isCanceled = true; 605 if (mayInterrupt) { 606 // synchronized(this) is used as Thread running Fiber will be holding lock 607 synchronized(this) { 608 if (currentThread != null) 609 currentThread.interrupt(); 610 } 611 } 612 } 613 614 /** 615 * Suspends this fiber's execution until the resume method is invoked. 616 * <p/> 617 * The call returns immediately, and when the fiber is resumed 618 * the execution picks up from the last scheduled continuation. 619 * @param onExitRunnable runnable to be invoked after fiber is marked for suspension 620 * @return if control loop must exit 621 */ 622 private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) { 623 if(isTraceEnabled()) { 624 LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1}); 625 if (suspendedCount > 0) { 626 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. 
Will require more than one resume to actually resume this fiber.", getName()); 627 } 628 } 629 630 List<Listener> listeners = getCurrentListeners(); 631 if (++suspendedCount == 1) { 632 isInsideSuspendCallbacks = true; 633 try { 634 for (Listener listener: listeners) { 635 try { 636 listener.fiberSuspended(this); 637 } catch (Throwable e) { 638 if(isTraceEnabled()) 639 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 640 } 641 } 642 } finally { 643 isInsideSuspendCallbacks = false; 644 } 645 } 646 647 if (suspendedCount <= 0) { 648 // suspend callback caused fiber to resume 649 for (Listener listener: listeners) { 650 try { 651 listener.fiberResumed(this); 652 } catch (Throwable e) { 653 if(isTraceEnabled()) 654 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 655 } 656 } 657 658 } else if (onExitRunnable != null) { 659 // synchronous use cases cannot disconnect from the current thread 660 if (!synchronous) { 661 /* INTENTIONALLY UNLOCKING EARLY */ 662 synchronized(this) { 663 // currentThread is protected by the monitor for this fiber so 664 // that it is accessible to cancel() even when the lock is held 665 currentThread = null; 666 } 667 lock.unlock(); 668 assert(!lock.isHeldByCurrentThread()); 669 isRequireUnlock.value = Boolean.FALSE; 670 671 try { 672 onExitRunnable.run(); 673 } catch(Throwable t) { 674 throw new OnExitRunnableException(t); 675 } 676 677 return true; 678 679 } else { 680 // for synchronous we will stay with current thread, so do not disconnect 681 if (isTraceEnabled()) 682 LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread"); 683 onExitRunnable.run(); 684 } 685 } 686 687 return false; 688 } 689 690 private static final class OnExitRunnableException extends RuntimeException { 691 private static final long serialVersionUID = 1L; 692 693 Throwable target; 694 695 public 
OnExitRunnableException(Throwable target) { 696 super((Throwable)null); // see pattern for InvocationTargetException 697 this.target = target; 698 } 699 } 700 701 /** 702 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber. 703 * <p/> 704 * <p/> 705 * The newly installed fiber will take effect immediately after the current 706 * tube returns from its {@link Tube#processRequest(Packet)} or 707 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 708 * <p/> 709 * <p/> 710 * So when the tubeline consists of X and Y, and when X installs an interceptor, 711 * the order of execution will be as follows: 712 * <p/> 713 * <ol> 714 * <li>X.processRequest() 715 * <li>interceptor gets installed 716 * <li>interceptor.execute() is invoked 717 * <li>Y.processRequest() 718 * </ol> 719 */ 720 public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 721 if (interceptors == null) { 722 interceptors = new ArrayList<FiberContextSwitchInterceptor>(); 723 } else { 724 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); 725 l.addAll(interceptors); 726 interceptors = l; 727 } 728 interceptors.add(interceptor); 729 } 730 731 /** 732 * Removes a {@link FiberContextSwitchInterceptor} from this fiber. 733 * <p/> 734 * <p/> 735 * The removal of the interceptor takes effect immediately after the current 736 * tube returns from its {@link Tube#processRequest(Packet)} or 737 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 
738 * <p/> 739 * <p/> 740 * <p/> 741 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor 742 * on the way out, then the order of execution will be as follows: 743 * <p/> 744 * <ol> 745 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack) 746 * <li>interceptor gets uninstalled 747 * <li>interceptor.execute() returns 748 * <li>X.processResponse() 749 * </ol> 750 * 751 * @return true if the specified interceptor was removed. False if 752 * the specified interceptor was not registered with this fiber to begin with. 753 */ 754 public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 755 if (interceptors != null) { 756 boolean result = interceptors.remove(interceptor); 757 if (interceptors.isEmpty()) 758 interceptors = null; 759 else { 760 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); 761 l.addAll(interceptors); 762 interceptors = l; 763 } 764 return result; 765 } 766 return false; 767 } 768 769 /** 770 * Gets the context {@link ClassLoader} of this fiber. 771 */ 772 public 773 @Nullable 774 ClassLoader getContextClassLoader() { 775 return contextClassLoader; 776 } 777 778 /** 779 * Sets the context {@link ClassLoader} of this fiber. 780 */ 781 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) { 782 ClassLoader r = this.contextClassLoader; 783 this.contextClassLoader = contextClassLoader; 784 return r; 785 } 786 787 /** 788 * DO NOT CALL THIS METHOD. This is an implementation detail 789 * of {@link Fiber}. 
790 */ 791 @Deprecated 792 @Override 793 public void run() { 794 Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer()); 795 try { 796 assert !synchronous; 797 // doRun returns true to indicate an early exit from fiber processing 798 if (!doRun()) { 799 if (startedSync && suspendedCount == 0 && 800 (next != null || contsSize > 0)) { 801 // We bailed out of running this fiber we started as sync, and now 802 // want to finish running it async 803 startedSync = false; 804 // Start back up as an async fiber 805 dumpFiberContext("restarting (async) after startSync"); 806 owner.addRunnable(this); 807 } else { 808 completionCheck(); 809 } 810 } 811 } finally { 812 ContainerResolver.getDefault().exitContainer(old); 813 } 814 } 815 816 /** 817 * Runs a given {@link Tube} (and everything thereafter) synchronously. 818 * <p/> 819 * <p/> 820 * This method blocks and returns only when all the successive {@link Tube}s 821 * complete their request/response processing. This method can be used 822 * if a {@link Tube} needs to fallback to synchronous processing. 823 * <p/> 824 * <h3>Example:</h3> 825 * <pre> 826 * class FooTube extends {@link AbstractFilterTubeImpl} { 827 * NextAction processRequest(Packet request) { 828 * // run everything synchronously and return with the response packet 829 * return doReturnWith(Fiber.current().runSync(next,request)); 830 * } 831 * NextAction processResponse(Packet response) { 832 * // never be invoked 833 * } 834 * } 835 * </pre> 836 * 837 * @param tubeline The first tube of the tubeline that will act on the packet. 838 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 839 * @return The response packet to the <tt>request</tt>. 
* @see #start(Tube, Packet, CompletionCallback)
     */
    public
    @NotNull
    Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
        lock.lock();
        try {
            // save the current continuation, so that we return runSync() without executing them.
            final Tube[] oldCont = conts;
            final int oldContSize = contsSize;
            final boolean oldSynchronous = synchronous;
            final Tube oldNext = next;

            // run the nested tubeline on a fresh stack only if the current one is in use
            if (oldContSize > 0) {
                conts = new Tube[16];
                contsSize = 0;
            }

            try {
                synchronous = true;
                this.packet = request;
                next = tubeline;
                doRun();
                if (throwable != null) {
                    if (isDeliverThrowableInPacket) {
                        // report the failure through the returned packet instead of throwing
                        packet.addSatellite(new ThrowableContainerPropertySet(throwable));
                    } else {
                        if (throwable instanceof RuntimeException) {
                            throw (RuntimeException) throwable;
                        }
                        if (throwable instanceof Error) {
                            throw (Error) throwable;
                        }
                        // our system is supposed to only accept Error or RuntimeException
                        throw new AssertionError(throwable);
                    }
                }
                return this.packet;
            } finally {
                // restore the state saved on entry, whether we return or throw
                conts = oldCont;
                contsSize = oldContSize;
                synchronous = oldSynchronous;
                next = oldNext;
                if(interrupted) {
                    // re-assert an interrupt that was remembered while running
                    Thread.currentThread().interrupt();
                    interrupted = false;
                }
                // a fiber started through start() handles its own completion (see 'started')
                if(!started && !startedSync)
                    completionCheck();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Fires completion handling if the fiber is not canceled, has no pending
     * continuations and is not suspended: clears the listeners, signals
     * {@link #condition}, and notifies the completion callback (if any) with
     * either the packet or the pending throwable. Runs while holding {@link #lock}.
     */
    private void completionCheck() {
        lock.lock();
        try {
            // Don't trigger completion and callbacks if fiber is suspended
            if(!isCanceled && contsSize==0 && suspendedCount == 0) {
                if(isTraceEnabled())
                    LOGGER.log(Level.FINE, "{0} completed", getName());
                clearListeners();
                condition.signalAll();
                if (completionCallback != null) {
                    if (throwable != null) {
                        if (isDeliverThrowableInPacket) {
                            // deliver the failure inside the packet rather than via onCompletion(Throwable)
                            packet.addSatellite(new ThrowableContainerPropertySet(throwable));
                            completionCallback.onCompletion(packet);
                        } else
                            completionCallback.onCompletion(throwable);
                    } else
                        completionCallback.onCompletion(packet);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Invokes each {@link FiberContextSwitchInterceptor} in {@link #ints}, in order,
     * and then calls into {@code Fiber.__doRun}.
     */
    private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
        // passed through unchanged to __doRun
        private final Holder<Boolean> isUnlockRequired;
        // the interceptor list this handler walks; supplied by the caller
        private final List<FiberContextSwitchInterceptor> ints;

        /**
         * Index in {@link #ints} to invoke next.
         */
        private int idx;

        public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
            this.isUnlockRequired = isUnlockRequired;
            this.ints = ints;
        }

        /**
         * Initiate the interception, and eventually invokes {@code Fiber.__doRun}.
         */
        Tube invoke(Tube next) {
            idx = 0;
            return execute(next);
        }

        /**
         * Called once by {@link #invoke(Tube)} and then passed (as {@code this}) to each
         * interceptor as the work to run. Once every interceptor has been entered, runs
         * {@code __doRun} and returns {@link Fiber#PLACEHOLDER} if it reports true,
         * otherwise the fiber's current next tube.
         */
        @Override
        public Tube execute(Tube next) {
            if (idx == ints.size()) {
                Fiber.this.next = next;
                if (__doRun(isUnlockRequired, ints))
                    return PLACEHOLDER;
            } else {
                FiberContextSwitchInterceptor interceptor = ints.get(idx++);
                return interceptor.execute(Fiber.this, next, this);
            }
            return Fiber.this.next;
        }
    }

    // Sentinel returned by InterceptorHandler when __doRun reports true; _doRun compares against it by identity.
    private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();

    /**
     * Tube type of the {@link #PLACEHOLDER} sentinel. Request/response processing and
     * copying are unsupported; an exception is simply re-thrown via {@code doThrow}.
     */
    private static class PlaceholderTube extends AbstractTubeImpl {

        @Override
        public NextAction processRequest(Packet request) {
            throw new UnsupportedOperationException();
        }

        @Override
        public NextAction processResponse(Packet response) {
            throw new UnsupportedOperationException();
        }

        @Override
        public NextAction processException(Throwable t) {
            return doThrow(t);
        }

        @Override
        public void preDestroy() {
        }

        @Override
        public PlaceholderTube copy(TubeCloner cloner) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Executes the fiber as
much as possible. 991 * 992 */ 993 private boolean doRun() { 994 dumpFiberContext("running"); 995 996 if (serializeExecution) { 997 serializedExecutionLock.lock(); 998 try { 999 return _doRun(next); 1000 } finally { 1001 serializedExecutionLock.unlock(); 1002 } 1003 } else { 1004 return _doRun(next); 1005 } 1006 } 1007 1008 private boolean _doRun(Tube next) { 1009 // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend 1010 Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE); 1011 lock.lock(); 1012 try { 1013 List<FiberContextSwitchInterceptor> ints; 1014 ClassLoader old; 1015 synchronized(this) { 1016 ints = interceptors; 1017 1018 // currentThread is protected by the monitor for this fiber so 1019 // that it is accessible to cancel() even when the lock is held 1020 currentThread = Thread.currentThread(); 1021 if (isTraceEnabled()) { 1022 LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread); 1023 } 1024 1025 old = currentThread.getContextClassLoader(); 1026 currentThread.setContextClassLoader(contextClassLoader); 1027 } 1028 1029 try { 1030 boolean needsToReenter = false; 1031 do { 1032 // if interceptors are set, go through the interceptors. 1033 if (ints == null) { 1034 this.next = next; 1035 if (__doRun(isRequireUnlock, ints)) { 1036 return true; 1037 } 1038 } else { 1039 next = new InterceptorHandler(isRequireUnlock, ints).invoke(next); 1040 if (next == PLACEHOLDER) { 1041 return true; 1042 } 1043 } 1044 1045 synchronized(this) { 1046 needsToReenter = (ints != interceptors); 1047 if (needsToReenter) 1048 ints = interceptors; 1049 } 1050 } while (needsToReenter); 1051 } catch(OnExitRunnableException o) { 1052 // catching this exception indicates onExitRunnable in suspend() threw. 
1053 // we must still avoid double unlock 1054 Throwable t = o.target; 1055 if (t instanceof WebServiceException) 1056 throw (WebServiceException) t; 1057 throw new WebServiceException(t); 1058 } finally { 1059 // don't reference currentThread here because fiber processing 1060 // may already be running on a different thread (Note: isAlreadyExited 1061 // tracks this state 1062 Thread thread = Thread.currentThread(); 1063 thread.setContextClassLoader(old); 1064 if (isTraceEnabled()) { 1065 LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread); 1066 } 1067 } 1068 1069 return false; 1070 } finally { 1071 if (isRequireUnlock.value) { 1072 synchronized(this) { 1073 currentThread = null; 1074 } 1075 lock.unlock(); 1076 } 1077 } 1078 } 1079 1080 /** 1081 * To be invoked from {@link #doRun()}. 1082 * 1083 * @see #doRun() 1084 */ 1085 private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) { 1086 assert(lock.isHeldByCurrentThread()); 1087 1088 final Fiber old = CURRENT_FIBER.get(); 1089 CURRENT_FIBER.set(this); 1090 1091 // if true, lots of debug messages to show what's being executed 1092 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); 1093 1094 try { 1095 boolean abortResponse = false; 1096 while(isReady(originalInterceptors)) { 1097 if (isCanceled) { 1098 next = null; 1099 throwable = null; 1100 contsSize = 0; 1101 break; 1102 } 1103 1104 try { 1105 NextAction na; 1106 Tube last; 1107 if(throwable!=null) { 1108 if(contsSize==0 || abortResponse) { 1109 contsSize = 0; // abortResponse case 1110 // nothing else to execute. we are done. 
1111 return false; 1112 } 1113 last = popCont(); 1114 if (traceEnabled) 1115 LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable}); 1116 na = last.processException(throwable); 1117 } else { 1118 if(next!=null) { 1119 if(traceEnabled) 1120 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); 1121 na = next.processRequest(packet); 1122 last = next; 1123 } else { 1124 if(contsSize==0 || abortResponse) { 1125 // nothing else to execute. we are done. 1126 contsSize = 0; 1127 return false; 1128 } 1129 last = popCont(); 1130 if(traceEnabled) 1131 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); 1132 na = last.processResponse(packet); 1133 } 1134 } 1135 1136 if (traceEnabled) 1137 LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na}); 1138 1139 // If resume is called before suspend, then make sure 1140 // resume(Packet) is not lost 1141 if (na.kind != NextAction.SUSPEND) { 1142 // preserve in-flight packet so that processException may inspect 1143 if (na.kind != NextAction.THROW && 1144 na.kind != NextAction.THROW_ABORT_RESPONSE) 1145 packet = na.packet; 1146 throwable = na.throwable; 1147 } 1148 1149 switch(na.kind) { 1150 case NextAction.INVOKE: 1151 case NextAction.INVOKE_ASYNC: 1152 pushCont(last); 1153 // fall through next 1154 case NextAction.INVOKE_AND_FORGET: 1155 next = na.next; 1156 if (na.kind == NextAction.INVOKE_ASYNC 1157 && startedSync) { 1158 // Break out here 1159 return false; 1160 } 1161 break; 1162 case NextAction.THROW_ABORT_RESPONSE: 1163 case NextAction.ABORT_RESPONSE: 1164 abortResponse = true; 1165 if (isTraceEnabled()) { 1166 LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable}); 1167 } 
1168 case NextAction.RETURN: 1169 case NextAction.THROW: 1170 next = null; 1171 break; 1172 case NextAction.SUSPEND: 1173 if (next != null) { 1174 // Only store the 'last' tube when we're processing 1175 // a request, since conts array is for processResponse 1176 pushCont(last); 1177 } 1178 next = na.next; 1179 if(suspend(isRequireUnlock, na.onExitRunnable)) 1180 return true; // explicitly exiting control loop 1181 break; 1182 default: 1183 throw new AssertionError(); 1184 } 1185 } catch (RuntimeException t) { 1186 if (traceEnabled) 1187 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1188 throwable = t; 1189 } catch (Error t) { 1190 if (traceEnabled) 1191 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1192 throwable = t; 1193 } 1194 1195 dumpFiberContext("After tube execution"); 1196 } 1197 1198 // there's nothing we can execute right away. 1199 // we'll be back when this fiber is resumed. 1200 1201 } finally { 1202 CURRENT_FIBER.set(old); 1203 } 1204 1205 return false; 1206 } 1207 1208 private void pushCont(Tube tube) { 1209 conts[contsSize++] = tube; 1210 1211 // expand if needed 1212 int len = conts.length; 1213 if (contsSize == len) { 1214 Tube[] newBuf = new Tube[len * 2]; 1215 System.arraycopy(conts, 0, newBuf, 0, len); 1216 conts = newBuf; 1217 } 1218 } 1219 1220 private Tube popCont() { 1221 return conts[--contsSize]; 1222 } 1223 1224 private Tube peekCont() { 1225 int index = contsSize - 1; 1226 if (index >= 0 && index < conts.length) { 1227 return conts[index]; 1228 } else { 1229 return null; 1230 } 1231 } 1232 1233 /** 1234 * Only to be used by Tubes that manipulate the Fiber to create alternate flows 1235 * @since 2.2.6 1236 */ 1237 public void resetCont(Tube[] conts, int contsSize) { 1238 this.conts = conts; 1239 this.contsSize = contsSize; 1240 } 1241 1242 /** 1243 * Returns true if the fiber is ready to execute. 
1244 */ 1245 private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) { 1246 if (synchronous) { 1247 while (suspendedCount == 1) 1248 try { 1249 if (isTraceEnabled()) { 1250 LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()}); 1251 } 1252 condition.await(); // the synchronized block is the whole runSync method. 1253 } catch (InterruptedException e) { 1254 // remember that we are interrupted, but don't respond to it 1255 // right away. This behavior is in line with what happens 1256 // when you are actually running the whole thing synchronously. 1257 interrupted = true; 1258 } 1259 1260 synchronized(this) { 1261 return interceptors == originalInterceptors; 1262 } 1263 } 1264 else { 1265 if (suspendedCount>0) 1266 return false; 1267 synchronized(this) { 1268 return interceptors == originalInterceptors; 1269 } 1270 } 1271 } 1272 1273 private String getName() { 1274 return "engine-" + owner.id + "fiber-" + id; 1275 } 1276 1277 @Override 1278 public String toString() { 1279 return getName(); 1280 } 1281 1282 /** 1283 * Gets the current {@link Packet} associated with this fiber. 1284 * <p/> 1285 * <p/> 1286 * This method returns null if no packet has been associated with the fiber yet. 1287 */ 1288 public 1289 @Nullable 1290 Packet getPacket() { 1378 */ 1379 private static final AtomicInteger iotaGen = new AtomicInteger(); 1380 1381 private static boolean isTraceEnabled() { 1382 return LOGGER.isLoggable(Level.FINE); 1383 } 1384 1385 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName()); 1386 1387 1388 private static final ReentrantLock serializedExecutionLock = new ReentrantLock(); 1389 1390 /** 1391 * Set this boolean to true to execute fibers sequentially one by one. 1392 * See class javadoc. 
*/
    public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");

    /** Components registered with this fiber; consulted in iteration order by {@link #getSPI(Class)}. */
    private final Set<Component> components = new CopyOnWriteArraySet<Component>();

    /**
     * Asks each registered component in turn for an implementation of the given SPI.
     *
     * @param spiType the SPI type to look up
     * @return the first non-null answer from a component, or {@code null} if none provides one
     */
    @Override
    public <S> S getSPI(Class<S> spiType) {
        java.util.Iterator<Component> it = components.iterator();
        while (it.hasNext()) {
            S candidate = it.next().getSPI(spiType);
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns the live set of components held by this fiber (not a copy).
     */
    @Override
    public Set<Component> getComponents() {
        return components;
    }
}