001: package dalma.impl;
002:
003: import dalma.Conversation;
004: import dalma.ConversationDeath;
005: import dalma.ConversationState;
006: import dalma.Fiber;
007: import dalma.FiberState;
008: import dalma.Workflow;
009: import dalma.spi.ConversationSPI;
010: import org.apache.commons.javaflow.Continuation;
011:
012: import java.io.BufferedInputStream;
013: import java.io.BufferedOutputStream;
014: import java.io.File;
015: import java.io.FileInputStream;
016: import java.io.FileOutputStream;
017: import java.io.IOException;
018: import java.io.ObjectInputStream;
019: import java.io.ObjectOutputStream;
020: import java.io.Serializable;
021: import java.io.FileNotFoundException;
022: import java.util.ArrayList;
023: import java.util.Collections;
024: import java.util.HashSet;
025: import java.util.Hashtable;
026: import java.util.List;
027: import java.util.Map;
028: import java.util.Set;
029: import java.util.UUID;
030: import java.util.Vector;
031: import java.util.Date;
032: import java.util.logging.Level;
033: import java.util.logging.Logger;
034: import java.util.logging.LogRecord;
035:
036: /**
037: * Represents a running conversation.
038: *
039: * TODO: we need a better way for a running user's conversation to expose information
040: * to the caller.
041: *
042: * <p>
043: * The monitor of this object is used to notify the completion of a conversation.
044: *
045: * <h2>Persisting Conversation</h2>
046: * <p>
047: * There are two different modes of 'persistence' for this object (and fibers.)
048: * One is called hydration/dehydration, which is when we just persist the execution state
049: * of fibers to the disk to save memory usage (and to improve fault tolerance.)
050: * <p>
051: * The other is called save/load, which is when we persist the conversation
052: * object itself, but excluding the execution state of user code, to prepare
053: * for the engine to go down.
054: *
055: *
056: * @author Kohsuke Kawaguchi
057: */
058: public final class ConversationImpl extends ConversationSPI implements
059: Serializable {
060: private transient/*final*/EngineImpl engine;
061:
062: /**
063: * All the {@link FiberImpl}s that belong to this conversation.
064: * Indexed by their id.
065: */
066: protected final List<FiberImpl> fibers = new Vector<FiberImpl>();
067:
068: /**
069: * Generates fiber id.
070: */
071: /*package*/final Counter fiberId = new Counter();
072:
073: /**
074: * The number of {@link Continuation}s that are {@link FiberState#RUNNING running} right now.
075: */
076: // when inc()==0, load state
077: // when dec()==0, persist to disk
078: transient/*final*/Counter runningCounts;
079:
080: /**
081: * The directory to save the state of this conversation.
082: */
083: private transient/*final*/File rootDir;
084:
085: private final LogRecorder logRecorder;
086:
087: /**
088: * {@link GeneratorImpl}s that belong to this conversation.
089: */
090: private Map<UUID, GeneratorImpl> generators = new Hashtable<UUID, GeneratorImpl>();
091:
092: /**
093: * Other conversations that are blocking for the completion of this conversation.
094: *
095: * Transient, because {@link ConversationCondition}s in this queue re-register themselves.
096: * Always non-null.
097: */
098: transient Set<ConversationCondition> waitList;
099:
100: /**
101: * Set to true until the first {@link FiberImpl} runs.
102: * This is necessary because the first fiber has in-memory {@link Continuation}.
103: */
104: private boolean justCreated;
105:
106: /**
107: * Set to true if the {@link #remove()} operation is in progress.
108: * When true, {@link Fiber}s are prevented from being executed.
109: */
110: /*package*/transient boolean isRemoving;
111:
112: /**
113: * Synchronization for handling multiple concurrent {@link #remove()} method invocations.
114: */
115: private transient/*final*/Object removeLock;
116:
117: /**
118: * Every conversation gets unique ID (per engine).
119: * This is used so that a serialized {@link Conversation}
120: * (as a part of the stack frame) can connect back to the running {@link Conversation} instance.
121: */
122: final int id;
123:
124: /**
125: * Represents the inner shell of this conversation.
126: * Null when this conversation is dehydrated.
127: */
128: private transient Workflow workflow;
129:
130: private String title;
131:
132: /**
133: * The timestamp when this conversation is created.
134: * @see System#currentTimeMillis()
135: */
136: private final long startDate;
137:
138: /**
139: * -1 if not completed yet.
140: * @see #getCompletionDate()
141: */
142: private long endDate = -1;
143:
144: /**
145: * This logger is connected to {@link #masterLogger}, and also to the log recorder
146: * of this conversation.
147: */
148: private transient Logger logger;
149:
150: private static final Logger masterLogger = Logger
151: .getLogger(ConversationImpl.class.getName());
152:
153: /**
154: * Creates a new conversation that starts with the given target.
155: */
156: ConversationImpl(EngineImpl engine, Workflow target)
157: throws IOException {
158: id = engine.generateUniqueId();
159: startDate = System.currentTimeMillis();
160: File rootDir = new File(engine.getConversationsDir(), String
161: .valueOf(id));
162: if (!rootDir.mkdirs())
163: throw new IOException("Unable to create " + this .rootDir);
164:
165: File logDir = new File(rootDir, "log");
166: logDir.mkdirs();
167: logRecorder = new LogRecorder(logDir);
168:
169: init(engine, rootDir);
170:
171: justCreated = true;
172: engine.conversations.put(id, this );
173: this .workflow = target;
174: workflow.setOwner(this );
175:
176: // create a persisted data store for this conversation first
177: save();
178:
179: engine.listeners.onConversationStarted(this );
180:
181: // start the first fiber in this conversation.
182: // as soon as we call 'start', conversation may end in any minute,
183: // so this has to be the last
184: FiberImpl f = new FiberImpl(this , target);
185: f.start();
186: }
187:
188: private void init(EngineImpl engine, File rootDir) {
189: this .engine = engine;
190: this .rootDir = rootDir;
191: this .waitList = Collections
192: .synchronizedSet(new HashSet<ConversationCondition>());
193: this .runningCounts = new Counter();
194: this .removeLock = new Object();
195: this .logger = Logger.getAnonymousLogger();
196: this .logger.setParent(masterLogger);
197: this .logger.addHandler(logRecorder);
198: this .logger.setLevel(Level.ALL);
199: }
200:
201: public void addGenerator(GeneratorImpl g) {
202: generators.put(g.id, g);
203: g.setConversation(this );
204: g.onLoad();
205: }
206:
207: public GeneratorImpl getGenerator(UUID id) {
208: return generators.get(id);
209: }
210:
211: public List<LogRecord> getLog() {
212: return logRecorder.getLogs();
213: }
214:
215: /**
216: * Loads a {@link ConversationImpl} object from the disk.
217: */
218: public static ConversationImpl load(EngineImpl engine, File dir)
219: throws IOException {
220: ConversationImpl conv;
221: File config = new File(dir, "conversation.xml");
222:
223: if (!config.exists()) {
224: // bogus directory?
225: Util.deleteRecursive(dir);
226: throw new FileNotFoundException(config
227: + " not found. deleting this conversation");
228: }
229:
230: try {
231: SerializationContext.set(engine,
232: SerializationContext.Mode.CONVERSATION);
233: conv = (ConversationImpl) new XmlFile(config)
234: .read(engine.classLoader);
235: } finally {
236: SerializationContext.remove();
237: }
238: conv.init(engine, dir);
239: for (GeneratorImpl g : conv.generators.values())
240: g.onLoad();
241: for (FiberImpl fiber : conv.fibers)
242: fiber.onLoad();
243: return conv;
244: }
245:
246: private synchronized void save() throws IOException {
247: try {
248: SerializationContext.set(engine,
249: SerializationContext.Mode.CONVERSATION);
250: new XmlFile(new File(rootDir, "conversation.xml"))
251: .write(this );
252: } finally {
253: SerializationContext.remove();
254: }
255: }
256:
257: public int getId() {
258: return id;
259: }
260:
261: /**
262: * Gets the current state of the conversation.
263: *
264: * @return always non-null.
265: */
266: public ConversationState getState() {
267: if (runningCounts.get() != 0)
268: return ConversationState.RUNNING;
269:
270: ConversationState r = ConversationState.ENDED;
271:
272: synchronized (fibers) {
273: for (FiberImpl f : fibers) {
274: switch (f.getState()) {
275: case RUNNABLE:
276: return ConversationState.RUNNABLE;
277: case WAITING:
278: r = ConversationState.SUSPENDED;
279: break;
280: }
281: }
282: }
283: return r;
284: }
285:
286: public EngineImpl getEngine() {
287: return engine;
288: }
289:
290: synchronized void onFiberStartedRunning(FiberImpl fiber) {
291: if (isRemoving)
292: // this conversation is going to be removed now
293: // no further execution is allowed
294: throw new FiberDeath();
295:
296: if (runningCounts.inc() > 0)
297: // another fiber is already running, and therefore
298: // all the fibers are already hydrated. just go ahead and run
299: return;
300:
301: if (justCreated) {
302: // we are about to run the first fiber, and it has in-memory continuation.
303: assert fibers.size() == 1;
304: justCreated = false;
305: return;
306: }
307:
308: File cont = new File(rootDir, "continuation");
309: try {
310: SerializationContext.set(engine,
311: SerializationContext.Mode.CONTINUATION);
312:
313: ObjectInputStream ois = new ObjectInputStreamEx(
314: new BufferedInputStream(new FileInputStream(cont)),
315: engine.classLoader);
316: List<FiberImpl.PersistedData> list;
317: try {
318: list = (List<FiberImpl.PersistedData>) ois.readObject();
319: assert workflow == null;
320: workflow = (Workflow) ois.readObject();
321: } finally {
322: ois.close();
323: }
324: cont.delete();
325:
326: if (fibers.size() != list.size())
327: throw new ConversationDeath(
328: list.size()
329: + " fibers are found in the disk but the memory says "
330: + fibers.size() + " fibers", null);
331: for (FiberImpl f : fibers) {
332: f.hydrate(list.get(f.id));
333: }
334: } catch (IOException e) {
335: runningCounts.dec();
336: throw new ConversationDeath(
337: "failed to restore the state of the conversation "
338: + cont, e);
339: } catch (ClassNotFoundException e) {
340: runningCounts.dec();
341: throw new ConversationDeath(
342: "failed to restore the state of the conversation "
343: + cont, e);
344: } finally {
345: SerializationContext.remove();
346: }
347: }
348:
349: synchronized void onFiberEndedRunning(FiberImpl fiber) {
350: if (runningCounts.dec() > 0)
351: return;
352:
353: if (getState() == ConversationState.ENDED) {
354: // no fiber is there to run. conversation is complete
355: remove();
356: return;
357: }
358:
359: // create the object that represents the persisted state
360: List<FiberImpl.PersistedData> state = new ArrayList<FiberImpl.PersistedData>(
361: fibers.size());
362:
363: for (FiberImpl f : fibers)
364: state.add(f.dehydrate());
365:
366: // persist the state
367: File cont = new File(rootDir, "continuation");
368: ObjectOutputStream oos = null;
369: try {
370: SerializationContext.set(engine,
371: SerializationContext.Mode.CONTINUATION);
372:
373: oos = new ObjectOutputStream(new BufferedOutputStream(
374: new FileOutputStream(cont)));
375: oos.writeObject(state);
376: assert workflow != null;
377: oos.writeObject(workflow);
378: workflow = null;
379: } catch (IOException e) {
380: throw new ConversationDeath(
381: "failed to persist the state of the conversation "
382: + cont, e);
383: } finally {
384: SerializationContext.remove();
385: if (oos != null) {
386: try {
387: oos.close();
388: } catch (IOException e) {
389: // ignore
390: }
391: }
392: }
393:
394: try { // this needs to be done outside the EngineImpl.SERIALIZATION_CONTEXT
395: save();
396: } catch (IOException e) {
397: throw new ConversationDeath(
398: "failed to persist the state of the conversation "
399: + cont, e);
400: }
401: }
402:
403: public void remove() {
404: // this lock is to handle multiple concurrent invocations of the remove method
405: synchronized (removeLock) {
406: // the first thing we have to do is to wait for all the executing fibers
407: // to complete. when we are doing that, we don't want new fibers to
408: // start executing. We use isRemoving==true for this purpose.
409: if (isRemoving)
410: return; // already removed.
411:
412: isRemoving = true;
413:
414: try {
415: runningCounts.waitForZero();
416: } catch (InterruptedException e) {
417: // can't process it now. later.
418: Thread.currentThread().interrupt();
419: }
420:
421: endDate = System.currentTimeMillis();
422: engine.listeners.onConversationCompleted(this );
423:
424: synchronized (engine.completionLock) {
425: Map<Integer, ConversationImpl> convs = engine.conversations;
426: synchronized (convs) {
427: ConversationImpl removed = convs.remove(id);
428: assert removed == this ;
429: if (convs.isEmpty()) {
430: engine.completionLock.notifyAll();
431: }
432: }
433: }
434:
435: try {
436: Util.deleteRecursive(rootDir);
437: } catch (IOException e) {
438: // there's really nothing we nor appliation can do to recover from this.
439: logger
440: .log(
441: Level.WARNING,
442: "Unable to delete the conversation data directory",
443: e);
444: }
445:
446: synchronized (this ) {
447: // remove this conversation from the endPoint
448: synchronized (fibers) {
449: for (FiberImpl f : fibers)
450: f.remove();
451: fibers.clear();
452: }
453:
454: synchronized (generators) {
455: for (GeneratorImpl g : generators.values()) {
456: g.dispose();
457: }
458: generators.clear();
459: }
460:
461: // notify any threads that are blocked on this conversation.
462: // the lock needs to be held before removing all fibers, as
463: // that changes the getState() value
464: notifyAll();
465:
466: // notify all conversations that are blocked on this
467: synchronized (waitList) {
468: for (ConversationCondition cd : waitList)
469: cd.activate(this );
470: waitList.clear();
471: }
472: }
473: }
474: }
475:
476: public synchronized void join() throws InterruptedException {
477: FiberImpl fiber = FiberImpl.currentFiber(false);
478: if (fiber == null) {
479: // called from outside conversations
480: if (getState() != ConversationState.ENDED) {
481: wait();
482: }
483: } else {
484: if (this == fiber.owner)
485: throw new IllegalStateException(
486: "a conversation can't wait for its own completion");
487: fiber.suspend(new ConversationCondition(this ));
488: }
489: }
490:
491: public void setTitle(String title) {
492: this .title = title;
493: }
494:
495: public String getTitle() {
496: return title;
497: }
498:
499: public Date getStartDate() {
500: return new Date(startDate);
501: }
502:
503: public Date getCompletionDate() {
504: if (endDate == -1)
505: return null;
506: else
507: return new Date(endDate);
508: }
509:
510: public Logger getLogger() {
511: return logger;
512: }
513:
514: private Object writeReplace() {
515: if (SerializationContext.get().mode == SerializationContext.Mode.CONVERSATION)
516: return this ;
517: else
518: return new ConversationMoniker(id);
519: }
520:
521: protected FiberImpl getFiber(int id) {
522: return fibers.get(id);
523: }
524:
525: private static final class ConversationMoniker implements
526: Serializable {
527: private final int id;
528:
529: public ConversationMoniker(int id) {
530: this .id = id;
531: }
532:
533: private Object readResolve() {
534: // TODO: what if the id is already removed from engine?
535: // we can fix this by allowing Conversation object itself to be persisted
536: // (and then readResolve may replace if it's still running),
537: // but how do we do about the classLoader field?
538: ConversationImpl conv = SerializationContext.get().engine
539: .getConversation(id);
540: assert conv != null;
541: return conv;
542: }
543:
544: private static final long serialVersionUID = 1L;
545: }
546:
547: /**
548: * Returns a {@link ConversationImpl} instance that the current thread is executing.
549: */
550: public static ConversationImpl currentConversation() {
551: return FiberImpl.currentFiber(true).owner;
552: }
553:
554: private static final long serialVersionUID = 1L;
555: }
|