Source Code Cross Referenced for ConversationImpl.java in  » Workflow-Engines » Dalma » dalma » impl » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » Dalma » dalma.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package dalma.impl;
002:
003:        import dalma.Conversation;
004:        import dalma.ConversationDeath;
005:        import dalma.ConversationState;
006:        import dalma.Fiber;
007:        import dalma.FiberState;
008:        import dalma.Workflow;
009:        import dalma.spi.ConversationSPI;
010:        import org.apache.commons.javaflow.Continuation;
011:
012:        import java.io.BufferedInputStream;
013:        import java.io.BufferedOutputStream;
014:        import java.io.File;
015:        import java.io.FileInputStream;
016:        import java.io.FileOutputStream;
017:        import java.io.IOException;
018:        import java.io.ObjectInputStream;
019:        import java.io.ObjectOutputStream;
020:        import java.io.Serializable;
021:        import java.io.FileNotFoundException;
022:        import java.util.ArrayList;
023:        import java.util.Collections;
024:        import java.util.HashSet;
025:        import java.util.Hashtable;
026:        import java.util.List;
027:        import java.util.Map;
028:        import java.util.Set;
029:        import java.util.UUID;
030:        import java.util.Vector;
031:        import java.util.Date;
032:        import java.util.logging.Level;
033:        import java.util.logging.Logger;
034:        import java.util.logging.LogRecord;
035:
036:        /**
037:         * Represents a running conversation.
038:         *
039:         * TODO: we need a better way for a running user's conversation to expose information
040:         * to the caller.
041:         *
042:         * <p>
043:         * The monitor of this object is used to notify the completion of a conversation.
044:         *
045:         * <h2>Persisting Conversation</h2>
046:         * <p>
047:         * There are two different modes of 'persistence' for this object (and fibers.)
048:         * One is called hydration/dehydration, which is when we just persist the execution state
049:         * of fibers to the disk to save memory usage (and to improve fault tolerance.)
050:         * <p>
051:         * The other is called save/load, which is when we persist the conversation
052:         * object itself, but excluding the execution state of user code, to prepare
053:         * for the engine to go down.
054:         *
055:         *
056:         * @author Kohsuke Kawaguchi
057:         */
058:        public final class ConversationImpl extends ConversationSPI implements 
059:                Serializable {
060:            private transient/*final*/EngineImpl engine;
061:
062:            /**
063:             * All the {@link FiberImpl}s that belong to this conversation.
064:             * Indexed by their id.
065:             */
066:            protected final List<FiberImpl> fibers = new Vector<FiberImpl>();
067:
068:            /**
069:             * Generates fiber id.
070:             */
071:            /*package*/final Counter fiberId = new Counter();
072:
073:            /**
074:             * The number of {@link Continuation}s that are {@link FiberState.RUNNING running} right now.
075:             */
076:            // when inc()==0, load state
077:            // when dec()==0, persist to disk
078:            transient/*final*/Counter runningCounts;
079:
080:            /**
081:             * The directory to save the state of this conversation.
082:             */
083:            private transient/*final*/File rootDir;
084:
085:            private final LogRecorder logRecorder;
086:
087:            /**
088:             * {@link GeneratorImpl}s that belong to this conversation.
089:             */
090:            private Map<UUID, GeneratorImpl> generators = new Hashtable<UUID, GeneratorImpl>();
091:
092:            /**
093:             * Other conversations that are blocking for the completion of this conversation.
094:             *
095:             * Transient, because {@link ConversationCondition}s in this queue re-register themselves.
096:             * Always non-null.
097:             */
098:            transient Set<ConversationCondition> waitList;
099:
100:            /**
101:             * Set to true until the first {@link FiberImpl} runs.
102:             * This is necessary because the first fiber has in-memory {@link Continuation}.
103:             */
104:            private boolean justCreated;
105:
106:            /**
107:             * Set to true if the {@link #remove()} operation is in progress.
108:             * When true, {@link Fiber}s are prevented from being executed.
109:             */
110:            /*package*/transient boolean isRemoving;
111:
112:            /**
113:             * Synchronization for handling multiple concurrent {@link #remove()} method invocation.
114:             */
115:            private transient/*final*/Object removeLock;
116:
117:            /**
118:             * Every conversation gets unique ID (per engine).
119:             * This is used so that a serialized {@link Conversation}
120:             * (as a part of the stack frame) can connect back to the running {@link Conversation} instance.
121:             */
122:            final int id;
123:
124:            /**
125:             * Represents the inner shell of this conversation.
126:             * Null when this conversation is dehydrated.
127:             */
128:            private transient Workflow workflow;
129:
130:            private String title;
131:
132:            /**
133:             * The timestamp when this conversation is created.
134:             * @see System#currentTimeMillis()
135:             */
136:            private final long startDate;
137:
138:            /**
139:             * -1 if not completed yet.
140:             * @see #getCompletionDate()
141:             */
142:            private long endDate = -1;
143:
144:            /**
145:             * This logger is connected to {@link #masterLogger}, and also to the log recorder
146:             * of this conversation.
147:             */
148:            private transient Logger logger;
149:
150:            private static final Logger masterLogger = Logger
151:                    .getLogger(ConversationImpl.class.getName());
152:
153:            /**
154:             * Creates a new conversation that starts with the given target.
155:             */
156:            ConversationImpl(EngineImpl engine, Workflow target)
157:                    throws IOException {
158:                id = engine.generateUniqueId();
159:                startDate = System.currentTimeMillis();
160:                File rootDir = new File(engine.getConversationsDir(), String
161:                        .valueOf(id));
162:                if (!rootDir.mkdirs())
163:                    throw new IOException("Unable to create " + this .rootDir);
164:
165:                File logDir = new File(rootDir, "log");
166:                logDir.mkdirs();
167:                logRecorder = new LogRecorder(logDir);
168:
169:                init(engine, rootDir);
170:
171:                justCreated = true;
172:                engine.conversations.put(id, this );
173:                this .workflow = target;
174:                workflow.setOwner(this );
175:
176:                // create a persisted data store for this conversation first
177:                save();
178:
179:                engine.listeners.onConversationStarted(this );
180:
181:                // start the first fiber in this conversation.
182:                // as soon as we call 'start', conversation may end in any minute,
183:                // so this has to be the last
184:                FiberImpl f = new FiberImpl(this , target);
185:                f.start();
186:            }
187:
188:            private void init(EngineImpl engine, File rootDir) {
189:                this .engine = engine;
190:                this .rootDir = rootDir;
191:                this .waitList = Collections
192:                        .synchronizedSet(new HashSet<ConversationCondition>());
193:                this .runningCounts = new Counter();
194:                this .removeLock = new Object();
195:                this .logger = Logger.getAnonymousLogger();
196:                this .logger.setParent(masterLogger);
197:                this .logger.addHandler(logRecorder);
198:                this .logger.setLevel(Level.ALL);
199:            }
200:
201:            public void addGenerator(GeneratorImpl g) {
202:                generators.put(g.id, g);
203:                g.setConversation(this );
204:                g.onLoad();
205:            }
206:
207:            public GeneratorImpl getGenerator(UUID id) {
208:                return generators.get(id);
209:            }
210:
211:            public List<LogRecord> getLog() {
212:                return logRecorder.getLogs();
213:            }
214:
215:            /**
216:             * Loads a {@link ConversationImpl} object from the disk.
217:             */
218:            public static ConversationImpl load(EngineImpl engine, File dir)
219:                    throws IOException {
220:                ConversationImpl conv;
221:                File config = new File(dir, "conversation.xml");
222:
223:                if (!config.exists()) {
224:                    // bogus directory?
225:                    Util.deleteRecursive(dir);
226:                    throw new FileNotFoundException(config
227:                            + " not found. deleting this conversation");
228:                }
229:
230:                try {
231:                    SerializationContext.set(engine,
232:                            SerializationContext.Mode.CONVERSATION);
233:                    conv = (ConversationImpl) new XmlFile(config)
234:                            .read(engine.classLoader);
235:                } finally {
236:                    SerializationContext.remove();
237:                }
238:                conv.init(engine, dir);
239:                for (GeneratorImpl g : conv.generators.values())
240:                    g.onLoad();
241:                for (FiberImpl fiber : conv.fibers)
242:                    fiber.onLoad();
243:                return conv;
244:            }
245:
246:            private synchronized void save() throws IOException {
247:                try {
248:                    SerializationContext.set(engine,
249:                            SerializationContext.Mode.CONVERSATION);
250:                    new XmlFile(new File(rootDir, "conversation.xml"))
251:                            .write(this );
252:                } finally {
253:                    SerializationContext.remove();
254:                }
255:            }
256:
257:            public int getId() {
258:                return id;
259:            }
260:
261:            /**
262:             * Gets the current state of the conversation.
263:             *
264:             * @return always non-null.
265:             */
266:            public ConversationState getState() {
267:                if (runningCounts.get() != 0)
268:                    return ConversationState.RUNNING;
269:
270:                ConversationState r = ConversationState.ENDED;
271:
272:                synchronized (fibers) {
273:                    for (FiberImpl f : fibers) {
274:                        switch (f.getState()) {
275:                        case RUNNABLE:
276:                            return ConversationState.RUNNABLE;
277:                        case WAITING:
278:                            r = ConversationState.SUSPENDED;
279:                            break;
280:                        }
281:                    }
282:                }
283:                return r;
284:            }
285:
286:            public EngineImpl getEngine() {
287:                return engine;
288:            }
289:
290:            synchronized void onFiberStartedRunning(FiberImpl fiber) {
291:                if (isRemoving)
292:                    // this conversation is going to be removed now
293:                    // no further execution is allowed
294:                    throw new FiberDeath();
295:
296:                if (runningCounts.inc() > 0)
297:                    // another fiber is already running, and therefore
298:                    // all the fibers are already hydrated. just go ahead and run
299:                    return;
300:
301:                if (justCreated) {
302:                    // we are about to run the first fiber, and it has in-memory continuation.
303:                    assert fibers.size() == 1;
304:                    justCreated = false;
305:                    return;
306:                }
307:
308:                File cont = new File(rootDir, "continuation");
309:                try {
310:                    SerializationContext.set(engine,
311:                            SerializationContext.Mode.CONTINUATION);
312:
313:                    ObjectInputStream ois = new ObjectInputStreamEx(
314:                            new BufferedInputStream(new FileInputStream(cont)),
315:                            engine.classLoader);
316:                    List<FiberImpl.PersistedData> list;
317:                    try {
318:                        list = (List<FiberImpl.PersistedData>) ois.readObject();
319:                        assert workflow == null;
320:                        workflow = (Workflow) ois.readObject();
321:                    } finally {
322:                        ois.close();
323:                    }
324:                    cont.delete();
325:
326:                    if (fibers.size() != list.size())
327:                        throw new ConversationDeath(
328:                                list.size()
329:                                        + " fibers are found in the disk but the memory says "
330:                                        + fibers.size() + " fibers", null);
331:                    for (FiberImpl f : fibers) {
332:                        f.hydrate(list.get(f.id));
333:                    }
334:                } catch (IOException e) {
335:                    runningCounts.dec();
336:                    throw new ConversationDeath(
337:                            "failed to restore the state of the conversation "
338:                                    + cont, e);
339:                } catch (ClassNotFoundException e) {
340:                    runningCounts.dec();
341:                    throw new ConversationDeath(
342:                            "failed to restore the state of the conversation "
343:                                    + cont, e);
344:                } finally {
345:                    SerializationContext.remove();
346:                }
347:            }
348:
349:            synchronized void onFiberEndedRunning(FiberImpl fiber) {
350:                if (runningCounts.dec() > 0)
351:                    return;
352:
353:                if (getState() == ConversationState.ENDED) {
354:                    // no fiber is there to run. conversation is complete
355:                    remove();
356:                    return;
357:                }
358:
359:                // create the object that represents the persisted state
360:                List<FiberImpl.PersistedData> state = new ArrayList<FiberImpl.PersistedData>(
361:                        fibers.size());
362:
363:                for (FiberImpl f : fibers)
364:                    state.add(f.dehydrate());
365:
366:                // persist the state
367:                File cont = new File(rootDir, "continuation");
368:                ObjectOutputStream oos = null;
369:                try {
370:                    SerializationContext.set(engine,
371:                            SerializationContext.Mode.CONTINUATION);
372:
373:                    oos = new ObjectOutputStream(new BufferedOutputStream(
374:                            new FileOutputStream(cont)));
375:                    oos.writeObject(state);
376:                    assert workflow != null;
377:                    oos.writeObject(workflow);
378:                    workflow = null;
379:                } catch (IOException e) {
380:                    throw new ConversationDeath(
381:                            "failed to persist the state of the conversation "
382:                                    + cont, e);
383:                } finally {
384:                    SerializationContext.remove();
385:                    if (oos != null) {
386:                        try {
387:                            oos.close();
388:                        } catch (IOException e) {
389:                            // ignore
390:                        }
391:                    }
392:                }
393:
394:                try { // this needs to be done outside the EngineImpl.SERIALIZATION_CONTEXT
395:                    save();
396:                } catch (IOException e) {
397:                    throw new ConversationDeath(
398:                            "failed to persist the state of the conversation "
399:                                    + cont, e);
400:                }
401:            }
402:
403:            public void remove() {
404:                // this lock is to handle multiple concurrent invocations of the remove method
405:                synchronized (removeLock) {
406:                    // the first thing we have to do is to wait for all the executing fibers
407:                    // to complete. when we are doing that, we don't want new fibers to
408:                    // start executing. We use isRemoving==true for this purpose.
409:                    if (isRemoving)
410:                        return; // already removed.
411:
412:                    isRemoving = true;
413:
414:                    try {
415:                        runningCounts.waitForZero();
416:                    } catch (InterruptedException e) {
417:                        // can't process it now. later.
418:                        Thread.currentThread().interrupt();
419:                    }
420:
421:                    endDate = System.currentTimeMillis();
422:                    engine.listeners.onConversationCompleted(this );
423:
424:                    synchronized (engine.completionLock) {
425:                        Map<Integer, ConversationImpl> convs = engine.conversations;
426:                        synchronized (convs) {
427:                            ConversationImpl removed = convs.remove(id);
428:                            assert removed == this ;
429:                            if (convs.isEmpty()) {
430:                                engine.completionLock.notifyAll();
431:                            }
432:                        }
433:                    }
434:
435:                    try {
436:                        Util.deleteRecursive(rootDir);
437:                    } catch (IOException e) {
438:                        // there's really nothing we nor appliation can do to recover from this.
439:                        logger
440:                                .log(
441:                                        Level.WARNING,
442:                                        "Unable to delete the conversation data directory",
443:                                        e);
444:                    }
445:
446:                    synchronized (this ) {
447:                        // remove this conversation from the endPoint
448:                        synchronized (fibers) {
449:                            for (FiberImpl f : fibers)
450:                                f.remove();
451:                            fibers.clear();
452:                        }
453:
454:                        synchronized (generators) {
455:                            for (GeneratorImpl g : generators.values()) {
456:                                g.dispose();
457:                            }
458:                            generators.clear();
459:                        }
460:
461:                        // notify any threads that are blocked on this conversation.
462:                        // the lock needs to be held before removing all fibers, as
463:                        // that changes the getState() value
464:                        notifyAll();
465:
466:                        // notify all conversations that are blocked on this
467:                        synchronized (waitList) {
468:                            for (ConversationCondition cd : waitList)
469:                                cd.activate(this );
470:                            waitList.clear();
471:                        }
472:                    }
473:                }
474:            }
475:
476:            public synchronized void join() throws InterruptedException {
477:                FiberImpl fiber = FiberImpl.currentFiber(false);
478:                if (fiber == null) {
479:                    // called from outside conversations
480:                    if (getState() != ConversationState.ENDED) {
481:                        wait();
482:                    }
483:                } else {
484:                    if (this  == fiber.owner)
485:                        throw new IllegalStateException(
486:                                "a conversation can't wait for its own completion");
487:                    fiber.suspend(new ConversationCondition(this ));
488:                }
489:            }
490:
491:            public void setTitle(String title) {
492:                this .title = title;
493:            }
494:
495:            public String getTitle() {
496:                return title;
497:            }
498:
499:            public Date getStartDate() {
500:                return new Date(startDate);
501:            }
502:
503:            public Date getCompletionDate() {
504:                if (endDate == -1)
505:                    return null;
506:                else
507:                    return new Date(endDate);
508:            }
509:
510:            public Logger getLogger() {
511:                return logger;
512:            }
513:
514:            private Object writeReplace() {
515:                if (SerializationContext.get().mode == SerializationContext.Mode.CONVERSATION)
516:                    return this ;
517:                else
518:                    return new ConversationMoniker(id);
519:            }
520:
521:            protected FiberImpl getFiber(int id) {
522:                return fibers.get(id);
523:            }
524:
525:            private static final class ConversationMoniker implements 
526:                    Serializable {
527:                private final int id;
528:
529:                public ConversationMoniker(int id) {
530:                    this .id = id;
531:                }
532:
533:                private Object readResolve() {
534:                    // TODO: what if the id is already removed from engine?
535:                    // we can fix this by allowing Conversation object itself to be persisted
536:                    // (and then readResolve may replace if it's still running),
537:                    // but how do we do about the classLoader field?
538:                    ConversationImpl conv = SerializationContext.get().engine
539:                            .getConversation(id);
540:                    assert conv != null;
541:                    return conv;
542:                }
543:
544:                private static final long serialVersionUID = 1L;
545:            }
546:
547:            /**
548:             * Returns a {@link ConversationImpl} instance that the current thread is executing.
549:             */
550:            public static ConversationImpl currentConversation() {
551:                return FiberImpl.currentFiber(true).owner;
552:            }
553:
554:            private static final long serialVersionUID = 1L;
555:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.