001: package dalma.impl;
002:
003: import dalma.Condition;
004: import dalma.Conversation;
005: import dalma.Fiber;
006: import dalma.FiberState;
007: import dalma.spi.ConditionListener;
008: import dalma.spi.FiberSPI;
009: import org.apache.commons.javaflow.Continuation;
010: import org.apache.commons.javaflow.bytecode.StackRecorder;
011:
012: import java.io.Serializable;
013: import java.util.Collections;
014: import java.util.HashSet;
015: import java.util.Set;
016:
017: /**
018: * Smallest execution unit inside a {@link Conversation}.
019: *
020: * <h3>Persistence and Fiber</h3>
021: * <p>
022: * Fiber can be persisted when it's {@link FiberState#CREATED}
023: * and {@link FiberState#WAITING}.
024: *
025: * @author Kohsuke Kawaguchi
026: */
027: public final class FiberImpl<T extends Runnable> extends FiberSPI<T>
028: implements Serializable, ConditionListener {
029:
030: /**
031: * Uniquely identifies {@link FiberImpl} among other fibers that belong to the same owner.
032: * Necessary for serialization of the continuation to work correctly.
033: */
034: final int id;
035:
036: /**
037: * {@link Conversation} to which this {@link FiberImpl} belongs to.
038: */
039: final ConversationImpl owner;
040:
041: /**
042: * Non-null if this {@link FiberImpl}'s execution is blocked on a specific condition
043: * (in which case {@link Condition} is not active), or if the {@link Condition}
044: * is active but {@link FiberImpl} is waiting for a scheduling.
045: */
046: private Condition cond;
047:
048: static class PersistedData<T extends Runnable> implements
049: Serializable {
050: private Continuation continuation;
051: public final T runnable;
052:
053: public PersistedData(T runnable) {
054: this .runnable = runnable;
055: this .continuation = Continuation
056: .startSuspendedWith(runnable);
057: }
058:
059: public void execute() {
060: continuation = Continuation.continueWith(continuation);
061: }
062:
063: public boolean isCompleted() {
064: return continuation == null;
065: }
066:
067: private static final long serialVersionUID = 1L;
068: }
069:
070: /**
071: * The {@link PersistedData} that includes continuation to be executed.
072: */
073: private transient PersistedData<T> execution;
074:
075: /**
076: * The current state of the {@link FiberImpl}.
077: */
078: private FiberState state;
079:
080: /**
081: * Other fibers that are blocking for the completion of this fiber.
082: *
083: * Transient, because {@link FiberCompletionCondition}s in this queue re-register themselves.
084: * Always non-null.
085: */
086: private transient Set<FiberCompletionCondition> waitList;
087:
088: /*package*/FiberImpl(ConversationImpl owner, T init) {
089: this .owner = owner;
090: this .id = owner.fiberId.inc();
091: this .execution = new PersistedData<T>(init);
092: state = FiberState.CREATED;
093: assert owner.fibers.size() == id;
094: owner.fibers.add(this );
095: }
096:
097: public T getRunnable() {
098: FiberImpl<?> f = currentFiber(false);
099: if (f == null)
100: throw new IllegalStateException(
101: "Cannot be invoked from outside a conversation");
102: if (f.owner != owner)
103: throw new IllegalStateException(
104: "Cannot be invoked from a fiber that belongs to another conversation");
105:
106: assert execution != null;
107:
108: return execution.runnable;
109: }
110:
111: public void start() {
112: if (state != FiberState.CREATED)
113: throw new IllegalStateException("fiber is already started");
114: queue();
115: }
116:
117: public synchronized void join() throws InterruptedException {
118: FiberImpl<?> fiber = FiberImpl.currentFiber(false);
119:
120: if (!StackRecorder.get().isRestoring) {
121: if (getState() == FiberState.ENDED)
122: return;
123:
124: if (fiber == null) {
125: // called from outside conversations
126: wait();
127: return;
128: }
129:
130: if (fiber == this )
131: throw new IllegalStateException(
132: "a fiber can't wait for its own completion");
133: }
134:
135: fiber.suspend(new FiberCompletionCondition(this ));
136: }
137:
138: public FiberState getState() {
139: return state;
140: }
141:
142: public ConversationImpl getOwner() {
143: return owner;
144: }
145:
146: private void queue() {
147: state = FiberState.RUNNABLE;
148: owner.getEngine().queue(this );
149: }
150:
151: // called by the continuation thread
152: public synchronized <T> T suspend(Condition<T> c) {
153: if (!StackRecorder.get().isRestoring) {
154: if (c == null)
155: throw new IllegalArgumentException(
156: "dock cannot be null");
157: assert cond == null;
158: cond = c;
159:
160: assert state == FiberState.RUNNING;
161: }
162:
163: Continuation.suspend();
164: if (StackRecorder.get().isCapturing) {
165: StackRecorder.get().pushReference(this );
166: return null;
167: }
168:
169: assert cond != null;
170: // assert c==cond; this isn't correct, because cond is persisted as a part of conversation.xml
171: // while c is persisted in the continuation. they are different objects
172: T r = (T) cond.getReturnValue();
173: cond = null;
174:
175: assert state == FiberState.RUNNING;
176:
177: return r;
178: }
179:
180: /**
181: * Called from the executor thread to run this fiber until
182: * it suspends or completes.
183: *
184: * This method is synchronized to prevent a still-running conversation
185: * from being run again concurrently, which happens when:
186: *
187: * 1. a dock parks
188: * 2. a signal arrives and conversation resumes
189: * 3. the conversation gets queued and picked up
190: * 4. the conversation gets run
191: */
192: public synchronized void run() {
193: FiberImpl old = currentFiber.get();
194: currentFiber.set(this );
195: try {
196: run0();
197: } finally {
198: if (old == null)
199: currentFiber.remove();
200: else
201: currentFiber.set(old);
202: }
203: }
204:
205: private void run0() {
206: owner.onFiberStartedRunning(this );
207: try {
208: run1();
209: } finally {
210: owner.onFiberEndedRunning(this );
211: }
212: }
213:
214: private void run1() {
215: assert state == FiberState.RUNNABLE;
216: state = FiberState.RUNNING;
217:
218: // this runs the conversation until it blocks
219: try {
220: execution.execute();
221: } catch (Error e) {
222: die(e);
223: } catch (RuntimeException e) {
224: die(e);
225: }
226:
227: assert state == FiberState.RUNNING;
228:
229: if (execution.isCompleted()) {
230: synchronized (this ) {
231: // conversation has finished execution.
232: state = FiberState.ENDED;
233:
234: // notify any threads that are blocked on this conversation.
235: notifyAll();
236:
237: // notify all conversations that are blocked on this
238: if (waitList != null) {
239: synchronized (waitList) {
240: for (FiberCompletionCondition cd : waitList)
241: cd.activate(this );
242: waitList.clear();
243: }
244: }
245: }
246:
247: assert cond == null;
248:
249: } else {
250: // conversation has suspended
251: state = FiberState.WAITING;
252: assert cond != null;
253:
254: // let the condition know that we are parked
255: cond.park(this );
256: }
257: }
258:
259: /**
260: * Called when a fiber dies unexpectedly in the user code.
261: */
262: private void die(Throwable t) {
263: // this method is supposed to handle an error in the user code,
264: // not an unexpected termination inside the engine
265: assert state == FiberState.RUNNING;
266: state = FiberState.ENDED;
267:
268: // clean up if we own a condition
269: remove();
270: owner.getEngine().addToErrorQueue(t);
271: throw new FiberDeath();
272: }
273:
274: protected synchronized Set<FiberCompletionCondition> getWaitList() {
275: if (waitList == null)
276: waitList = Collections
277: .synchronizedSet(new HashSet<FiberCompletionCondition>());
278: return waitList;
279: }
280:
281: /**
282: * Called by {@link ConversationImpl} to clean up this fiber
283: * (as a part of removing the whole conversation.)
284: */
285: /*package*/synchronized void remove() {
286: if (cond != null) {
287: cond.interrupt();
288: cond = null;
289: }
290: state = FiberState.ENDED;
291: }
292:
293: /**
294: * Called by the endpoint threads when {@link #cond} becomes active.
295: */
296: public synchronized void onActivated(Condition cond) {
297: assert this .cond == cond;
298: assert state == FiberState.WAITING;
299: state = FiberState.RUNNABLE;
300: queue();
301: }
302:
303: // TODO: think about synchronization between hydration and activation
304: /**
305: * Called when the state of the {@link FiberImpl} is being moved from the disk.
306: */
307: /*package*/void hydrate(PersistedData<T> c) {
308: assert state != FiberState.RUNNING;
309: assert execution == null;
310: assert c != null;
311: execution = c;
312: }
313:
314: /**
315: * Called when the state of the {@link FiberImpl} is being moved to the disk.
316: */
317: /*package*/PersistedData<T> dehydrate() {
318: assert state == FiberState.RUNNABLE
319: || state == FiberState.WAITING
320: || state == FiberState.ENDED;
321: assert execution != null;
322: PersistedData<T> r = execution;
323: execution = null;
324: return r;
325: }
326:
327: /**
328: * Called after the conversation is restored from the disk.
329: */
330: /*package*/void onLoad() {
331: assert execution == null;
332: if (cond != null)
333: cond.onLoad();
334: assert execution == null;
335: assert state == FiberState.WAITING
336: || state == FiberState.RUNNABLE
337: || state == FiberState.ENDED;
338: }
339:
340: /**
341: * Gets the {@link Fiber} that the current thread is executing.
342: *
343: * @param mustReturnNonNull
344: * if true and the current thread isn't executing any fiber, this method
345: * throws an exception.
346: */
347: public static FiberImpl<?> currentFiber(boolean mustReturnNonNull) {
348: FiberImpl f = currentFiber.get();
349: if (f == null && mustReturnNonNull)
350: throw new IllegalStateException(
351: "this thread isn't executing a conversation");
352: return f;
353: }
354:
355: /**
356: * @see Fiber#create(Runnable)
357: */
358: public static <T extends Runnable> FiberImpl<T> create(T entryPoint) {
359: return new FiberImpl<T>(currentFiber(true).owner, entryPoint);
360: }
361:
362: private Object writeReplace() {
363: if (SerializationContext.get().mode == SerializationContext.Mode.CONVERSATION)
364: return this ;
365: else
366: return new FiberMoniker(owner, id);
367: }
368:
369: private static final class FiberMoniker implements Serializable {
370: private final ConversationImpl conv;
371: private final int id;
372:
373: public FiberMoniker(ConversationImpl conv, int id) {
374: this .conv = conv;
375: this .id = id;
376: }
377:
378: private Object readResolve() {
379: FiberImpl fiber = conv.getFiber(id);
380: assert fiber != null;
381: return fiber;
382: }
383:
384: private static final long serialVersionUID = 1L;
385: }
386:
387: private static final long serialVersionUID = 1L;
388:
389: /**
390: * Records the currently running {@link FiberImpl} in the thread.
391: */
392: private static final ThreadLocal<FiberImpl> currentFiber = new ThreadLocal<FiberImpl>();
393: }
|