src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 30,53 **** import com.sun.xml.internal.ws.api.Cancelable; import com.sun.xml.internal.ws.api.Component; import com.sun.xml.internal.ws.api.ComponentRegistry; import com.sun.xml.internal.ws.api.SOAPVersion; import com.sun.xml.internal.ws.api.addressing.AddressingVersion; import com.sun.xml.internal.ws.api.message.Packet; import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; import com.sun.xml.internal.ws.api.server.Adapter; import java.util.ArrayList; - import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; /** * User-level thread&#x2E; Represents the execution of one request/response processing. * <p/> * <p/> * JAX-WS RI is capable of running a large number of request/response concurrently by --- 30,60 ---- import com.sun.xml.internal.ws.api.Cancelable; import com.sun.xml.internal.ws.api.Component; import com.sun.xml.internal.ws.api.ComponentRegistry; import com.sun.xml.internal.ws.api.SOAPVersion; import com.sun.xml.internal.ws.api.addressing.AddressingVersion; + import com.sun.xml.internal.ws.api.message.AddressingUtils; import com.sun.xml.internal.ws.api.message.Packet; import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl; + import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl; import com.sun.xml.internal.ws.api.server.Adapter; + import com.sun.xml.internal.ws.api.server.Container; + import com.sun.xml.internal.ws.api.server.ContainerResolver; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; + import javax.xml.ws.Holder; + import javax.xml.ws.WebServiceException; + /** * User-level thread&#x2E; Represents the execution of one request/response processing. * <p/> * <p/> * JAX-WS RI is capable of running a large number of request/response concurrently by
*** 106,115 **** --- 113,123 ---- /** * Callback interface for notification of suspend and resume. * * @since 2.2.6 + * @deprecated Use {@link NextAction#suspend(Runnable)} */ public interface Listener { /** * Fiber has been suspended. Implementations of this callback may resume the Fiber. * @param fiber Fiber
*** 121,136 **** * @param fiber Fiber */ public void fiberResumed(Fiber fiber); } ! private List<Listener> _listeners = new ArrayList<Listener>(); /** * Adds suspend/resume callback listener * @param listener Listener * @since 2.2.6 */ public void addListener(Listener listener) { synchronized(_listeners) { if (!_listeners.contains(listener)) { _listeners.add(listener); --- 129,145 ---- * @param fiber Fiber */ public void fiberResumed(Fiber fiber); } ! private final List<Listener> _listeners = new ArrayList<Listener>(); /** * Adds suspend/resume callback listener * @param listener Listener * @since 2.2.6 + * @deprecated */ public void addListener(Listener listener) { synchronized(_listeners) { if (!_listeners.contains(listener)) { _listeners.add(listener);
*** 140,157 **** /** * Removes suspend/resume callback listener * @param listener Listener * @since 2.2.6 */ public void removeListener(Listener listener) { synchronized(_listeners) { _listeners.remove(listener); } } ! private List<Listener> getCurrentListeners() { synchronized(_listeners) { return new ArrayList<Listener>(_listeners); } } --- 149,167 ---- /** * Removes suspend/resume callback listener * @param listener Listener * @since 2.2.6 + * @deprecated */ public void removeListener(Listener listener) { synchronized(_listeners) { _listeners.remove(listener); } } ! List<Listener> getCurrentListeners() { synchronized(_listeners) { return new ArrayList<Listener>(_listeners); } }
*** 208,222 **** private volatile int suspendedCount = 0; private volatile boolean isInsideSuspendCallbacks = false; /** - * Is this fiber completed? - */ - private volatile boolean completed; - - /** * Is this {@link Fiber} currently running in the synchronous mode? */ private boolean synchronous; private boolean interrupted; --- 218,227 ----
*** 227,265 **** * Active {@link FiberContextSwitchInterceptor}s for this fiber. */ private List<FiberContextSwitchInterceptor> interceptors; /** - * Not null when {@link #interceptors} is not null. - */ - private InterceptorHandler interceptorHandler; - - /** - * This flag is set to true when a new interceptor is added. - * <p/> - * When that happens, we need to first exit the current interceptors - * and then reenter them, so that the newly added interceptors start - * taking effect. This flag is used to control that flow. - */ - private boolean needsToReenter; - - /** * Fiber's context {@link ClassLoader}. */ private @Nullable ClassLoader contextClassLoader; private @Nullable CompletionCallback completionCallback; /** * The thread on which this Fiber is currently executing, if applicable. */ private Thread currentThread; private volatile boolean isCanceled; /** * Set to true if this fiber is started asynchronously, to avoid * doubly-invoking completion code. --- 232,269 ---- * Active {@link FiberContextSwitchInterceptor}s for this fiber. */ private List<FiberContextSwitchInterceptor> interceptors; /** * Fiber's context {@link ClassLoader}. */ private @Nullable ClassLoader contextClassLoader; private @Nullable CompletionCallback completionCallback; + private boolean isDeliverThrowableInPacket = false; + + public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) { + this.isDeliverThrowableInPacket = isDeliverThrowableInPacket; + } + /** * The thread on which this Fiber is currently executing, if applicable. */ private Thread currentThread; + /** + * Replace uses of synchronized(this) with this lock so that we can control + * unlocking for resume use cases + */ + private final ReentrantLock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private volatile boolean isCanceled; /** * Set to true if this fiber is started asynchronously, to avoid * doubly-invoking completion code.
*** 275,285 **** * a message number before the remainder of the processing can be asynchronous. */ private boolean startedSync; /** ! * Callback to be invoked when a {@link Fiber} finishs execution. */ public interface CompletionCallback { /** * Indicates that the fiber has finished its execution. * <p/> --- 279,289 ---- * a message number before the remainder of the processing can be asynchronous. */ private boolean startedSync; /** ! * Callback to be invoked when a {@link Fiber} finishes execution. */ public interface CompletionCallback { /** * Indicates that the fiber has finished its execution. * <p/>
*** 296,310 **** void onCompletion(@NotNull Throwable error); } Fiber(Engine engine) { this.owner = engine; - if (isTraceEnabled()) { id = iotaGen.incrementAndGet(); ! LOGGER.fine(getName() + " created"); ! } else { ! id = -1; } // if this is run from another fiber, then we naturally inherit its context classloader, // so this code works for fiber->fiber inheritance just fine. contextClassLoader = Thread.currentThread().getContextClassLoader(); --- 300,312 ---- void onCompletion(@NotNull Throwable error); } Fiber(Engine engine) { this.owner = engine; id = iotaGen.incrementAndGet(); ! if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "{0} created", getName()); } // if this is run from another fiber, then we naturally inherit its context classloader, // so this code works for fiber->fiber inheritance just fine. contextClassLoader = Thread.currentThread().getContextClassLoader();
*** 331,342 **** String action = null; String msgId = null; if (packet != null) { for (SOAPVersion sv: SOAPVersion.values()) { for (AddressingVersion av: AddressingVersion.values()) { ! action = packet.getMessage() != null ? packet.getMessage().getHeaders().getAction(av, sv) : null; ! msgId = packet.getMessage() != null ? packet.getMessage().getHeaders().getMessageID(av, sv) : null; if (action != null || msgId != null) { break; } } if (action != null || msgId != null) { --- 333,344 ---- String action = null; String msgId = null; if (packet != null) { for (SOAPVersion sv: SOAPVersion.values()) { for (AddressingVersion av: AddressingVersion.values()) { ! action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null; ! msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null; if (action != null || msgId != null) { break; } } if (action != null || msgId != null) {
*** 356,366 **** tubeDesc = next.toString() + ".processRequest()"; } else { tubeDesc = peekCont() + ".processResponse()"; } ! LOGGER.fine(getName() + " " + desc + " with " + actionAndMsgDesc + " and 'current' tube " + tubeDesc + " from thread " + Thread.currentThread().getName() + " with Packet: " + (packet != null ? packet.toShortString() : null)); } } /** * Starts the execution of this fiber. --- 358,368 ---- tubeDesc = next.toString() + ".processRequest()"; } else { tubeDesc = peekCont() + ".processResponse()"; } ! LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null}); } } /** * Starts the execution of this fiber.
*** 443,453 **** * For example, an asynchronous response endpoint that supports WS-ReliableMessaging * including in-order message delivery may need to resume the Fiber synchronously * until message order is confirmed prior to returning to asynchronous processing. * @since 2.2.6 */ ! public synchronized void resume(@NotNull Packet resumePacket, boolean forceSync) { resume(resumePacket, forceSync, null); } /** --- 445,455 ---- * For example, an asynchronous response endpoint that supports WS-ReliableMessaging * including in-order message delivery may need to resume the Fiber synchronously * until message order is confirmed prior to returning to asynchronous processing. * @since 2.2.6 */ ! public void resume(@NotNull Packet resumePacket, boolean forceSync) { resume(resumePacket, forceSync, null); } /**
*** 456,510 **** * @since 2.2.6 */ public void resume(@NotNull Packet resumePacket, boolean forceSync, CompletionCallback callback) { ! ! synchronized(this) { if (callback != null) { setCompletionCallback(callback); } if(isTraceEnabled()) ! LOGGER.fine(getName()+" resuming. Will have suspendedCount=" + (suspendedCount-1)); packet = resumePacket; if( --suspendedCount == 0 ) { if (!isInsideSuspendCallbacks) { List<Listener> listeners = getCurrentListeners(); for (Listener listener: listeners) { try { listener.fiberResumed(this); } catch (Throwable e) { if (isTraceEnabled()) ! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); } } if(synchronous) { ! notifyAll(); } else if (forceSync || startedSync) { run(); } else { dumpFiberContext("resuming (async)"); owner.addRunnable(this); } } } else { if (isTraceEnabled()) { ! LOGGER.fine(getName() + " taking no action on resume because suspendedCount != 0: " + suspendedCount); } } } } /** * Wakes up a suspended fiber and begins response processing. * @since 2.2.6 */ ! public synchronized void resumeAndReturn(@NotNull Packet resumePacket, boolean forceSync) { if(isTraceEnabled()) ! LOGGER.fine(getName()+" resumed with Return Packet"); next = null; resume(resumePacket, forceSync); } /** --- 458,514 ---- * @since 2.2.6 */ public void resume(@NotNull Packet resumePacket, boolean forceSync, CompletionCallback callback) { ! lock.lock(); ! try { if (callback != null) { setCompletionCallback(callback); } if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1}); packet = resumePacket; if( --suspendedCount == 0 ) { if (!isInsideSuspendCallbacks) { List<Listener> listeners = getCurrentListeners(); for (Listener listener: listeners) { try { listener.fiberResumed(this); } catch (Throwable e) { if (isTraceEnabled()) ! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); } } if(synchronous) { ! condition.signalAll(); } else if (forceSync || startedSync) { run(); } else { dumpFiberContext("resuming (async)"); owner.addRunnable(this); } } } else { if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount}); } } + } finally { + lock.unlock(); } } /** * Wakes up a suspended fiber and begins response processing. * @since 2.2.6 */ ! public void resumeAndReturn(@NotNull Packet resumePacket, boolean forceSync) { if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName()); next = null; resume(resumePacket, forceSync); } /**
*** 521,532 **** * this method even before this fiber goes into the suspension mode. So the caller * need not worry about synchronizing {@link NextAction#suspend()} and this method. * * @param throwable exception that is used in the resumed processing */ ! public synchronized void resume(@NotNull Throwable throwable) { ! resume(throwable, false); } /** * Wakes up a suspend fiber with an exception. * --- 525,558 ---- * this method even before this fiber goes into the suspension mode. So the caller * need not worry about synchronizing {@link NextAction#suspend()} and this method. * * @param throwable exception that is used in the resumed processing */ ! public void resume(@NotNull Throwable throwable) { ! resume(throwable, packet, false); ! } ! ! /** ! * Wakes up a suspended fiber with an exception. ! * <p/> ! * <p/> ! * The execution of the suspended fiber will be resumed in the response ! * processing direction, by calling the {@link Tube#processException(Throwable)} method ! * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with ! * the specified exception as the parameter. ! * <p/> ! * <p/> ! * This method is implemented in a race-free way. Another thread can invoke ! * this method even before this fiber goes into the suspension mode. So the caller ! * need not worry about synchronizing {@link NextAction#suspend()} and this method. ! * ! * @param throwable exception that is used in the resumed processing ! * @param packet Packet that will be visible on the Fiber after the resume ! * @since 2.2.8 ! */ ! public void resume(@NotNull Throwable throwable, @NotNull Packet packet) { ! resume(throwable, packet, false); } /** * Wakes up a suspend fiber with an exception. *
*** 537,564 **** * * @param error exception that is used in the resumed processing * @param forceSync if processing begins synchronously * @since 2.2.6 */ ! public synchronized void resume(@NotNull Throwable error, boolean forceSync) { if(isTraceEnabled()) ! LOGGER.fine(getName()+" resumed with Return Throwable"); next = null; throwable = error; resume(packet, forceSync); } /** * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback ! * @param mayInterrupt if cancel should use {@link Thread.interrupt()} ! * @see java.util.Future.cancel * @since 2.2.6 */ public void cancel(boolean mayInterrupt) { isCanceled = true; if (mayInterrupt) { synchronized(this) { if (currentThread != null) currentThread.interrupt(); } } --- 563,611 ---- * * @param error exception that is used in the resumed processing * @param forceSync if processing begins synchronously * @since 2.2.6 */ ! public void resume(@NotNull Throwable error, ! boolean forceSync) { ! resume(error, packet, forceSync); ! } ! ! /** ! * Wakes up a suspend fiber with an exception. ! * ! * If forceSync is true, then the suspended fiber will resume with ! * synchronous processing on the current thread. This will continue ! * until some Tube indicates that it is safe to switch to asynchronous ! * processing. ! * ! * @param error exception that is used in the resumed processing ! * @param packet Packet that will be visible on the Fiber after the resume ! * @param forceSync if processing begins synchronously ! * @since 2.2.8 ! */ ! public void resume(@NotNull Throwable error, ! @NotNull Packet packet, boolean forceSync) { if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName()); next = null; throwable = error; resume(packet, forceSync); } /** * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback ! * @param mayInterrupt if cancel should use {@link Thread#interrupt()} ! * @see java.util.concurrent.Future#cancel(boolean) * @since 2.2.6 */ + @Override public void cancel(boolean mayInterrupt) { isCanceled = true; if (mayInterrupt) { + // synchronized(this) is used as Thread running Fiber will be holding lock synchronized(this) { if (currentThread != null) currentThread.interrupt(); } }
*** 567,584 **** /** * Suspends this fiber's execution until the resume method is invoked. * <p/> * The call returns immediately, and when the fiber is resumed * the execution picks up from the last scheduled continuation. */ ! private boolean suspend() { ! ! synchronized(this) { if(isTraceEnabled()) { ! LOGGER.fine(getName()+" suspending. Will have suspendedCount=" + (suspendedCount+1)); if (suspendedCount > 0) { ! LOGGER.fine("WARNING - " + getName()+" suspended more than resumed. Will require more than one resume to actually resume this fiber."); } } List<Listener> listeners = getCurrentListeners(); if (++suspendedCount == 1) { --- 614,631 ---- /** * Suspends this fiber's execution until the resume method is invoked. * <p/> * The call returns immediately, and when the fiber is resumed * the execution picks up from the last scheduled continuation. + * @param onExitRunnable runnable to be invoked after fiber is marked for suspension + * @return if control loop must exit */ ! private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) { if(isTraceEnabled()) { ! LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1}); if (suspendedCount > 0) { ! LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName()); } } List<Listener> listeners = getCurrentListeners(); if (++suspendedCount == 1) {
*** 587,597 **** for (Listener listener: listeners) { try { listener.fiberSuspended(this); } catch (Throwable e) { if(isTraceEnabled()) ! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); } } } finally { isInsideSuspendCallbacks = false; } --- 634,644 ---- for (Listener listener: listeners) { try { listener.fiberSuspended(this); } catch (Throwable e) { if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); } } } finally { isInsideSuspendCallbacks = false; }
*** 602,619 **** for (Listener listener: listeners) { try { listener.fiberResumed(this); } catch (Throwable e) { if(isTraceEnabled()) ! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage()); } } ! return false; } return true; } } /** * Adds a new {@link FiberContextSwitchInterceptor} to this fiber. --- 649,702 ---- for (Listener listener: listeners) { try { listener.fiberResumed(this); } catch (Throwable e) { if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()}); } } ! } else if (onExitRunnable != null) { ! // synchronous use cases cannot disconnect from the current thread ! if (!synchronous) { ! /* INTENTIONALLY UNLOCKING EARLY */ ! synchronized(this) { ! // currentThread is protected by the monitor for this fiber so ! // that it is accessible to cancel() even when the lock is held ! currentThread = null; ! } ! lock.unlock(); ! assert(!lock.isHeldByCurrentThread()); ! isRequireUnlock.value = Boolean.FALSE; ! ! try { ! onExitRunnable.run(); ! } catch(Throwable t) { ! throw new OnExitRunnableException(t); } return true; + + } else { + // for synchronous we will stay with current thread, so do not disconnect + if (isTraceEnabled()) + LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread"); + onExitRunnable.run(); + } + } + + return false; + } + + private static final class OnExitRunnableException extends RuntimeException { + private static final long serialVersionUID = 1L; + + Throwable target; + + public OnExitRunnableException(Throwable target) { + super((Throwable)null); // see pattern for InvocationTargetException + this.target = target; } } /** * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
*** 632,648 **** * <li>interceptor gets installed * <li>interceptor.execute() is invoked * <li>Y.processRequest() * </ol> */ ! public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { if (interceptors == null) { interceptors = new ArrayList<FiberContextSwitchInterceptor>(); ! interceptorHandler = new InterceptorHandler(); } interceptors.add(interceptor); - needsToReenter = true; } /** * Removes a {@link FiberContextSwitchInterceptor} from this fiber. * <p/> --- 715,733 ---- * <li>interceptor gets installed * <li>interceptor.execute() is invoked * <li>Y.processRequest() * </ol> */ ! public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { if (interceptors == null) { interceptors = new ArrayList<FiberContextSwitchInterceptor>(); ! } else { ! List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); ! l.addAll(interceptors); ! interceptors = l; } interceptors.add(interceptor); } /** * Removes a {@link FiberContextSwitchInterceptor} from this fiber. * <p/>
*** 664,677 **** * </ol> * * @return true if the specified interceptor was removed. False if * the specified interceptor was not registered with this fiber to begin with. */ ! public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { ! if (interceptors != null && interceptors.remove(interceptor)) { ! needsToReenter = true; ! return true; } return false; } /** --- 749,769 ---- * </ol> * * @return true if the specified interceptor was removed. False if * the specified interceptor was not registered with this fiber to begin with. */ ! public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) { ! if (interceptors != null) { ! boolean result = interceptors.remove(interceptor); ! if (interceptors.isEmpty()) ! interceptors = null; ! else { ! List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>(); ! l.addAll(interceptors); ! interceptors = l; ! } ! return result; } return false; } /**
*** 695,707 **** /** * DO NOT CALL THIS METHOD. This is an implementation detail * of {@link Fiber}. */ @Deprecated public void run() { assert !synchronous; ! doRun(); if (startedSync && suspendedCount == 0 && (next != null || contsSize > 0)) { // We bailed out of running this fiber we started as sync, and now // want to finish running it async startedSync = false; --- 787,803 ---- /** * DO NOT CALL THIS METHOD. This is an implementation detail * of {@link Fiber}. */ @Deprecated + @Override public void run() { + Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer()); + try { assert !synchronous; ! // doRun returns true to indicate an early exit from fiber processing ! if (!doRun()) { if (startedSync && suspendedCount == 0 && (next != null || contsSize > 0)) { // We bailed out of running this fiber we started as sync, and now // want to finish running it async startedSync = false;
*** 710,719 **** --- 806,819 ---- owner.addRunnable(this); } else { completionCheck(); } } + } finally { + ContainerResolver.getDefault().exitContainer(old); + } + } /** * Runs a given {@link Tube} (and everything thereafter) synchronously. * <p/> * <p/>
*** 737,749 **** * @param tubeline The first tube of the tubeline that will act on the packet. * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. * @return The response packet to the <tt>request</tt>. * @see #start(Tube, Packet, CompletionCallback) */ ! public synchronized @NotNull Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) { // save the current continuation, so that we return runSync() without executing them. final Tube[] oldCont = conts; final int oldContSize = contsSize; final boolean oldSynchronous = synchronous; final Tube oldNext = next; --- 837,851 ---- * @param tubeline The first tube of the tubeline that will act on the packet. * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>. * @return The response packet to the <tt>request</tt>. * @see #start(Tube, Packet, CompletionCallback) */ ! public @NotNull Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) { + lock.lock(); + try { // save the current continuation, so that we return runSync() without executing them. final Tube[] oldCont = conts; final int oldContSize = contsSize; final boolean oldSynchronous = synchronous; final Tube oldNext = next;
*** 757,775 **** --- 859,881 ---- synchronous = true; this.packet = request; next = tubeline; doRun(); if (throwable != null) { + if (isDeliverThrowableInPacket) { + packet.addSatellite(new ThrowableContainerPropertySet(throwable)); + } else { if (throwable instanceof RuntimeException) { throw (RuntimeException) throwable; } if (throwable instanceof Error) { throw (Error) throwable; } // our system is supposed to only accept Error or RuntimeException throw new AssertionError(throwable); } + } return this.packet; } finally { conts = oldCont; contsSize = oldContSize; synchronous = oldSynchronous;
*** 779,967 **** interrupted = false; } if(!started && !startedSync) completionCheck(); } } ! private synchronized void completionCheck() { // Don't trigger completion and callbacks if fiber is suspended if(!isCanceled && contsSize==0 && suspendedCount == 0) { if(isTraceEnabled()) ! LOGGER.fine(getName()+" completed"); ! completed = true; clearListeners(); ! notifyAll(); if (completionCallback != null) { ! if (throwable != null) completionCallback.onCompletion(throwable); ! else completionCallback.onCompletion(packet); } } } - - ///** - // * Blocks until the fiber completes. - // */ - //public synchronized void join() throws InterruptedException { - // while(!completed) - // wait(); - //} /** * Invokes all registered {@link InterceptorHandler}s and then call into * {@link Fiber#__doRun()}. */ private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> { /** * Index in {@link Fiber#interceptors} to invoke next. */ private int idx; /** * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}. */ Tube invoke(Tube next) { idx = 0; return execute(next); } public Tube execute(Tube next) { ! if (idx == interceptors.size()) { Fiber.this.next = next; ! __doRun(); } else { ! FiberContextSwitchInterceptor interceptor = interceptors.get(idx++); return interceptor.execute(Fiber.this, next, this); } return Fiber.this.next; } } /** * Executes the fiber as much as possible. * */ ! @SuppressWarnings({"LoopStatementThatDoesntLoop"}) // IntelliJ reports this bogus error ! private void doRun() { ! dumpFiberContext("running"); if (serializeExecution) { serializedExecutionLock.lock(); try { ! _doRun(next); } finally { serializedExecutionLock.unlock(); } } else { ! _doRun(next); } } ! private String currentThreadMonitor = "CurrentThreadMonitor"; ! ! private void _doRun(Tube next) { ! Thread thread; ! synchronized(currentThreadMonitor) { ! if (currentThread != null && !synchronous) { ! if (LOGGER.isLoggable(Level.FINE)) { ! LOGGER.fine("Attempt to run Fiber ['" + this + "'] in more than one thread. Current Thread: " + currentThread + " Attempted Thread: " + Thread.currentThread()); ! } ! while (currentThread != null) { try { ! currentThreadMonitor.wait(); ! } catch (Exception e) { ! // ignore ! } ! } ! } currentThread = Thread.currentThread(); ! thread = currentThread; ! if (LOGGER.isLoggable(Level.FINE)) { ! LOGGER.fine("Thread entering _doRun(): " + thread); } } - ClassLoader old = thread.getContextClassLoader(); - thread.setContextClassLoader(contextClassLoader); try { do { - needsToReenter = false; - // if interceptors are set, go through the interceptors. ! if(interceptorHandler ==null) { this.next = next; ! __doRun(); } - else - next = interceptorHandler.invoke(next); - } while (needsToReenter); } finally { thread.setContextClassLoader(old); ! synchronized(currentThreadMonitor) { currentThread = null; - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Thread leaving _doRun(): " + thread); } ! currentThreadMonitor.notify(); } } } /** * To be invoked from {@link #doRun()}. * * @see #doRun() */ ! private void __doRun() { final Fiber old = CURRENT_FIBER.get(); CURRENT_FIBER.set(this); // if true, lots of debug messages to show what's being executed final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); try { boolean abortResponse = false; ! boolean justSuspended = false; ! while(!isCanceled && !isBlocking(justSuspended) && !needsToReenter) { try { NextAction na; Tube last; if(throwable!=null) { if(contsSize==0 || abortResponse) { contsSize = 0; // abortResponse case // nothing else to execute. we are done. ! return; } last = popCont(); if (traceEnabled) ! LOGGER.finer(getName() + ' ' + last + ".processException(" + throwable + ')'); na = last.processException(throwable); } else { if(next!=null) { if(traceEnabled) ! LOGGER.finer(getName()+' '+next+".processRequest("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); na = next.processRequest(packet); last = next; } else { if(contsSize==0 || abortResponse) { // nothing else to execute. we are done. contsSize = 0; ! return; } last = popCont(); if(traceEnabled) ! LOGGER.finer(getName()+' '+last+".processResponse("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')'); na = last.processResponse(packet); } } if (traceEnabled) ! LOGGER.finer(getName() + ' ' + last + " returned with " + na); // If resume is called before suspend, then make sure // resume(Packet) is not lost if (na.kind != NextAction.SUSPEND) { // preserve in-flight packet so that processException may inspect --- 885,1142 ---- interrupted = false; } if(!started && !startedSync) completionCheck(); } + } finally { + lock.unlock(); + } } ! private void completionCheck() { ! lock.lock(); ! try { // Don't trigger completion and callbacks if fiber is suspended if(!isCanceled && contsSize==0 && suspendedCount == 0) { if(isTraceEnabled()) ! LOGGER.log(Level.FINE, "{0} completed", getName()); clearListeners(); ! condition.signalAll(); if (completionCallback != null) { ! if (throwable != null) { ! if (isDeliverThrowableInPacket) { ! packet.addSatellite(new ThrowableContainerPropertySet(throwable)); ! completionCallback.onCompletion(packet); ! } else completionCallback.onCompletion(throwable); ! } else completionCallback.onCompletion(packet); } } + } finally { + lock.unlock(); + } } /** * Invokes all registered {@link InterceptorHandler}s and then call into * {@link Fiber#__doRun()}. */ private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> { + private final Holder<Boolean> isUnlockRequired; + private final List<FiberContextSwitchInterceptor> ints; + /** * Index in {@link Fiber#interceptors} to invoke next. */ private int idx; + public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) { + this.isUnlockRequired = isUnlockRequired; + this.ints = ints; + } + /** * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}. */ Tube invoke(Tube next) { idx = 0; return execute(next); } + @Override public Tube execute(Tube next) { ! if (idx == ints.size()) { Fiber.this.next = next; ! if (__doRun(isUnlockRequired, ints)) ! return PLACEHOLDER; } else { ! FiberContextSwitchInterceptor interceptor = ints.get(idx++); return interceptor.execute(Fiber.this, next, this); } return Fiber.this.next; } } + private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube(); + + private static class PlaceholderTube extends AbstractTubeImpl { + + @Override + public NextAction processRequest(Packet request) { + throw new UnsupportedOperationException(); + } + + @Override + public NextAction processResponse(Packet response) { + throw new UnsupportedOperationException(); + } + + @Override + public NextAction processException(Throwable t) { + return doThrow(t); + } + + @Override + public void preDestroy() { + } + + @Override + public PlaceholderTube copy(TubeCloner cloner) { + throw new UnsupportedOperationException(); + } + } + /** * Executes the fiber as much as possible. * */ ! private boolean doRun() { dumpFiberContext("running"); if (serializeExecution) { serializedExecutionLock.lock(); try { ! return _doRun(next); } finally { serializedExecutionLock.unlock(); } } else { ! return _doRun(next); } } ! private boolean _doRun(Tube next) { ! // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend ! Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE); ! lock.lock(); try { ! List<FiberContextSwitchInterceptor> ints; ! ClassLoader old; ! synchronized(this) { ! ints = interceptors; ! ! // currentThread is protected by the monitor for this fiber so ! // that it is accessible to cancel() even when the lock is held currentThread = Thread.currentThread(); ! if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread); } + + old = currentThread.getContextClassLoader(); + currentThread.setContextClassLoader(contextClassLoader); } try { + boolean needsToReenter = false; do { // if interceptors are set, go through the interceptors. ! if (ints == null) { this.next = next; ! if (__doRun(isRequireUnlock, ints)) { ! return true; ! } ! } else { ! next = new InterceptorHandler(isRequireUnlock, ints).invoke(next); ! if (next == PLACEHOLDER) { ! return true; ! } } + synchronized(this) { + needsToReenter = (ints != interceptors); + if (needsToReenter) + ints = interceptors; + } + } while (needsToReenter); + } catch(OnExitRunnableException o) { + // catching this exception indicates onExitRunnable in suspend() threw. + // we must still avoid double unlock + Throwable t = o.target; + if (t instanceof WebServiceException) + throw (WebServiceException) t; + throw new WebServiceException(t); } finally { + // don't reference currentThread here because fiber processing + // may already be running on a different thread (Note: isAlreadyExited + // tracks this state + Thread thread = Thread.currentThread(); thread.setContextClassLoader(old); ! if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread); ! } ! } ! ! return false; ! } finally { ! if (isRequireUnlock.value) { ! synchronized(this) { currentThread = null; } ! lock.unlock(); } } } /** * To be invoked from {@link #doRun()}. * * @see #doRun() */ ! private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) { ! assert(lock.isHeldByCurrentThread()); ! final Fiber old = CURRENT_FIBER.get(); CURRENT_FIBER.set(this); // if true, lots of debug messages to show what's being executed final boolean traceEnabled = LOGGER.isLoggable(Level.FINER); try { boolean abortResponse = false; ! while(isReady(originalInterceptors)) { ! if (isCanceled) { ! next = null; ! throwable = null; ! contsSize = 0; ! break; ! } ! try { NextAction na; Tube last; if(throwable!=null) { if(contsSize==0 || abortResponse) { contsSize = 0; // abortResponse case // nothing else to execute. we are done. ! return false; } last = popCont(); if (traceEnabled) ! LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable}); na = last.processException(throwable); } else { if(next!=null) { if(traceEnabled) ! LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); na = next.processRequest(packet); last = next; } else { if(contsSize==0 || abortResponse) { // nothing else to execute. we are done. contsSize = 0; ! return false; } last = popCont(); if(traceEnabled) ! LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"}); na = last.processResponse(packet); } } if (traceEnabled) ! LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na}); // If resume is called before suspend, then make sure // resume(Packet) is not lost if (na.kind != NextAction.SUSPEND) { // preserve in-flight packet so that processException may inspect
*** 979,996 **** case NextAction.INVOKE_AND_FORGET: next = na.next; if (na.kind == NextAction.INVOKE_ASYNC && startedSync) { // Break out here ! return; } break; case NextAction.THROW_ABORT_RESPONSE: case NextAction.ABORT_RESPONSE: abortResponse = true; ! if (LOGGER.isLoggable(Level.FINE)) { ! LOGGER.fine("Fiber " + this + " is aborting a response due to exception: " + na.throwable); } case NextAction.RETURN: case NextAction.THROW: next = null; break; --- 1154,1171 ---- case NextAction.INVOKE_AND_FORGET: next = na.next; if (na.kind == NextAction.INVOKE_ASYNC && startedSync) { // Break out here ! return false; } break; case NextAction.THROW_ABORT_RESPONSE: case NextAction.ABORT_RESPONSE: abortResponse = true; ! if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable}); } case NextAction.RETURN: case NextAction.THROW: next = null; break;
*** 999,1009 **** // Only store the 'last' tube when we're processing // a request, since conts array is for processResponse pushCont(last); } next = na.next; ! justSuspended = suspend(); break; default: throw new AssertionError(); } } catch (RuntimeException t) { --- 1174,1185 ---- // Only store the 'last' tube when we're processing // a request, since conts array is for processResponse pushCont(last); } next = na.next; ! if(suspend(isRequireUnlock, na.onExitRunnable)) ! return true; // explicitly exiting control loop break; default: throw new AssertionError(); } } catch (RuntimeException t) {
*** 1017,1038 **** } dumpFiberContext("After tube execution"); } - if (isCanceled) { - next = null; - throwable = null; - contsSize = 0; - } - // there's nothing we can execute right away. // we'll be back when this fiber is resumed. } finally { CURRENT_FIBER.set(old); } } private void pushCont(Tube tube) { conts[contsSize++] = tube; --- 1193,1210 ---- } dumpFiberContext("After tube execution"); } // there's nothing we can execute right away. // we'll be back when this fiber is resumed. } finally { CURRENT_FIBER.set(old); } + + return false; } private void pushCont(Tube tube) { conts[contsSize++] = tube;
*** 1066,1095 **** this.conts = conts; this.contsSize = contsSize; } /** ! * Returns true if the fiber needs to block its execution. */ ! private boolean isBlocking(boolean justSuspended) { if (synchronous) { while (suspendedCount == 1) try { if (isTraceEnabled()) { ! LOGGER.fine(getName() + " is blocking thread " + Thread.currentThread().getName()); } ! wait(); // the synchronized block is the whole runSync method. } catch (InterruptedException e) { // remember that we are interrupted, but don't respond to it // right away. This behavior is in line with what happens // when you are actually running the whole thing synchronously. interrupted = true; } return false; } - else - return justSuspended || suspendedCount==1; } private String getName() { return "engine-" + owner.id + "fiber-" + id; } --- 1238,1275 ---- this.conts = conts; this.contsSize = contsSize; } /** ! * Returns true if the fiber is ready to execute. */ ! private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) { if (synchronous) { while (suspendedCount == 1) try { if (isTraceEnabled()) { ! LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()}); } ! condition.await(); // the synchronized block is the whole runSync method. } catch (InterruptedException e) { // remember that we are interrupted, but don't respond to it // right away. This behavior is in line with what happens // when you are actually running the whole thing synchronously. interrupted = true; } + + synchronized(this) { + return interceptors == originalInterceptors; + } + } + else { + if (suspendedCount>0) return false; + synchronized(this) { + return interceptors == originalInterceptors; + } } } private String getName() { return "engine-" + owner.id + "fiber-" + id; }
*** 1213,1230 **** */ public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize"); private final Set<Component> components = new CopyOnWriteArraySet<Component>(); public <S> S getSPI(Class<S> spiType) { ! for(Component c : components) { S spi = c.getSPI(spiType); ! if (spi != null) return spi; } return null; } public Set<Component> getComponents() { return components; } } --- 1393,1413 ---- */ public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize"); private final Set<Component> components = new CopyOnWriteArraySet<Component>(); + @Override public <S> S getSPI(Class<S> spiType) { ! for (Component c : components) { S spi = c.getSPI(spiType); ! if (spi != null) { return spi; } + } return null; } + @Override public Set<Component> getComponents() { return components; } }