src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 30,53 ****
import com.sun.xml.internal.ws.api.Cancelable;
import com.sun.xml.internal.ws.api.Component;
import com.sun.xml.internal.ws.api.ComponentRegistry;
import com.sun.xml.internal.ws.api.SOAPVersion;
import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
import com.sun.xml.internal.ws.api.message.Packet;
import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
import com.sun.xml.internal.ws.api.server.Adapter;
import java.util.ArrayList;
- import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* User-level thread. Represents the execution of one request/response processing.
* <p/>
* <p/>
* JAX-WS RI is capable of running a large number of request/response concurrently by
--- 30,60 ----
import com.sun.xml.internal.ws.api.Cancelable;
import com.sun.xml.internal.ws.api.Component;
import com.sun.xml.internal.ws.api.ComponentRegistry;
import com.sun.xml.internal.ws.api.SOAPVersion;
import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
+ import com.sun.xml.internal.ws.api.message.AddressingUtils;
import com.sun.xml.internal.ws.api.message.Packet;
import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
+ import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
import com.sun.xml.internal.ws.api.server.Adapter;
+ import com.sun.xml.internal.ws.api.server.Container;
+ import com.sun.xml.internal.ws.api.server.ContainerResolver;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
+ import javax.xml.ws.Holder;
+ import javax.xml.ws.WebServiceException;
+
/**
* User-level thread. Represents the execution of one request/response processing.
* <p/>
* <p/>
* JAX-WS RI is capable of running a large number of request/response concurrently by
*** 106,115 ****
--- 113,123 ----
/**
* Callback interface for notification of suspend and resume.
*
* @since 2.2.6
+ * @deprecated Use {@link NextAction#suspend(Runnable)}
*/
public interface Listener {
/**
* Fiber has been suspended. Implementations of this callback may resume the Fiber.
* @param fiber Fiber
*** 121,136 ****
* @param fiber Fiber
*/
public void fiberResumed(Fiber fiber);
}
! private List<Listener> _listeners = new ArrayList<Listener>();
/**
* Adds suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
*/
public void addListener(Listener listener) {
synchronized(_listeners) {
if (!_listeners.contains(listener)) {
_listeners.add(listener);
--- 129,145 ----
* @param fiber Fiber
*/
public void fiberResumed(Fiber fiber);
}
! private final List<Listener> _listeners = new ArrayList<Listener>();
/**
* Adds suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
+ * @deprecated Use {@link NextAction#suspend(Runnable)}
*/
public void addListener(Listener listener) {
synchronized(_listeners) {
if (!_listeners.contains(listener)) {
_listeners.add(listener);
*** 140,157 ****
/**
* Removes suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
*/
public void removeListener(Listener listener) {
synchronized(_listeners) {
_listeners.remove(listener);
}
}
! private List<Listener> getCurrentListeners() {
synchronized(_listeners) {
return new ArrayList<Listener>(_listeners);
}
}
--- 149,167 ----
/**
* Removes suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
+ * @deprecated Use {@link NextAction#suspend(Runnable)}
*/
public void removeListener(Listener listener) {
synchronized(_listeners) {
_listeners.remove(listener);
}
}
! List<Listener> getCurrentListeners() {
synchronized(_listeners) {
return new ArrayList<Listener>(_listeners);
}
}
*** 208,222 ****
private volatile int suspendedCount = 0;
private volatile boolean isInsideSuspendCallbacks = false;
/**
- * Is this fiber completed?
- */
- private volatile boolean completed;
-
- /**
* Is this {@link Fiber} currently running in the synchronous mode?
*/
private boolean synchronous;
private boolean interrupted;
--- 218,227 ----
*** 227,265 ****
* Active {@link FiberContextSwitchInterceptor}s for this fiber.
*/
private List<FiberContextSwitchInterceptor> interceptors;
/**
- * Not null when {@link #interceptors} is not null.
- */
- private InterceptorHandler interceptorHandler;
-
- /**
- * This flag is set to true when a new interceptor is added.
- * <p/>
- * When that happens, we need to first exit the current interceptors
- * and then reenter them, so that the newly added interceptors start
- * taking effect. This flag is used to control that flow.
- */
- private boolean needsToReenter;
-
- /**
* Fiber's context {@link ClassLoader}.
*/
private
@Nullable
ClassLoader contextClassLoader;
private
@Nullable
CompletionCallback completionCallback;
/**
* The thread on which this Fiber is currently executing, if applicable.
*/
private Thread currentThread;
private volatile boolean isCanceled;
/**
* Set to true if this fiber is started asynchronously, to avoid
* doubly-invoking completion code.
--- 232,269 ----
* Active {@link FiberContextSwitchInterceptor}s for this fiber.
*/
private List<FiberContextSwitchInterceptor> interceptors;
/**
* Fiber's context {@link ClassLoader}.
*/
private
@Nullable
ClassLoader contextClassLoader;
private
@Nullable
CompletionCallback completionCallback;
+ private boolean isDeliverThrowableInPacket = false;
+
+ public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
+ this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
+ }
+
/**
* The thread on which this Fiber is currently executing, if applicable.
*/
private Thread currentThread;
+ /**
+ * Lock used in place of synchronized(this) so that unlocking can be
+ * controlled explicitly for resume use cases
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+
private volatile boolean isCanceled;
/**
* Set to true if this fiber is started asynchronously, to avoid
* doubly-invoking completion code.
*** 275,285 ****
* a message number before the remainder of the processing can be asynchronous.
*/
private boolean startedSync;
/**
! * Callback to be invoked when a {@link Fiber} finishs execution.
*/
public interface CompletionCallback {
/**
* Indicates that the fiber has finished its execution.
* <p/>
--- 279,289 ----
* a message number before the remainder of the processing can be asynchronous.
*/
private boolean startedSync;
/**
! * Callback to be invoked when a {@link Fiber} finishes execution.
*/
public interface CompletionCallback {
/**
* Indicates that the fiber has finished its execution.
* <p/>
*** 296,310 ****
void onCompletion(@NotNull Throwable error);
}
Fiber(Engine engine) {
this.owner = engine;
- if (isTraceEnabled()) {
id = iotaGen.incrementAndGet();
! LOGGER.fine(getName() + " created");
! } else {
! id = -1;
}
// if this is run from another fiber, then we naturally inherit its context classloader,
// so this code works for fiber->fiber inheritance just fine.
contextClassLoader = Thread.currentThread().getContextClassLoader();
--- 300,312 ----
void onCompletion(@NotNull Throwable error);
}
Fiber(Engine engine) {
this.owner = engine;
id = iotaGen.incrementAndGet();
! if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "{0} created", getName());
}
// if this is run from another fiber, then we naturally inherit its context classloader,
// so this code works for fiber->fiber inheritance just fine.
contextClassLoader = Thread.currentThread().getContextClassLoader();
*** 331,342 ****
String action = null;
String msgId = null;
if (packet != null) {
for (SOAPVersion sv: SOAPVersion.values()) {
for (AddressingVersion av: AddressingVersion.values()) {
! action = packet.getMessage() != null ? packet.getMessage().getHeaders().getAction(av, sv) : null;
! msgId = packet.getMessage() != null ? packet.getMessage().getHeaders().getMessageID(av, sv) : null;
if (action != null || msgId != null) {
break;
}
}
if (action != null || msgId != null) {
--- 333,344 ----
String action = null;
String msgId = null;
if (packet != null) {
for (SOAPVersion sv: SOAPVersion.values()) {
for (AddressingVersion av: AddressingVersion.values()) {
! action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
! msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
if (action != null || msgId != null) {
break;
}
}
if (action != null || msgId != null) {
*** 356,366 ****
tubeDesc = next.toString() + ".processRequest()";
} else {
tubeDesc = peekCont() + ".processResponse()";
}
! LOGGER.fine(getName() + " " + desc + " with " + actionAndMsgDesc + " and 'current' tube " + tubeDesc + " from thread " + Thread.currentThread().getName() + " with Packet: " + (packet != null ? packet.toShortString() : null));
}
}
/**
* Starts the execution of this fiber.
--- 358,368 ----
tubeDesc = next.toString() + ".processRequest()";
} else {
tubeDesc = peekCont() + ".processResponse()";
}
! LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
}
}
/**
* Starts the execution of this fiber.
*** 443,453 ****
* For example, an asynchronous response endpoint that supports WS-ReliableMessaging
* including in-order message delivery may need to resume the Fiber synchronously
* until message order is confirmed prior to returning to asynchronous processing.
* @since 2.2.6
*/
! public synchronized void resume(@NotNull Packet resumePacket,
boolean forceSync) {
resume(resumePacket, forceSync, null);
}
/**
--- 445,455 ----
* For example, an asynchronous response endpoint that supports WS-ReliableMessaging
* including in-order message delivery may need to resume the Fiber synchronously
* until message order is confirmed prior to returning to asynchronous processing.
* @since 2.2.6
*/
! public void resume(@NotNull Packet resumePacket,
boolean forceSync) {
resume(resumePacket, forceSync, null);
}
/**
*** 456,510 ****
* @since 2.2.6
*/
public void resume(@NotNull Packet resumePacket,
boolean forceSync,
CompletionCallback callback) {
!
! synchronized(this) {
if (callback != null) {
setCompletionCallback(callback);
}
if(isTraceEnabled())
! LOGGER.fine(getName()+" resuming. Will have suspendedCount=" + (suspendedCount-1));
packet = resumePacket;
if( --suspendedCount == 0 ) {
if (!isInsideSuspendCallbacks) {
List<Listener> listeners = getCurrentListeners();
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if (isTraceEnabled())
! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage());
}
}
if(synchronous) {
! notifyAll();
} else if (forceSync || startedSync) {
run();
} else {
dumpFiberContext("resuming (async)");
owner.addRunnable(this);
}
}
} else {
if (isTraceEnabled()) {
! LOGGER.fine(getName() + " taking no action on resume because suspendedCount != 0: " + suspendedCount);
}
}
}
}
/**
* Wakes up a suspended fiber and begins response processing.
* @since 2.2.6
*/
! public synchronized void resumeAndReturn(@NotNull Packet resumePacket,
boolean forceSync) {
if(isTraceEnabled())
! LOGGER.fine(getName()+" resumed with Return Packet");
next = null;
resume(resumePacket, forceSync);
}
/**
--- 458,514 ----
* @since 2.2.6
*/
public void resume(@NotNull Packet resumePacket,
boolean forceSync,
CompletionCallback callback) {
! lock.lock();
! try {
if (callback != null) {
setCompletionCallback(callback);
}
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
packet = resumePacket;
if( --suspendedCount == 0 ) {
if (!isInsideSuspendCallbacks) {
List<Listener> listeners = getCurrentListeners();
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if (isTraceEnabled())
! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
if(synchronous) {
! condition.signalAll();
} else if (forceSync || startedSync) {
run();
} else {
dumpFiberContext("resuming (async)");
owner.addRunnable(this);
}
}
} else {
if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
}
}
+ } finally {
+ lock.unlock();
}
}
/**
* Wakes up a suspended fiber and begins response processing.
* @since 2.2.6
*/
! public void resumeAndReturn(@NotNull Packet resumePacket,
boolean forceSync) {
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
next = null;
resume(resumePacket, forceSync);
}
/**
*** 521,532 ****
* this method even before this fiber goes into the suspension mode. So the caller
* need not worry about synchronizing {@link NextAction#suspend()} and this method.
*
* @param throwable exception that is used in the resumed processing
*/
! public synchronized void resume(@NotNull Throwable throwable) {
! resume(throwable, false);
}
/**
* Wakes up a suspend fiber with an exception.
*
--- 525,558 ----
* this method even before this fiber goes into the suspension mode. So the caller
* need not worry about synchronizing {@link NextAction#suspend()} and this method.
*
* @param throwable exception that is used in the resumed processing
*/
! public void resume(@NotNull Throwable throwable) {
! resume(throwable, packet, false);
! }
!
! /**
! * Wakes up a suspended fiber with an exception.
! * <p/>
! * <p/>
! * The execution of the suspended fiber will be resumed in the response
! * processing direction, by calling the {@link Tube#processException(Throwable)} method
! * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
! * the specified exception as the parameter.
! * <p/>
! * <p/>
! * This method is implemented in a race-free way. Another thread can invoke
! * this method even before this fiber goes into the suspension mode. So the caller
! * need not worry about synchronizing {@link NextAction#suspend()} and this method.
! *
! * @param throwable exception that is used in the resumed processing
! * @param packet Packet that will be visible on the Fiber after the resume
! * @since 2.2.8
! */
! public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
! resume(throwable, packet, false);
}
/**
* Wakes up a suspend fiber with an exception.
*
*** 537,564 ****
*
* @param error exception that is used in the resumed processing
* @param forceSync if processing begins synchronously
* @since 2.2.6
*/
! public synchronized void resume(@NotNull Throwable error,
boolean forceSync) {
if(isTraceEnabled())
! LOGGER.fine(getName()+" resumed with Return Throwable");
next = null;
throwable = error;
resume(packet, forceSync);
}
/**
* Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
! * @param mayInterrupt if cancel should use {@link Thread.interrupt()}
! * @see java.util.Future.cancel
* @since 2.2.6
*/
public void cancel(boolean mayInterrupt) {
isCanceled = true;
if (mayInterrupt) {
synchronized(this) {
if (currentThread != null)
currentThread.interrupt();
}
}
--- 563,611 ----
*
* @param error exception that is used in the resumed processing
* @param forceSync if processing begins synchronously
* @since 2.2.6
*/
! public void resume(@NotNull Throwable error,
! boolean forceSync) {
! resume(error, packet, forceSync);
! }
!
! /**
! * Wakes up a suspended fiber with an exception.
! *
! * If forceSync is true, then the suspended fiber will resume with
! * synchronous processing on the current thread. This will continue
! * until some Tube indicates that it is safe to switch to asynchronous
! * processing.
! *
! * @param error exception that is used in the resumed processing
! * @param packet Packet that will be visible on the Fiber after the resume
! * @param forceSync if processing begins synchronously
! * @since 2.2.8
! */
! public void resume(@NotNull Throwable error,
! @NotNull Packet packet,
boolean forceSync) {
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
next = null;
throwable = error;
resume(packet, forceSync);
}
/**
* Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
! * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
! * @see java.util.concurrent.Future#cancel(boolean)
* @since 2.2.6
*/
+ @Override
public void cancel(boolean mayInterrupt) {
isCanceled = true;
if (mayInterrupt) {
+ // synchronized(this) is used as Thread running Fiber will be holding lock
synchronized(this) {
if (currentThread != null)
currentThread.interrupt();
}
}
*** 567,584 ****
/**
* Suspends this fiber's execution until the resume method is invoked.
* <p/>
* The call returns immediately, and when the fiber is resumed
* the execution picks up from the last scheduled continuation.
*/
! private boolean suspend() {
!
! synchronized(this) {
if(isTraceEnabled()) {
! LOGGER.fine(getName()+" suspending. Will have suspendedCount=" + (suspendedCount+1));
if (suspendedCount > 0) {
! LOGGER.fine("WARNING - " + getName()+" suspended more than resumed. Will require more than one resume to actually resume this fiber.");
}
}
List<Listener> listeners = getCurrentListeners();
if (++suspendedCount == 1) {
--- 614,631 ----
/**
* Suspends this fiber's execution until the resume method is invoked.
* <p/>
* The call returns immediately, and when the fiber is resumed
* the execution picks up from the last scheduled continuation.
+ * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
+ * @return {@code true} if the control loop must exit
*/
! private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
if(isTraceEnabled()) {
! LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
if (suspendedCount > 0) {
! LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
}
}
List<Listener> listeners = getCurrentListeners();
if (++suspendedCount == 1) {
*** 587,597 ****
for (Listener listener: listeners) {
try {
listener.fiberSuspended(this);
} catch (Throwable e) {
if(isTraceEnabled())
! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage());
}
}
} finally {
isInsideSuspendCallbacks = false;
}
--- 634,644 ----
for (Listener listener: listeners) {
try {
listener.fiberSuspended(this);
} catch (Throwable e) {
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
} finally {
isInsideSuspendCallbacks = false;
}
*** 602,619 ****
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if(isTraceEnabled())
! LOGGER.fine("Listener " + listener + " threw exception: " + e.getMessage());
}
}
! return false;
}
return true;
}
}
/**
* Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
--- 649,702 ----
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
! } else if (onExitRunnable != null) {
! // synchronous use cases cannot disconnect from the current thread
! if (!synchronous) {
! /* INTENTIONALLY UNLOCKING EARLY */
! synchronized(this) {
! // currentThread is protected by the monitor for this fiber so
! // that it is accessible to cancel() even when the lock is held
! currentThread = null;
! }
! lock.unlock();
! assert(!lock.isHeldByCurrentThread());
! isRequireUnlock.value = Boolean.FALSE;
!
! try {
! onExitRunnable.run();
! } catch(Throwable t) {
! throw new OnExitRunnableException(t);
}
return true;
+
+ } else {
+ // for synchronous we will stay with current thread, so do not disconnect
+ if (isTraceEnabled())
+ LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
+ onExitRunnable.run();
+ }
+ }
+
+ return false;
+ }
+
+ private static final class OnExitRunnableException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ Throwable target;
+
+ public OnExitRunnableException(Throwable target) {
+ super((Throwable)null); // see pattern for InvocationTargetException
+ this.target = target;
}
}
/**
* Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
*** 632,648 ****
* <li>interceptor gets installed
* <li>interceptor.execute() is invoked
* <li>Y.processRequest()
* </ol>
*/
! public void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
if (interceptors == null) {
interceptors = new ArrayList<FiberContextSwitchInterceptor>();
! interceptorHandler = new InterceptorHandler();
}
interceptors.add(interceptor);
- needsToReenter = true;
}
/**
* Removes a {@link FiberContextSwitchInterceptor} from this fiber.
* <p/>
--- 715,733 ----
* <li>interceptor gets installed
* <li>interceptor.execute() is invoked
* <li>Y.processRequest()
* </ol>
*/
! public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
if (interceptors == null) {
interceptors = new ArrayList<FiberContextSwitchInterceptor>();
! } else {
! List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
! l.addAll(interceptors);
! interceptors = l;
}
interceptors.add(interceptor);
}
/**
* Removes a {@link FiberContextSwitchInterceptor} from this fiber.
* <p/>
*** 664,677 ****
* </ol>
*
* @return true if the specified interceptor was removed. False if
* the specified interceptor was not registered with this fiber to begin with.
*/
! public boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
! if (interceptors != null && interceptors.remove(interceptor)) {
! needsToReenter = true;
! return true;
}
return false;
}
/**
--- 749,769 ----
* </ol>
*
* @return true if the specified interceptor was removed. False if
* the specified interceptor was not registered with this fiber to begin with.
*/
! public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
! if (interceptors != null) {
! boolean result = interceptors.remove(interceptor);
! if (interceptors.isEmpty())
! interceptors = null;
! else {
! List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
! l.addAll(interceptors);
! interceptors = l;
! }
! return result;
}
return false;
}
/**
*** 695,707 ****
/**
* DO NOT CALL THIS METHOD. This is an implementation detail
* of {@link Fiber}.
*/
@Deprecated
public void run() {
assert !synchronous;
! doRun();
if (startedSync && suspendedCount == 0 &&
(next != null || contsSize > 0)) {
// We bailed out of running this fiber we started as sync, and now
// want to finish running it async
startedSync = false;
--- 787,803 ----
/**
* DO NOT CALL THIS METHOD. This is an implementation detail
* of {@link Fiber}.
*/
@Deprecated
+ @Override
public void run() {
+ Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
+ try {
assert !synchronous;
! // doRun returns true to indicate an early exit from fiber processing
! if (!doRun()) {
if (startedSync && suspendedCount == 0 &&
(next != null || contsSize > 0)) {
// We bailed out of running this fiber we started as sync, and now
// want to finish running it async
startedSync = false;
*** 710,719 ****
--- 806,819 ----
owner.addRunnable(this);
} else {
completionCheck();
}
}
+ } finally {
+ ContainerResolver.getDefault().exitContainer(old);
+ }
+ }
/**
* Runs a given {@link Tube} (and everything thereafter) synchronously.
* <p/>
* <p/>
*** 737,749 ****
* @param tubeline The first tube of the tubeline that will act on the packet.
* @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
* @return The response packet to the <tt>request</tt>.
* @see #start(Tube, Packet, CompletionCallback)
*/
! public synchronized
@NotNull
Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
// save the current continuation, so that we return runSync() without executing them.
final Tube[] oldCont = conts;
final int oldContSize = contsSize;
final boolean oldSynchronous = synchronous;
final Tube oldNext = next;
--- 837,851 ----
* @param tubeline The first tube of the tubeline that will act on the packet.
* @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
* @return The response packet to the <tt>request</tt>.
* @see #start(Tube, Packet, CompletionCallback)
*/
! public
@NotNull
Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
+ lock.lock();
+ try {
// save the current continuation, so that we return runSync() without executing them.
final Tube[] oldCont = conts;
final int oldContSize = contsSize;
final boolean oldSynchronous = synchronous;
final Tube oldNext = next;
*** 757,775 ****
--- 859,881 ----
synchronous = true;
this.packet = request;
next = tubeline;
doRun();
if (throwable != null) {
+ if (isDeliverThrowableInPacket) {
+ packet.addSatellite(new ThrowableContainerPropertySet(throwable));
+ } else {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
if (throwable instanceof Error) {
throw (Error) throwable;
}
// our system is supposed to only accept Error or RuntimeException
throw new AssertionError(throwable);
}
+ }
return this.packet;
} finally {
conts = oldCont;
contsSize = oldContSize;
synchronous = oldSynchronous;
*** 779,967 ****
interrupted = false;
}
if(!started && !startedSync)
completionCheck();
}
}
! private synchronized void completionCheck() {
// Don't trigger completion and callbacks if fiber is suspended
if(!isCanceled && contsSize==0 && suspendedCount == 0) {
if(isTraceEnabled())
! LOGGER.fine(getName()+" completed");
! completed = true;
clearListeners();
! notifyAll();
if (completionCallback != null) {
! if (throwable != null)
completionCallback.onCompletion(throwable);
! else
completionCallback.onCompletion(packet);
}
}
}
-
- ///**
- // * Blocks until the fiber completes.
- // */
- //public synchronized void join() throws InterruptedException {
- // while(!completed)
- // wait();
- //}
/**
* Invokes all registered {@link InterceptorHandler}s and then call into
* {@link Fiber#__doRun()}.
*/
private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
/**
* Index in {@link Fiber#interceptors} to invoke next.
*/
private int idx;
/**
* Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
*/
Tube invoke(Tube next) {
idx = 0;
return execute(next);
}
public Tube execute(Tube next) {
! if (idx == interceptors.size()) {
Fiber.this.next = next;
! __doRun();
} else {
! FiberContextSwitchInterceptor interceptor = interceptors.get(idx++);
return interceptor.execute(Fiber.this, next, this);
}
return Fiber.this.next;
}
}
/**
* Executes the fiber as much as possible.
*
*/
! @SuppressWarnings({"LoopStatementThatDoesntLoop"}) // IntelliJ reports this bogus error
! private void doRun() {
!
dumpFiberContext("running");
if (serializeExecution) {
serializedExecutionLock.lock();
try {
! _doRun(next);
} finally {
serializedExecutionLock.unlock();
}
} else {
! _doRun(next);
}
}
! private String currentThreadMonitor = "CurrentThreadMonitor";
!
! private void _doRun(Tube next) {
! Thread thread;
! synchronized(currentThreadMonitor) {
! if (currentThread != null && !synchronous) {
! if (LOGGER.isLoggable(Level.FINE)) {
! LOGGER.fine("Attempt to run Fiber ['" + this + "'] in more than one thread. Current Thread: " + currentThread + " Attempted Thread: " + Thread.currentThread());
! }
! while (currentThread != null) {
try {
! currentThreadMonitor.wait();
! } catch (Exception e) {
! // ignore
! }
! }
! }
currentThread = Thread.currentThread();
! thread = currentThread;
! if (LOGGER.isLoggable(Level.FINE)) {
! LOGGER.fine("Thread entering _doRun(): " + thread);
}
}
- ClassLoader old = thread.getContextClassLoader();
- thread.setContextClassLoader(contextClassLoader);
try {
do {
- needsToReenter = false;
-
// if interceptors are set, go through the interceptors.
! if(interceptorHandler ==null) {
this.next = next;
! __doRun();
}
- else
- next = interceptorHandler.invoke(next);
- } while (needsToReenter);
} finally {
thread.setContextClassLoader(old);
! synchronized(currentThreadMonitor) {
currentThread = null;
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Thread leaving _doRun(): " + thread);
}
! currentThreadMonitor.notify();
}
}
}
/**
* To be invoked from {@link #doRun()}.
*
* @see #doRun()
*/
! private void __doRun() {
final Fiber old = CURRENT_FIBER.get();
CURRENT_FIBER.set(this);
// if true, lots of debug messages to show what's being executed
final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
try {
boolean abortResponse = false;
! boolean justSuspended = false;
! while(!isCanceled && !isBlocking(justSuspended) && !needsToReenter) {
try {
NextAction na;
Tube last;
if(throwable!=null) {
if(contsSize==0 || abortResponse) {
contsSize = 0; // abortResponse case
// nothing else to execute. we are done.
! return;
}
last = popCont();
if (traceEnabled)
! LOGGER.finer(getName() + ' ' + last + ".processException(" + throwable + ')');
na = last.processException(throwable);
} else {
if(next!=null) {
if(traceEnabled)
! LOGGER.finer(getName()+' '+next+".processRequest("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')');
na = next.processRequest(packet);
last = next;
} else {
if(contsSize==0 || abortResponse) {
// nothing else to execute. we are done.
contsSize = 0;
! return;
}
last = popCont();
if(traceEnabled)
! LOGGER.finer(getName()+' '+last+".processResponse("+(packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null")+')');
na = last.processResponse(packet);
}
}
if (traceEnabled)
! LOGGER.finer(getName() + ' ' + last + " returned with " + na);
// If resume is called before suspend, then make sure
// resume(Packet) is not lost
if (na.kind != NextAction.SUSPEND) {
// preserve in-flight packet so that processException may inspect
--- 885,1142 ----
interrupted = false;
}
if(!started && !startedSync)
completionCheck();
}
+ } finally {
+ lock.unlock();
+ }
}
/**
* Fires completion handling once the fiber has nothing left to run.
* <p>
* The whole check runs while holding {@code lock}. Completion is triggered only
* when the fiber is not canceled, has no continuations left
* ({@code contsSize == 0}) and {@code suspendedCount} is zero. In that case the
* listeners are cleared, every waiter on {@code condition} is signalled, and the
* {@code completionCallback} (if one is set) is notified.
*/
! private void completionCheck() {
! lock.lock();
! try {
// Don't trigger completion and callbacks if fiber is suspended
if(!isCanceled && contsSize==0 && suspendedCount == 0) {
if(isTraceEnabled())
! LOGGER.log(Level.FINE, "{0} completed", getName());
clearListeners();
// wake up every thread waiting on the condition
! condition.signalAll();
if (completionCallback != null) {
! if (throwable != null) {
// a failure is either attached to the packet as a satellite and delivered
// through onCompletion(Packet), or delivered directly through
// onCompletion(Throwable), depending on isDeliverThrowableInPacket
! if (isDeliverThrowableInPacket) {
! packet.addSatellite(new ThrowableContainerPropertySet(throwable));
! completionCallback.onCompletion(packet);
! } else
completionCallback.onCompletion(throwable);
! } else
completionCallback.onCompletion(packet);
}
}
+ } finally {
+ lock.unlock();
+ }
}
/**
* Invokes each {@link FiberContextSwitchInterceptor} in the supplied list, in order,
* and once the list is exhausted calls into {@code Fiber#__doRun(Holder, List)}.
*/
private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
// passed through unchanged to __doRun
+ private final Holder<Boolean> isUnlockRequired;
// the interceptor list this handler walks; also passed through to __doRun
+ private final List<FiberContextSwitchInterceptor> ints;
+
/**
* Index in {@code ints} of the interceptor to invoke next.
*/
private int idx;
+ public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
+ this.isUnlockRequired = isUnlockRequired;
+ this.ints = ints;
+ }
+
/**
* Initiate the interception, and eventually invokes {@code Fiber#__doRun(Holder, List)}.
*
* @param next tube handed to the first interceptor (or to the fiber when there are none)
* @return the result of {@link #execute(Tube)} started from index 0
*/
Tube invoke(Tube next) {
idx = 0;
return execute(next);
}
/**
* Runs the next interceptor, or the fiber itself when all interceptors have been entered.
*
* @return {@code PLACEHOLDER} when {@code __doRun} returned {@code true};
*         otherwise the fiber's current {@code next} tube, or whatever the
*         invoked interceptor returned
*/
+ @Override
public Tube execute(Tube next) {
! if (idx == ints.size()) {
// every interceptor has been entered: run the fiber itself
Fiber.this.next = next;
! if (__doRun(isUnlockRequired, ints))
! return PLACEHOLDER;
} else {
// hand control to the next interceptor, which calls back into execute() via this Work
! FiberContextSwitchInterceptor interceptor = ints.get(idx++);
return interceptor.execute(Fiber.this, next, this);
}
return Fiber.this.next;
}
}
// Sentinel tube: InterceptorHandler.execute returns it when __doRun returned true,
// and _doRun compares against it by identity to detect that case.
+ private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
+
/**
* Tube type of the {@code PLACEHOLDER} sentinel. Request/response processing and
* copying are unsupported and throw {@link UnsupportedOperationException};
* {@code processException} returns {@code doThrow(t)}; {@code preDestroy} does nothing.
*/
+ private static class PlaceholderTube extends AbstractTubeImpl {
+
+ @Override
+ public NextAction processRequest(Packet request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NextAction processResponse(Packet response) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NextAction processException(Throwable t) {
+ return doThrow(t);
+ }
+
+ @Override
+ public void preDestroy() {
+ }
+
+ @Override
+ public PlaceholderTube copy(TubeCloner cloner) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
/**
* Executes the fiber as much as possible.
* <p>
* Delegates to {@code _doRun(next)}. When the static {@code serializeExecution}
* flag is set, the call is made while holding {@code serializedExecutionLock},
* which is released in a {@code finally} block.
*
* @return the value returned by {@code _doRun(next)}
*/
! private boolean doRun() {
dumpFiberContext("running");
if (serializeExecution) {
serializedExecutionLock.lock();
try {
! return _doRun(next);
} finally {
serializedExecutionLock.unlock();
}
} else {
! return _doRun(next);
}
}
/**
* Runs the fiber on the calling thread while holding {@code lock}.
* <p>
* Records the calling thread in {@code currentThread}, installs
* {@code contextClassLoader} as the thread's context class loader, and then runs
* {@code __doRun} either directly (no interceptors) or through an
* {@link InterceptorHandler}. The run is repeated for as long as the
* {@code interceptors} field differs from the list that was just used. On exit
* the previous context class loader is restored on the current thread.
*
* @param next tube to start executing from
* @return {@code true} when {@code __doRun} returned {@code true} (directly, or
*         signalled through {@code PLACEHOLDER} on the interceptor path);
*         {@code false} otherwise
* @throws WebServiceException when an {@code OnExitRunnableException} is caught;
*         its {@code target} is rethrown as-is if it already is a
*         {@code WebServiceException}, otherwise wrapped in one
*/
! private boolean _doRun(Tube next) {
! // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
! Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
! lock.lock();
try {
! List<FiberContextSwitchInterceptor> ints;
! ClassLoader old;
! synchronized(this) {
// snapshot the interceptor list; it is compared against the field after each run
! ints = interceptors;
!
! // currentThread is protected by the monitor for this fiber so
! // that it is accessible to cancel() even when the lock is held
currentThread = Thread.currentThread();
! if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
}
+
// remember the caller's context class loader so the finally block can restore it
+ old = currentThread.getContextClassLoader();
+ currentThread.setContextClassLoader(contextClassLoader);
}
try {
+ boolean needsToReenter = false;
do {
// if interceptors are set, go through the interceptors.
! if (ints == null) {
this.next = next;
! if (__doRun(isRequireUnlock, ints)) {
! return true;
! }
! } else {
! next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
! if (next == PLACEHOLDER) {
! return true;
! }
}
// run again with the new list if the interceptors field was replaced meanwhile
+ synchronized(this) {
+ needsToReenter = (ints != interceptors);
+ if (needsToReenter)
+ ints = interceptors;
+ }
+ } while (needsToReenter);
+ } catch(OnExitRunnableException o) {
+ // catching this exception indicates onExitRunnable in suspend() threw.
+ // we must still avoid double unlock
+ Throwable t = o.target;
+ if (t instanceof WebServiceException)
+ throw (WebServiceException) t;
+ throw new WebServiceException(t);
} finally {
+ // don't reference currentThread here because fiber processing
+ // may already be running on a different thread (Note: isAlreadyExited
+ // tracks this state)
+ Thread thread = Thread.currentThread();
thread.setContextClassLoader(old);
! if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
! }
! }
!
! return false;
! } finally {
// only clear currentThread and unlock if the holder still says this call owns the lock
! if (isRequireUnlock.value) {
! synchronized(this) {
currentThread = null;
}
! lock.unlock();
}
}
}
/**
* To be invoked from {@link #doRun()}.
*
* @see #doRun()
*/
! private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
! assert(lock.isHeldByCurrentThread());
!
final Fiber old = CURRENT_FIBER.get();
CURRENT_FIBER.set(this);
// if true, lots of debug messages to show what's being executed
final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
try {
boolean abortResponse = false;
! while(isReady(originalInterceptors)) {
! if (isCanceled) {
! next = null;
! throwable = null;
! contsSize = 0;
! break;
! }
!
try {
NextAction na;
Tube last;
if(throwable!=null) {
if(contsSize==0 || abortResponse) {
contsSize = 0; // abortResponse case
// nothing else to execute. we are done.
! return false;
}
last = popCont();
if (traceEnabled)
! LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
na = last.processException(throwable);
} else {
if(next!=null) {
if(traceEnabled)
! LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
na = next.processRequest(packet);
last = next;
} else {
if(contsSize==0 || abortResponse) {
// nothing else to execute. we are done.
contsSize = 0;
! return false;
}
last = popCont();
if(traceEnabled)
! LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
na = last.processResponse(packet);
}
}
if (traceEnabled)
! LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
// If resume is called before suspend, then make sure
// resume(Packet) is not lost
if (na.kind != NextAction.SUSPEND) {
// preserve in-flight packet so that processException may inspect
*** 979,996 ****
case NextAction.INVOKE_AND_FORGET:
next = na.next;
if (na.kind == NextAction.INVOKE_ASYNC
&& startedSync) {
// Break out here
! return;
}
break;
case NextAction.THROW_ABORT_RESPONSE:
case NextAction.ABORT_RESPONSE:
abortResponse = true;
! if (LOGGER.isLoggable(Level.FINE)) {
! LOGGER.fine("Fiber " + this + " is aborting a response due to exception: " + na.throwable);
}
case NextAction.RETURN:
case NextAction.THROW:
next = null;
break;
--- 1154,1171 ----
case NextAction.INVOKE_AND_FORGET:
next = na.next;
if (na.kind == NextAction.INVOKE_ASYNC
&& startedSync) {
// Break out here
! return false;
}
break;
case NextAction.THROW_ABORT_RESPONSE:
case NextAction.ABORT_RESPONSE:
abortResponse = true;
! if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
}
case NextAction.RETURN:
case NextAction.THROW:
next = null;
break;
*** 999,1009 ****
// Only store the 'last' tube when we're processing
// a request, since conts array is for processResponse
pushCont(last);
}
next = na.next;
! justSuspended = suspend();
break;
default:
throw new AssertionError();
}
} catch (RuntimeException t) {
--- 1174,1185 ----
// Only store the 'last' tube when we're processing
// a request, since conts array is for processResponse
pushCont(last);
}
next = na.next;
! if(suspend(isRequireUnlock, na.onExitRunnable))
! return true; // explicitly exiting control loop
break;
default:
throw new AssertionError();
}
} catch (RuntimeException t) {
*** 1017,1038 ****
}
dumpFiberContext("After tube execution");
}
- if (isCanceled) {
- next = null;
- throwable = null;
- contsSize = 0;
- }
-
// there's nothing we can execute right away.
// we'll be back when this fiber is resumed.
} finally {
CURRENT_FIBER.set(old);
}
}
private void pushCont(Tube tube) {
conts[contsSize++] = tube;
--- 1193,1210 ----
}
dumpFiberContext("After tube execution");
}
// there's nothing we can execute right away.
// we'll be back when this fiber is resumed.
} finally {
CURRENT_FIBER.set(old);
}
+
+ return false;
}
private void pushCont(Tube tube) {
conts[contsSize++] = tube;
*** 1066,1095 ****
this.conts = conts;
this.contsSize = contsSize;
}
/**
! * Returns true if the fiber needs to block its execution.
*/
! private boolean isBlocking(boolean justSuspended) {
if (synchronous) {
while (suspendedCount == 1)
try {
if (isTraceEnabled()) {
! LOGGER.fine(getName() + " is blocking thread " + Thread.currentThread().getName());
}
! wait(); // the synchronized block is the whole runSync method.
} catch (InterruptedException e) {
// remember that we are interrupted, but don't respond to it
// right away. This behavior is in line with what happens
// when you are actually running the whole thing synchronously.
interrupted = true;
}
return false;
}
- else
- return justSuspended || suspendedCount==1;
}
private String getName() {
return "engine-" + owner.id + "fiber-" + id;
}
--- 1238,1275 ----
this.conts = conts;
this.contsSize = contsSize;
}
/**
! * Returns true if the fiber is ready to execute.
* <p>
* In synchronous mode this first blocks on {@code condition} for as long as
* {@code suspendedCount == 1}; an {@link InterruptedException} during the wait
* is recorded in {@code interrupted} and the wait continues. In asynchronous
* mode a positive {@code suspendedCount} makes the fiber not ready. In both
* modes the fiber is ready only if the {@code interceptors} field is still the
* same list instance as {@code originalInterceptors}.
*
* @param originalInterceptors the interceptor list the caller is currently running with
* @return {@code true} if execution may proceed with the caller's interceptor list
*/
! private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
if (synchronous) {
while (suspendedCount == 1)
try {
if (isTraceEnabled()) {
! LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
}
! condition.await(); // NOTE(review): wording carried over from the old wait() call ("the synchronized block is the whole runSync method"); presumably the caller holds lock here - confirm
} catch (InterruptedException e) {
// remember that we are interrupted, but don't respond to it
// right away. This behavior is in line with what happens
// when you are actually running the whole thing synchronously.
interrupted = true;
}
+
// identity comparison: not ready if the interceptor list was replaced
+ synchronized(this) {
+ return interceptors == originalInterceptors;
+ }
+ }
+ else {
+ if (suspendedCount>0)
return false;
// identity comparison: not ready if the interceptor list was replaced
+ synchronized(this) {
+ return interceptors == originalInterceptors;
+ }
}
}
/**
* Builds the name used for this fiber in log messages, in the form
* {@code engine-<owner.id>fiber-<id>}.
*/
private String getName() {
return "engine-" + owner.id + "fiber-" + id;
}
*** 1213,1230 ****
*/
public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
private final Set<Component> components = new CopyOnWriteArraySet<Component>();
public <S> S getSPI(Class<S> spiType) {
! for(Component c : components) {
S spi = c.getSPI(spiType);
! if (spi != null)
return spi;
}
return null;
}
public Set<Component> getComponents() {
return components;
}
}
--- 1393,1413 ----
*/
public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
private final Set<Component> components = new CopyOnWriteArraySet<Component>();
/**
* Asks each registered {@link Component} in turn for the requested SPI.
*
* @param spiType class of the SPI being looked up
* @return the first non-null result of {@code Component.getSPI(spiType)}, or
*         {@code null} if no component provides one
*/
+ @Override
public <S> S getSPI(Class<S> spiType) {
! for (Component c : components) {
S spi = c.getSPI(spiType);
! if (spi != null) {
return spi;
}
+ }
return null;
}
/**
* Returns the set of components registered on this fiber.
*
* @return the {@code components} field itself (not a copy)
*/
+ @Override
public Set<Component> getComponents() {
return components;
}
}