1 /* 2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.xml.internal.ws.api.pipe; 27 28 import com.sun.istack.internal.NotNull; 29 import com.sun.istack.internal.Nullable; 30 import com.sun.xml.internal.ws.api.Cancelable; 31 import com.sun.xml.internal.ws.api.Component; 32 import com.sun.xml.internal.ws.api.ComponentRegistry; 33 import com.sun.xml.internal.ws.api.SOAPVersion; 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion; 35 import com.sun.xml.internal.ws.api.message.AddressingUtils; 36 import com.sun.xml.internal.ws.api.message.Packet; 37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; 38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl; 39 import com.sun.xml.internal.ws.api.server.Adapter; 40 import com.sun.xml.internal.ws.api.server.Container; 41 import com.sun.xml.internal.ws.api.server.ContainerResolver; 42 43 import java.util.ArrayList; 44 import java.util.List; 45 import java.util.Set; 46 import java.util.concurrent.CopyOnWriteArraySet; 47 import java.util.concurrent.atomic.AtomicInteger; 48 import java.util.concurrent.locks.Condition; 49 import java.util.concurrent.locks.ReentrantLock; 50 import java.util.logging.Level; 51 import java.util.logging.Logger; 52 53 import javax.xml.ws.Holder; 54 import javax.xml.ws.WebServiceException; 55 56 /** 57 * User-level thread. Represents the execution of one request/response processing. 58 * <p/> 59 * <p/> 60 * JAX-WS RI is capable of running a large number of request/response concurrently by 61 * using a relatively small number of threads. This is made possible by utilizing 62 * a {@link Fiber} — a user-level thread that gets created for each request/response 63 * processing. 64 * <p/> 65 * <p/> 66 * A fiber remembers where in the pipeline the processing is at, what needs to be 67 * executed on the way out (when processing response), and other additional information 68 * specific to the execution of a particular request/response. 69 * <p/> 70 * <h2>Suspend/Resume</h2> 71 * <p/> 72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}. 73 * When a fiber is suspended, it will be kept on the side until it is 74 * {@link #resume(Packet) resumed}. This allows threads to go execute 75 * other runnable fibers, allowing efficient utilization of smaller number of 76 * threads. 77 * <p/> 78 * <h2>Context-switch Interception</h2> 79 * <p/> 80 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s 81 * to perform additional processing every time a thread starts running a fiber 82 * and stops running it. 83 * <p/> 84 * <h2>Context ClassLoader</h2> 85 * <p/> 86 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL 87 * becomes the thread's CCL when it's executing the fiber. The original CCL 88 * of the thread will be restored when the thread leaves the fiber execution. 89 * <p/> 90 * <p/> 91 * <h2>Debugging Aid</h2> 92 * <p/> 93 * Because {@link Fiber} doesn't keep much in the call stack, and instead use 94 * {@link #conts} to store the continuation, debugging fiber related activities 95 * could be harder. 96 * <p/> 97 * <p/> 98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend 99 * level logging. Using FINER would cause more detailed logging, which includes 100 * what tubes are executed in what order and how they behaved. 101 * <p/> 102 * <p/> 103 * When you debug the server side, consider setting {@link Fiber#serializeExecution} 104 * to true, so that execution of fibers are serialized. Debugging a server 105 * with more than one running threads is very tricky, and this switch will 106 * prevent that. This can be also enabled by setting the system property on. 107 * See the source code. 108 * 109 * @author Kohsuke Kawaguchi 110 * @author Jitendra Kotamraju 111 */ 112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry { 113 114 /** 115 * Callback interface for notification of suspend and resume. 116 * 117 * @since 2.2.6 118 * @deprecated Use {@link NextAction#suspend(Runnable)} 119 */ 120 public interface Listener { 121 /** 122 * Fiber has been suspended. Implementations of this callback may resume the Fiber. 123 * @param fiber Fiber 124 */ 125 public void fiberSuspended(Fiber fiber); 126 127 /** 128 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber. 129 * @param fiber Fiber 130 */ 131 public void fiberResumed(Fiber fiber); 132 } 133 134 private final List<Listener> _listeners = new ArrayList<Listener>(); 135 136 /** 137 * Adds suspend/resume callback listener 138 * @param listener Listener 139 * @since 2.2.6 140 * @deprecated 141 */ 142 public void addListener(Listener listener) { 143 synchronized(_listeners) { 144 if (!_listeners.contains(listener)) { 145 _listeners.add(listener); 146 } 147 } 148 } 149 150 /** 151 * Removes suspend/resume callback listener 152 * @param listener Listener 153 * @since 2.2.6 154 * @deprecated 155 */ 156 public void removeListener(Listener listener) { 157 synchronized(_listeners) { 158 _listeners.remove(listener); 159 } 160 } 161 162 List<Listener> getCurrentListeners() { 163 synchronized(_listeners) { 164 return new ArrayList<Listener>(_listeners); 165 } 166 } 167 168 private void clearListeners() { 169 synchronized(_listeners) { 170 _listeners.clear(); 171 } 172 } 173 174 /** 175 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs 176 * to be invoked on the way back. 177 */ 178 private Tube[] conts = new Tube[16]; 179 private int contsSize; 180 181 /** 182 * If this field is non-null, the next instruction to execute is 183 * to call its {@link Tube#processRequest(Packet)}. Otherwise 184 * the instruction is to call {@link #conts}. 185 */ 186 private Tube next; 187 188 private Packet packet; 189 190 private Throwable/*but really it's either RuntimeException or Error*/ throwable; 191 192 public final Engine owner; 193 194 /** 195 * Is this thread suspended? 0=not suspended, 1=suspended. 196 * <p/> 197 * <p/> 198 * Logically this is just a boolean, but we need to prepare for the case 199 * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}. 200 * This happens when things happen in the following order: 201 * <p/> 202 * <ol> 203 * <li>Tube decides that the fiber needs to be suspended to wait for the external event. 204 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector) 205 * <li>Tube returns with {@link NextAction#suspend()}. 206 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)} 207 * to wake up fiber 208 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}. 209 * </ol> 210 * <p/> 211 * <p/> 212 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when 213 * {@link #resume(Packet)} occurs before {@link #suspend()}. 214 * <p/> 215 * <p/> 216 * Increment and decrement is guarded by 'this' object. 217 */ 218 private volatile int suspendedCount = 0; 219 220 private volatile boolean isInsideSuspendCallbacks = false; 221 222 /** 223 * Is this {@link Fiber} currently running in the synchronous mode? 224 */ 225 private boolean synchronous; 226 227 private boolean interrupted; 228 229 private final int id; 230 231 /** 232 * Active {@link FiberContextSwitchInterceptor}s for this fiber. 233 */ 234 private List<FiberContextSwitchInterceptor> interceptors; 235 236 /** 237 * Fiber's context {@link ClassLoader}. 238 */ 239 private 240 @Nullable 241 ClassLoader contextClassLoader; 242 243 private 244 @Nullable 245 CompletionCallback completionCallback; 246 247 private boolean isDeliverThrowableInPacket = false; 248 249 public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) { 250 this.isDeliverThrowableInPacket = isDeliverThrowableInPacket; 251 } 252 253 /** 254 * The thread on which this Fiber is currently executing, if applicable. 255 */ 256 private Thread currentThread; 257 258 /** 259 * Replace uses of synchronized(this) with this lock so that we can control 260 * unlocking for resume use cases 261 */ 262 private final ReentrantLock lock = new ReentrantLock(); 263 private final Condition condition = lock.newCondition(); 264 265 private volatile boolean isCanceled; 266 267 /** 268 * Set to true if this fiber is started asynchronously, to avoid 269 * doubly-invoking completion code. 270 */ 271 private boolean started; 272 273 /** 274 * Set to true if this fiber is started sync but allowed to run async. 275 * This property exists for use cases where the processing model is fundamentally async 276 * but some requirement or feature mandates that part of the tubeline run synchronously. For 277 * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running 278 * asynchronously, but if in-order message delivery is used then message processing must assign 279 * a message number before the remainder of the processing can be asynchronous. 280 */ 281 private boolean startedSync; 282 283 /** 284 * Callback to be invoked when a {@link Fiber} finishes execution. 285 */ 286 public interface CompletionCallback { 287 /** 288 * Indicates that the fiber has finished its execution. 289 * <p/> 290 * <p/> 291 * Since the JAX-WS RI runs asynchronously, 292 * this method maybe invoked by a different thread 293 * than any of the threads that started it or run a part of tubeline. 294 */ 295 void onCompletion(@NotNull Packet response); 296 297 /** 298 * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}. 299 */ 300 void onCompletion(@NotNull Throwable error); 301 } 302 303 Fiber(Engine engine) { 304 this.owner = engine; 305 id = iotaGen.incrementAndGet(); 306 if (isTraceEnabled()) { 307 LOGGER.log(Level.FINE, "{0} created", getName()); 308 } 309 310 // if this is run from another fiber, then we naturally inherit its context classloader, 311 // so this code works for fiber->fiber inheritance just fine. 312 contextClassLoader = Thread.currentThread().getContextClassLoader(); 313 } 314 315 /** 316 * Starts the execution of this fiber asynchronously. 317 * <p/> 318 * <p/> 319 * This method works like {@link Thread#start()}. 320 * 321 * @param tubeline The first tube of the tubeline that will act on the packet. 322 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 323 * @param completionCallback The callback to be invoked when the processing is finished and the 324 * final response packet is available. 325 * @see #runSync(Tube, Packet) 326 */ 327 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) { 328 start(tubeline, request, completionCallback, false); 329 } 330 331 private void dumpFiberContext(String desc) { 332 if(isTraceEnabled()) { 333 String action = null; 334 String msgId = null; 335 if (packet != null) { 336 for (SOAPVersion sv: SOAPVersion.values()) { 337 for (AddressingVersion av: AddressingVersion.values()) { 338 action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null; 339 msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null; 340 if (action != null || msgId != null) { 341 break; 342 } 343 } 344 if (action != null || msgId != null) { 345 break; 346 } 347 } 348 } 349 String actionAndMsgDesc; 350 if (action == null && msgId == null) { 351 actionAndMsgDesc = "NO ACTION or MSG ID"; 352 } else { 353 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'"; 354 } 355 356 String tubeDesc; 357 if (next != null) { 358 tubeDesc = next.toString() + ".processRequest()"; 359 } else { 360 tubeDesc = peekCont() + ".processResponse()"; 361 } 362 363 LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null}); 364 } 365 } 366 367 /** 368 * Starts the execution of this fiber. 369 * 370 * If forceSync is true, then the fiber is started for an ostensibly async invocation, 371 * but allows for some portion of the tubeline to run sync with the calling 372 * client instance (Port/Dispatch instance). This allows tubes that enforce 373 * ordering to see requests in the order they were sent at the point the 374 * client invoked them. 375 * <p> 376 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or 377 * SEIStub) knows one or more tubes need to enforce ordering and thus need 378 * to run sync with the client. Such tubes can return 379 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline 380 * should be invoked async to the current thread. 381 * 382 * <p> 383 * This method works like {@link Thread#start()}. 384 * 385 * @param tubeline 386 * The first tube of the tubeline that will act on the packet. 387 * @param request 388 * The request packet to be passed to <tt>startPoint.processRequest()</tt>. 389 * @param completionCallback 390 * The callback to be invoked when the processing is finished and the 391 * final response packet is available. 392 * 393 * @see #start(Tube,Packet,CompletionCallback) 394 * @see #runSync(Tube,Packet) 395 * @since 2.2.6 396 */ 397 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) { 398 next = tubeline; 399 this.packet = request; 400 this.completionCallback = completionCallback; 401 402 if (forceSync) { 403 this.startedSync = true; 404 dumpFiberContext("starting (sync)"); 405 run(); 406 } else { 407 this.started = true; 408 dumpFiberContext("starting (async)"); 409 owner.addRunnable(this); 410 } 411 } 412 413 /** 414 * Wakes up a suspended fiber. 415 * <p/> 416 * <p/> 417 * If a fiber was suspended without specifying the next {@link Tube}, 418 * then the execution will be resumed in the response processing direction, 419 * by calling the {@link Tube#processResponse(Packet)} method on the next/first 420 * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume 421 * packet as the parameter. 422 * <p/> 423 * <p/> 424 * If a fiber was suspended with specifying the next {@link Tube}, 425 * then the execution will be resumed in the request processing direction, 426 * by calling the next tube's {@link Tube#processRequest(Packet)} method with the 427 * specified resume packet as the parameter. 428 * <p/> 429 * <p/> 430 * This method is implemented in a race-free way. Another thread can invoke 431 * this method even before this fiber goes into the suspension mode. So the caller 432 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 433 * 434 * @param resumePacket packet used in the resumed processing 435 */ 436 public void resume(@NotNull Packet resumePacket) { 437 resume(resumePacket, false); 438 } 439 440 /** 441 * Similar to resume(Packet) but allowing the Fiber to be resumed 442 * synchronously (in the current Thread). If you want to know when the 443 * fiber completes (not when this method returns) then add/wrap a 444 * CompletionCallback on this Fiber. 445 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging 446 * including in-order message delivery may need to resume the Fiber synchronously 447 * until message order is confirmed prior to returning to asynchronous processing. 448 * @since 2.2.6 449 */ 450 public void resume(@NotNull Packet resumePacket, 451 boolean forceSync) { 452 resume(resumePacket, forceSync, null); 453 } 454 455 /** 456 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed 457 * and at the same time atomically assign a new CompletionCallback to it. 458 * @since 2.2.6 459 */ 460 public void resume(@NotNull Packet resumePacket, 461 boolean forceSync, 462 CompletionCallback callback) { 463 lock.lock(); 464 try { 465 if (callback != null) { 466 setCompletionCallback(callback); 467 } 468 if(isTraceEnabled()) 469 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1}); 470 packet = resumePacket; 471 if( --suspendedCount == 0 ) { 472 if (!isInsideSuspendCallbacks) { 473 List<Listener> listeners = getCurrentListeners(); 474 for (Listener listener: listeners) { 475 try { 476 listener.fiberResumed(this); 477 } catch (Throwable e) { 478 if (isTraceEnabled()) 479 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 480 } 481 } 482 483 if(synchronous) { 484 condition.signalAll(); 485 } else if (forceSync || startedSync) { 486 run(); 487 } else { 488 dumpFiberContext("resuming (async)"); 489 owner.addRunnable(this); 490 } 491 } 492 } else { 493 if (isTraceEnabled()) { 494 LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount}); 495 } 496 } 497 } finally { 498 lock.unlock(); 499 } 500 } 501 502 /** 503 * Wakes up a suspended fiber and begins response processing. 504 * @since 2.2.6 505 */ 506 public void resumeAndReturn(@NotNull Packet resumePacket, 507 boolean forceSync) { 508 if(isTraceEnabled()) 509 LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName()); 510 next = null; 511 resume(resumePacket, forceSync); 512 } 513 514 /** 515 * Wakes up a suspended fiber with an exception. 516 * <p/> 517 * <p/> 518 * The execution of the suspended fiber will be resumed in the response 519 * processing direction, by calling the {@link Tube#processException(Throwable)} method 520 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with 521 * the specified exception as the parameter. 522 * <p/> 523 * <p/> 524 * This method is implemented in a race-free way. Another thread can invoke 525 * this method even before this fiber goes into the suspension mode. So the caller 526 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 527 * 528 * @param throwable exception that is used in the resumed processing 529 */ 530 public void resume(@NotNull Throwable throwable) { 531 resume(throwable, packet, false); 532 } 533 534 /** 535 * Wakes up a suspended fiber with an exception. 536 * <p/> 537 * <p/> 538 * The execution of the suspended fiber will be resumed in the response 539 * processing direction, by calling the {@link Tube#processException(Throwable)} method 540 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with 541 * the specified exception as the parameter. 542 * <p/> 543 * <p/> 544 * This method is implemented in a race-free way. Another thread can invoke 545 * this method even before this fiber goes into the suspension mode. So the caller 546 * need not worry about synchronizing {@link NextAction#suspend()} and this method. 547 * 548 * @param throwable exception that is used in the resumed processing 549 * @param packet Packet that will be visible on the Fiber after the resume 550 * @since 2.2.8 551 */ 552 public void resume(@NotNull Throwable throwable, @NotNull Packet packet) { 553 resume(throwable, packet, false); 554 } 555 556 /** 557 * Wakes up a suspend fiber with an exception. 558 * 559 * If forceSync is true, then the suspended fiber will resume with 560 * synchronous processing on the current thread. This will continue 561 * until some Tube indicates that it is safe to switch to asynchronous 562 * processing. 563 * 564 * @param error exception that is used in the resumed processing 565 * @param forceSync if processing begins synchronously 566 * @since 2.2.6 567 */ 568 public void resume(@NotNull Throwable error, 569 boolean forceSync) { 570 resume(error, packet, forceSync); 571 } 572 573 /** 574 * Wakes up a suspend fiber with an exception. 575 * 576 * If forceSync is true, then the suspended fiber will resume with 577 * synchronous processing on the current thread. This will continue 578 * until some Tube indicates that it is safe to switch to asynchronous 579 * processing. 580 * 581 * @param error exception that is used in the resumed processing 582 * @param packet Packet that will be visible on the Fiber after the resume 583 * @param forceSync if processing begins synchronously 584 * @since 2.2.8 585 */ 586 public void resume(@NotNull Throwable error, 587 @NotNull Packet packet, 588 boolean forceSync) { 589 if(isTraceEnabled()) 590 LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName()); 591 next = null; 592 throwable = error; 593 resume(packet, forceSync); 594 } 595 596 /** 597 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback 598 * @param mayInterrupt if cancel should use {@link Thread#interrupt()} 599 * @see java.util.concurrent.Future#cancel(boolean) 600 * @since 2.2.6 601 */ 602 @Override 603 public void cancel(boolean mayInterrupt) { 604 isCanceled = true; 605 if (mayInterrupt) { 606 // synchronized(this) is used as Thread running Fiber will be holding lock 607 synchronized(this) { 608 if (currentThread != null) 609 currentThread.interrupt(); 610 } 611 } 612 } 613 614 /** 615 * Suspends this fiber's execution until the resume method is invoked. 616 * <p/> 617 * The call returns immediately, and when the fiber is resumed 618 * the execution picks up from the last scheduled continuation. 619 * @param onExitRunnable runnable to be invoked after fiber is marked for suspension 620 * @return if control loop must exit 621 */ 622 private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) { 623 if(isTraceEnabled()) { 624 LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1}); 625 if (suspendedCount > 0) { 626 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName()); 627 } 628 } 629 630 List<Listener> listeners = getCurrentListeners(); 631 if (++suspendedCount == 1) { 632 isInsideSuspendCallbacks = true; 633 try { 634 for (Listener listener: listeners) { 635 try { 636 listener.fiberSuspended(this); 637 } catch (Throwable e) { 638 if(isTraceEnabled()) 639 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 640 } 641 } 642 } finally { 643 isInsideSuspendCallbacks = false; 644 } 645 } 646 647 if (suspendedCount <= 0) { 648 // suspend callback caused fiber to resume 649 for (Listener listener: listeners) { 650 try { 651 listener.fiberResumed(this); 652 } catch (Throwable e) { 653 if(isTraceEnabled()) 654 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); 655 } 656 } 657 658 } else if (onExitRunnable != null) { 659 // synchronous use cases cannot disconnect from the current thread 660 if (!synchronous) { 661 /* INTENTIONALLY UNLOCKING EARLY */ 662 synchronized(this) { 663 // currentThread is protected by the monitor for this fiber so 664 // that it is accessible to cancel() even when the lock is held 665 currentThread = null; 666 } 667 lock.unlock(); 668 assert(!lock.isHeldByCurrentThread()); 669 isRequireUnlock.value = Boolean.FALSE; 670 671 try { 672 onExitRunnable.run(); 673 } catch(Throwable t) { 674 throw new OnExitRunnableException(t); 675 } 676 677 return true; 678 679 } else { 680 // for synchronous we will stay with current thread, so do not disconnect 681 if (isTraceEnabled()) 682 LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread"); 683 onExitRunnable.run(); 684 } 685 } 686 687 return false; 688 } 689 690 private static final class OnExitRunnableException extends RuntimeException { 691 private static final long serialVersionUID = 1L; 692 693 Throwable target; 694 695 public OnExitRunnableException(Throwable target) { 696 super((Throwable)null); // see pattern for InvocationTargetException 697 this.target = target; 698 } 699 } 700 701 /** 702 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber. 703 * <p/> 704 * <p/> 705 * The newly installed fiber will take effect immediately after the current 706 * tube returns from its {@link Tube#processRequest(Packet)} or 707 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 708 * <p/> 709 * <p/> 710 * So when the tubeline consists of X and Y, and when X installs an interceptor, 711 * the order of execution will be as follows: 712 * <p/> 713 * <ol> 714 * <li>X.processRequest() 715 * <li>interceptor gets installed 716 * <li>interceptor.execute() is invoked 717 * <li>Y.processRequest() 718 * </ol> 719 */ 720 public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 721 if (interceptors == null) { 722 interceptors = new ArrayList<FiberContextSwitchInterceptor>(); 723 } else { 724 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); 725 l.addAll(interceptors); 726 interceptors = l; 727 } 728 interceptors.add(interceptor); 729 } 730 731 /** 732 * Removes a {@link FiberContextSwitchInterceptor} from this fiber. 733 * <p/> 734 * <p/> 735 * The removal of the interceptor takes effect immediately after the current 736 * tube returns from its {@link Tube#processRequest(Packet)} or 737 * {@link Tube#processResponse(Packet)}, before the next tube begins processing. 738 * <p/> 739 * <p/> 740 * <p/> 741 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor 742 * on the way out, then the order of execution will be as follows: 743 * <p/> 744 * <ol> 745 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack) 746 * <li>interceptor gets uninstalled 747 * <li>interceptor.execute() returns 748 * <li>X.processResponse() 749 * </ol> 750 * 751 * @return true if the specified interceptor was removed. False if 752 * the specified interceptor was not registered with this fiber to begin with. 753 */ 754 public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { 755 if (interceptors != null) { 756 boolean result = interceptors.remove(interceptor); 757 if (interceptors.isEmpty()) 758 interceptors = null; 759 else { 760 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); 761 l.addAll(interceptors); 762 interceptors = l; 763 } 764 return result; 765 } 766 return false; 767 } 768 769 /** 770 * Gets the context {@link ClassLoader} of this fiber. 771 */ 772 public 773 @Nullable 774 ClassLoader getContextClassLoader() { 775 return contextClassLoader; 776 } 777 778 /** 779 * Sets the context {@link ClassLoader} of this fiber. 780 */ 781 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) { 782 ClassLoader r = this.contextClassLoader; 783 this.contextClassLoader = contextClassLoader; 784 return r; 785 } 786 787 /** 788 * DO NOT CALL THIS METHOD. This is an implementation detail 789 * of {@link Fiber}. 790 */ 791 @Deprecated 792 @Override 793 public void run() { 794 Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer()); 795 try { 796 assert !synchronous; 797 // doRun returns true to indicate an early exit from fiber processing 798 if (!doRun()) { 799 if (startedSync && suspendedCount == 0 && 800 (next != null || contsSize > 0)) { 801 // We bailed out of running this fiber we started as sync, and now 802 // want to finish running it async 803 startedSync = false; 804 // Start back up as an async fiber 805 dumpFiberContext("restarting (async) after startSync"); 806 owner.addRunnable(this); 807 } else { 808 completionCheck(); 809 } 810 } 811 } finally { 812 ContainerResolver.getDefault().exitContainer(old); 813 } 814 } 815 816 /** 817 * Runs a given {@link Tube} (and everything thereafter) synchronously. 818 * <p/> 819 * <p/> 820 * This method blocks and returns only when all the successive {@link Tube}s 821 * complete their request/response processing. This method can be used 822 * if a {@link Tube} needs to fallback to synchronous processing. 823 * <p/> 824 * <h3>Example:</h3> 825 * <pre> 826 * class FooTube extends {@link AbstractFilterTubeImpl} { 827 * NextAction processRequest(Packet request) { 828 * // run everything synchronously and return with the response packet 829 * return doReturnWith(Fiber.current().runSync(next,request)); 830 * } 831 * NextAction processResponse(Packet response) { 832 * // never be invoked 833 * } 834 * } 835 * </pre> 836 * 837 * @param tubeline The first tube of the tubeline that will act on the packet. 838 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. 839 * @return The response packet to the <tt>request</tt>. 840 * @see #start(Tube, Packet, CompletionCallback) 841 */ 842 public 843 @NotNull 844 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) { 845 lock.lock(); 846 try { 847 // save the current continuation, so that we return runSync() without executing them. 848 final Tube[] oldCont = conts; 849 final int oldContSize = contsSize; 850 final boolean oldSynchronous = synchronous; 851 final Tube oldNext = next; 852 853 if (oldContSize > 0) { 854 conts = new Tube[16]; 855 contsSize = 0; 856 } 857 858 try { 859 synchronous = true; 860 this.packet = request; 861 next = tubeline; 862 doRun(); 863 if (throwable != null) { 864 if (isDeliverThrowableInPacket) { 865 packet.addSatellite(new ThrowableContainerPropertySet(throwable)); 866 } else { 867 if (throwable instanceof RuntimeException) { 868 throw (RuntimeException) throwable; 869 } 870 if (throwable instanceof Error) { 871 throw (Error) throwable; 872 } 873 // our system is supposed to only accept Error or RuntimeException 874 throw new AssertionError(throwable); 875 } 876 } 877 return this.packet; 878 } finally { 879 conts = oldCont; 880 contsSize = oldContSize; 881 synchronous = oldSynchronous; 882 next = oldNext; 883 if(interrupted) { 884 Thread.currentThread().interrupt(); 885 interrupted = false; 886 } 887 if(!started && !startedSync) 888 completionCheck(); 889 } 890 } finally { 891 lock.unlock(); 892 } 893 } 894 895 private void completionCheck() { 896 lock.lock(); 897 try { 898 // Don't trigger completion and callbacks if fiber is suspended 899 if(!isCanceled && contsSize==0 && suspendedCount == 0) { 900 if(isTraceEnabled()) 901 LOGGER.log(Level.FINE, "{0} completed", getName()); 902 clearListeners(); 903 condition.signalAll(); 904 if (completionCallback != null) { 905 if (throwable != null) { 906 if (isDeliverThrowableInPacket) { 907 packet.addSatellite(new ThrowableContainerPropertySet(throwable)); 908 completionCallback.onCompletion(packet); 909 } else 910 completionCallback.onCompletion(throwable); 911 } else 912 completionCallback.onCompletion(packet); 913 } 914 } 915 } finally { 916 lock.unlock(); 917 } 918 } 919 920 /** 921 * Invokes all registered {@link InterceptorHandler}s and then call into 922 * {@link Fiber#__doRun()}. 923 */ 924 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> { 925 private final Holder<Boolean> isUnlockRequired; 926 private final List<FiberContextSwitchInterceptor> ints; 927 928 /** 929 * Index in {@link Fiber#interceptors} to invoke next. 930 */ 931 private int idx; 932 933 public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) { 934 this.isUnlockRequired = isUnlockRequired; 935 this.ints = ints; 936 } 937 938 /** 939 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}. 940 */ 941 Tube invoke(Tube next) { 942 idx = 0; 943 return execute(next); 944 } 945 946 @Override 947 public Tube execute(Tube next) { 948 if (idx == ints.size()) { 949 Fiber.this.next = next; 950 if (__doRun(isUnlockRequired, ints)) 951 return PLACEHOLDER; 952 } else { 953 FiberContextSwitchInterceptor interceptor = ints.get(idx++); 954 return interceptor.execute(Fiber.this, next, this); 955 } 956 return Fiber.this.next; 957 } 958 } 959 960 private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube(); 961 962 private static class PlaceholderTube extends AbstractTubeImpl { 963 964 @Override 965 public NextAction processRequest(Packet request) { 966 throw new UnsupportedOperationException(); 967 } 968 969 @Override 970 public NextAction processResponse(Packet response) { 971 throw new UnsupportedOperationException(); 972 } 973 974 @Override 975 public NextAction processException(Throwable t) { 976 return doThrow(t); 977 } 978 979 @Override 980 public void preDestroy() { 981 } 982 983 @Override 984 public PlaceholderTube copy(TubeCloner cloner) { 985 throw new UnsupportedOperationException(); 986 } 987 } 988 989 /** 990 * Executes the fiber as much as possible. 991 * 992 */ 993 private boolean doRun() { 994 dumpFiberContext("running"); 995 996 if (serializeExecution) { 997 serializedExecutionLock.lock(); 998 try { 999 return _doRun(next); 1000 } finally { 1001 serializedExecutionLock.unlock(); 1002 } 1003 } else { 1004 return _doRun(next); 1005 } 1006 } 1007 1008 private boolean _doRun(Tube next) { 1009 // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend 1010 Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE); 1011 lock.lock(); 1012 try { 1013 List<FiberContextSwitchInterceptor> ints; 1014 ClassLoader old; 1015 synchronized(this) { 1016 ints = interceptors; 1017 1018 // currentThread is protected by the monitor for this fiber so 1019 // that it is accessible to cancel() even when the lock is held 1020 currentThread = Thread.currentThread(); 1021 if (isTraceEnabled()) { 1022 LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread); 1023 } 1024 1025 old = currentThread.getContextClassLoader(); 1026 currentThread.setContextClassLoader(contextClassLoader); 1027 } 1028 1029 try { 1030 boolean needsToReenter; 1031 do { 1032 // if interceptors are set, go through the interceptors. 1033 if (ints == null) { 1034 this.next = next; 1035 if (__doRun(isRequireUnlock, null /*ints*/)) { 1036 return true; 1037 } 1038 } else { 1039 next = new InterceptorHandler(isRequireUnlock, ints).invoke(next); 1040 if (next == PLACEHOLDER) { 1041 return true; 1042 } 1043 } 1044 1045 synchronized(this) { 1046 needsToReenter = (ints != interceptors); 1047 if (needsToReenter) 1048 ints = interceptors; 1049 } 1050 } while (needsToReenter); 1051 } catch(OnExitRunnableException o) { 1052 // catching this exception indicates onExitRunnable in suspend() threw. 1053 // we must still avoid double unlock 1054 Throwable t = o.target; 1055 if (t instanceof WebServiceException) 1056 throw (WebServiceException) t; 1057 throw new WebServiceException(t); 1058 } finally { 1059 // don't reference currentThread here because fiber processing 1060 // may already be running on a different thread (Note: isAlreadyExited 1061 // tracks this state 1062 Thread thread = Thread.currentThread(); 1063 thread.setContextClassLoader(old); 1064 if (isTraceEnabled()) { 1065 LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread); 1066 } 1067 } 1068 1069 return false; 1070 } finally { 1071 if (isRequireUnlock.value) { 1072 synchronized(this) { 1073 currentThread = null; 1074 } 1075 lock.unlock(); 1076 } 1077 } 1078 } 1079 1080 /** 1081 * To be invoked from {@link #doRun()}. 1082 * 1083 * @see #doRun() 1084 */ 1085 private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) { 1086 assert(lock.isHeldByCurrentThread()); 1087 1088 final Fiber old = CURRENT_FIBER.get(); 1089 CURRENT_FIBER.set(this); 1090 1091 // if true, lots of debug messages to show what's being executed 1092 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); 1093 1094 try { 1095 boolean abortResponse = false; 1096 while(isReady(originalInterceptors)) { 1097 if (isCanceled) { 1098 next = null; 1099 throwable = null; 1100 contsSize = 0; 1101 break; 1102 } 1103 1104 try { 1105 NextAction na; 1106 Tube last; 1107 if(throwable!=null) { 1108 if(contsSize==0 || abortResponse) { 1109 contsSize = 0; // abortResponse case 1110 // nothing else to execute. we are done. 1111 return false; 1112 } 1113 last = popCont(); 1114 if (traceEnabled) 1115 LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable}); 1116 na = last.processException(throwable); 1117 } else { 1118 if(next!=null) { 1119 if(traceEnabled) 1120 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); 1121 na = next.processRequest(packet); 1122 last = next; 1123 } else { 1124 if(contsSize==0 || abortResponse) { 1125 // nothing else to execute. we are done. 1126 contsSize = 0; 1127 return false; 1128 } 1129 last = popCont(); 1130 if(traceEnabled) 1131 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); 1132 na = last.processResponse(packet); 1133 } 1134 } 1135 1136 if (traceEnabled) 1137 LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na}); 1138 1139 // If resume is called before suspend, then make sure 1140 // resume(Packet) is not lost 1141 if (na.kind != NextAction.SUSPEND) { 1142 // preserve in-flight packet so that processException may inspect 1143 if (na.kind != NextAction.THROW && 1144 na.kind != NextAction.THROW_ABORT_RESPONSE) 1145 packet = na.packet; 1146 throwable = na.throwable; 1147 } 1148 1149 switch(na.kind) { 1150 case NextAction.INVOKE: 1151 case NextAction.INVOKE_ASYNC: 1152 pushCont(last); 1153 // fall through next 1154 case NextAction.INVOKE_AND_FORGET: 1155 next = na.next; 1156 if (na.kind == NextAction.INVOKE_ASYNC 1157 && startedSync) { 1158 // Break out here 1159 return false; 1160 } 1161 break; 1162 case NextAction.THROW_ABORT_RESPONSE: 1163 case NextAction.ABORT_RESPONSE: 1164 abortResponse = true; 1165 if (isTraceEnabled()) { 1166 LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable}); 1167 } 1168 case NextAction.RETURN: 1169 case NextAction.THROW: 1170 next = null; 1171 break; 1172 case NextAction.SUSPEND: 1173 if (next != null) { 1174 // Only store the 'last' tube when we're processing 1175 // a request, since conts array is for processResponse 1176 pushCont(last); 1177 } 1178 next = na.next; 1179 if(suspend(isRequireUnlock, na.onExitRunnable)) 1180 return true; // explicitly exiting control loop 1181 break; 1182 default: 1183 throw new AssertionError(); 1184 } 1185 } catch (RuntimeException t) { 1186 if (traceEnabled) 1187 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1188 throwable = t; 1189 } catch (Error t) { 1190 if (traceEnabled) 1191 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t); 1192 throwable = t; 1193 } 1194 1195 dumpFiberContext("After tube execution"); 1196 } 1197 1198 // there's nothing we can execute right away. 1199 // we'll be back when this fiber is resumed. 1200 1201 } finally { 1202 CURRENT_FIBER.set(old); 1203 } 1204 1205 return false; 1206 } 1207 1208 private void pushCont(Tube tube) { 1209 conts[contsSize++] = tube; 1210 1211 // expand if needed 1212 int len = conts.length; 1213 if (contsSize == len) { 1214 Tube[] newBuf = new Tube[len * 2]; 1215 System.arraycopy(conts, 0, newBuf, 0, len); 1216 conts = newBuf; 1217 } 1218 } 1219 1220 private Tube popCont() { 1221 return conts[--contsSize]; 1222 } 1223 1224 private Tube peekCont() { 1225 int index = contsSize - 1; 1226 if (index >= 0 && index < conts.length) { 1227 return conts[index]; 1228 } else { 1229 return null; 1230 } 1231 } 1232 1233 /** 1234 * Only to be used by Tubes that manipulate the Fiber to create alternate flows 1235 * @since 2.2.6 1236 */ 1237 public void resetCont(Tube[] conts, int contsSize) { 1238 this.conts = conts; 1239 this.contsSize = contsSize; 1240 } 1241 1242 /** 1243 * Returns true if the fiber is ready to execute. 1244 */ 1245 private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) { 1246 if (synchronous) { 1247 while (suspendedCount == 1) 1248 try { 1249 if (isTraceEnabled()) { 1250 LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()}); 1251 } 1252 condition.await(); // the synchronized block is the whole runSync method. 1253 } catch (InterruptedException e) { 1254 // remember that we are interrupted, but don't respond to it 1255 // right away. This behavior is in line with what happens 1256 // when you are actually running the whole thing synchronously. 1257 interrupted = true; 1258 } 1259 1260 synchronized(this) { 1261 return interceptors == originalInterceptors; 1262 } 1263 } 1264 else { 1265 if (suspendedCount>0) 1266 return false; 1267 synchronized(this) { 1268 return interceptors == originalInterceptors; 1269 } 1270 } 1271 } 1272 1273 private String getName() { 1274 return "engine-" + owner.id + "fiber-" + id; 1275 } 1276 1277 @Override 1278 public String toString() { 1279 return getName(); 1280 } 1281 1282 /** 1283 * Gets the current {@link Packet} associated with this fiber. 1284 * <p/> 1285 * <p/> 1286 * This method returns null if no packet has been associated with the fiber yet. 1287 */ 1288 public 1289 @Nullable 1290 Packet getPacket() { 1291 return packet; 1292 } 1293 1294 /** 1295 * Returns completion callback associated with this Fiber 1296 * @return Completion callback 1297 * @since 2.2.6 1298 */ 1299 public CompletionCallback getCompletionCallback() { 1300 return completionCallback; 1301 } 1302 1303 /** 1304 * Updates completion callback associated with this Fiber 1305 * @param completionCallback Completion callback 1306 * @since 2.2.6 1307 */ 1308 public void setCompletionCallback(CompletionCallback completionCallback) { 1309 this.completionCallback = completionCallback; 1310 } 1311 1312 /** 1313 * (ADVANCED) Returns true if the current fiber is being executed synchronously. 1314 * <p/> 1315 * <p/> 1316 * Fiber may run synchronously for various reasons. Perhaps this is 1317 * on client side and application has invoked a synchronous method call. 1318 * Perhaps this is on server side and we have deployed on a synchronous 1319 * transport (like servlet.) 1320 * <p/> 1321 * <p/> 1322 * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}), 1323 * further invocations to {@link #runSync(Tube, Packet)} can be done 1324 * without degrading the performance. 1325 * <p/> 1326 * <p/> 1327 * So this value can be used as a further optimization hint for 1328 * advanced {@link Tube}s to choose the best strategy to invoke 1329 * the next {@link Tube}. For example, a tube may want to install 1330 * a {@link FiberContextSwitchInterceptor} if running async, yet 1331 * it might find it faster to do {@link #runSync(Tube, Packet)} 1332 * if it's already running synchronously. 1333 */ 1334 public static boolean isSynchronous() { 1335 return current().synchronous; 1336 } 1337 1338 /** 1339 * Returns true if the current Fiber on the current thread was started 1340 * synchronously. Note, this is not strictly the same as being synchronous 1341 * because the assumption is that the Fiber will ultimately be dispatched 1342 * asynchronously, possibly have a completion callback associated with it, etc. 1343 * Note, the 'startedSync' flag is cleared once the current Fiber is 1344 * converted to running asynchronously. 1345 * @since 2.2.6 1346 */ 1347 public boolean isStartedSync() { 1348 return startedSync; 1349 } 1350 1351 /** 1352 * Gets the current fiber that's running. 1353 * <p/> 1354 * <p/> 1355 * This works like {@link Thread#currentThread()}. 1356 * This method only works when invoked from {@link Tube}. 1357 */ 1358 public static 1359 @NotNull 1360 @SuppressWarnings({"null", "ConstantConditions"}) 1361 Fiber current() { 1362 Fiber fiber = CURRENT_FIBER.get(); 1363 if (fiber == null) 1364 throw new IllegalStateException("Can be only used from fibers"); 1365 return fiber; 1366 } 1367 1368 /** 1369 * Gets the current fiber that's running, if set. 1370 */ 1371 public static Fiber getCurrentIfSet() { 1372 return CURRENT_FIBER.get(); 1373 } 1374 1375 private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>(); 1376 1377 /** 1378 * Used to allocate unique number for each fiber. 1379 */ 1380 private static final AtomicInteger iotaGen = new AtomicInteger(); 1381 1382 private static boolean isTraceEnabled() { 1383 return LOGGER.isLoggable(Level.FINE); 1384 } 1385 1386 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName()); 1387 1388 1389 private static final ReentrantLock serializedExecutionLock = new ReentrantLock(); 1390 1391 /** 1392 * Set this boolean to true to execute fibers sequentially one by one. 1393 * See class javadoc. 1394 */ 1395 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize"); 1396 1397 private final Set<Component> components = new CopyOnWriteArraySet<Component>(); 1398 1399 @Override 1400 public <S> S getSPI(Class<S> spiType) { 1401 for (Component c : components) { 1402 S spi = c.getSPI(spiType); 1403 if (spi != null) { 1404 return spi; 1405 } 1406 } 1407 return null; 1408 } 1409 1410 @Override 1411 public Set<Component> getComponents() { 1412 return components; 1413 } 1414 }