Source Code Cross Referenced for FiberImpl.java in » Workflow-Engines » Dalma » dalma » impl » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » Dalma » dalma.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package dalma.impl;
002:
003:        import dalma.Condition;
004:        import dalma.Conversation;
005:        import dalma.Fiber;
006:        import dalma.FiberState;
007:        import dalma.spi.ConditionListener;
008:        import dalma.spi.FiberSPI;
009:        import org.apache.commons.javaflow.Continuation;
010:        import org.apache.commons.javaflow.bytecode.StackRecorder;
011:
012:        import java.io.Serializable;
013:        import java.util.Collections;
014:        import java.util.HashSet;
015:        import java.util.Set;
016:
017:        /**
018:         * Smallest execution unit inside a {@link Conversation}.
019:         *
020:         * <h3>Persistence and Fiber</h3>
021:         * <p>
022:         * Fiber can be persisted when it's {@link FiberState#CREATED}
023:         * and {@link FiberState#WAITING}.
024:         *
025:         * @author Kohsuke Kawaguchi
026:         */
027:        public final class FiberImpl<T extends Runnable> extends FiberSPI<T>
028:                implements  Serializable, ConditionListener {
029:
030:            /**
031:             * Uniquely identifies {@link FiberImpl} among other fibers that belong to the same owner.
032:             * Necessary for serialization of the continuation to work correctly.
033:             */
034:            final int id;
035:
036:            /**
037:             * {@link Conversation} to which this {@link FiberImpl} belongs to.
038:             */
039:            final ConversationImpl owner;
040:
041:            /**
042:             * Non-null if this {@link FiberImpl}'s execution is blocked on a specific condition
043:             * (in which case {@link Condition} is not active), or if the {@link Condition}
044:             * is active but {@link FiberImpl} is waiting for a scheduling.
045:             */
046:            private Condition cond;
047:
048:            static class PersistedData<T extends Runnable> implements 
049:                    Serializable {
050:                private Continuation continuation;
051:                public final T runnable;
052:
053:                public PersistedData(T runnable) {
054:                    this .runnable = runnable;
055:                    this .continuation = Continuation
056:                            .startSuspendedWith(runnable);
057:                }
058:
059:                public void execute() {
060:                    continuation = Continuation.continueWith(continuation);
061:                }
062:
063:                public boolean isCompleted() {
064:                    return continuation == null;
065:                }
066:
067:                private static final long serialVersionUID = 1L;
068:            }
069:
070:            /**
071:             * The {@link PersistedData} that includes continuation to be executed.
072:             */
073:            private transient PersistedData<T> execution;
074:
075:            /**
076:             * The current state of the {@link FiberImpl}.
077:             */
078:            private FiberState state;
079:
080:            /**
081:             * Other fibers that are blocking for the completion of this fiber.
082:             *
083:             * Transient, because {@link FiberCompletionCondition}s in this queue re-register themselves.
084:             * Always non-null.
085:             */
086:            private transient Set<FiberCompletionCondition> waitList;
087:
088:            /*package*/FiberImpl(ConversationImpl owner, T init) {
089:                this .owner = owner;
090:                this .id = owner.fiberId.inc();
091:                this .execution = new PersistedData<T>(init);
092:                state = FiberState.CREATED;
093:                assert owner.fibers.size() == id;
094:                owner.fibers.add(this );
095:            }
096:
097:            public T getRunnable() {
098:                FiberImpl<?> f = currentFiber(false);
099:                if (f == null)
100:                    throw new IllegalStateException(
101:                            "Cannot be invoked from outside a conversation");
102:                if (f.owner != owner)
103:                    throw new IllegalStateException(
104:                            "Cannot be invoked from a fiber that belongs to another conversation");
105:
106:                assert execution != null;
107:
108:                return execution.runnable;
109:            }
110:
111:            public void start() {
112:                if (state != FiberState.CREATED)
113:                    throw new IllegalStateException("fiber is already started");
114:                queue();
115:            }
116:
117:            public synchronized void join() throws InterruptedException {
118:                FiberImpl<?> fiber = FiberImpl.currentFiber(false);
119:
120:                if (!StackRecorder.get().isRestoring) {
121:                    if (getState() == FiberState.ENDED)
122:                        return;
123:
124:                    if (fiber == null) {
125:                        // called from outside conversations
126:                        wait();
127:                        return;
128:                    }
129:
130:                    if (fiber == this )
131:                        throw new IllegalStateException(
132:                                "a fiber can't wait for its own completion");
133:                }
134:
135:                fiber.suspend(new FiberCompletionCondition(this ));
136:            }
137:
138:            public FiberState getState() {
139:                return state;
140:            }
141:
142:            public ConversationImpl getOwner() {
143:                return owner;
144:            }
145:
146:            private void queue() {
147:                state = FiberState.RUNNABLE;
148:                owner.getEngine().queue(this );
149:            }
150:
151:            // called by the continuation thread
152:            public synchronized <T> T suspend(Condition<T> c) {
153:                if (!StackRecorder.get().isRestoring) {
154:                    if (c == null)
155:                        throw new IllegalArgumentException(
156:                                "dock cannot be null");
157:                    assert cond == null;
158:                    cond = c;
159:
160:                    assert state == FiberState.RUNNING;
161:                }
162:
163:                Continuation.suspend();
164:                if (StackRecorder.get().isCapturing) {
165:                    StackRecorder.get().pushReference(this );
166:                    return null;
167:                }
168:
169:                assert cond != null;
170:                // assert c==cond;  this isn't correct, because cond is persisted as a part of conversation.xml
171:                // while c is persisted in the continuation. they are different objects
172:                T r = (T) cond.getReturnValue();
173:                cond = null;
174:
175:                assert state == FiberState.RUNNING;
176:
177:                return r;
178:            }
179:
180:            /**
181:             * Called from the executor thread to run this fiber until
182:             * it suspends or completes.
183:             *
184:             * This method is synchronized to prevent a still-running conversation
185:             * from being run again concurrently, which happens when:
186:             *
187:             * 1. a dock parks
188:             * 2. a signal arrives and conversation resumes
189:             * 3. the conversation gets queued and picked up
190:             * 4. the conversation gets run
191:             */
192:            public synchronized void run() {
193:                FiberImpl old = currentFiber.get();
194:                currentFiber.set(this );
195:                try {
196:                    run0();
197:                } finally {
198:                    if (old == null)
199:                        currentFiber.remove();
200:                    else
201:                        currentFiber.set(old);
202:                }
203:            }
204:
205:            private void run0() {
206:                owner.onFiberStartedRunning(this );
207:                try {
208:                    run1();
209:                } finally {
210:                    owner.onFiberEndedRunning(this );
211:                }
212:            }
213:
214:            private void run1() {
215:                assert state == FiberState.RUNNABLE;
216:                state = FiberState.RUNNING;
217:
218:                // this runs the conversation until it blocks
219:                try {
220:                    execution.execute();
221:                } catch (Error e) {
222:                    die(e);
223:                } catch (RuntimeException e) {
224:                    die(e);
225:                }
226:
227:                assert state == FiberState.RUNNING;
228:
229:                if (execution.isCompleted()) {
230:                    synchronized (this ) {
231:                        // conversation has finished execution.
232:                        state = FiberState.ENDED;
233:
234:                        // notify any threads that are blocked on this conversation.
235:                        notifyAll();
236:
237:                        // notify all conversations that are blocked on this
238:                        if (waitList != null) {
239:                            synchronized (waitList) {
240:                                for (FiberCompletionCondition cd : waitList)
241:                                    cd.activate(this );
242:                                waitList.clear();
243:                            }
244:                        }
245:                    }
246:
247:                    assert cond == null;
248:
249:                } else {
250:                    // conversation has suspended
251:                    state = FiberState.WAITING;
252:                    assert cond != null;
253:
254:                    // let the condition know that we are parked
255:                    cond.park(this );
256:                }
257:            }
258:
259:            /**
260:             * Called when a fiber dies unexpectedly in the user code.
261:             */
262:            private void die(Throwable t) {
263:                // this method is supposed to handle an error in the user code,
264:                // not an unexpected termination inside the engine
265:                assert state == FiberState.RUNNING;
266:                state = FiberState.ENDED;
267:
268:                // clean up if we own a condition
269:                remove();
270:                owner.getEngine().addToErrorQueue(t);
271:                throw new FiberDeath();
272:            }
273:
274:            protected synchronized Set<FiberCompletionCondition> getWaitList() {
275:                if (waitList == null)
276:                    waitList = Collections
277:                            .synchronizedSet(new HashSet<FiberCompletionCondition>());
278:                return waitList;
279:            }
280:
281:            /**
282:             * Called by {@link ConversationImpl} to clean up this fiber
283:             * (as a part of removing the whole conversation.)
284:             */
285:            /*package*/synchronized void remove() {
286:                if (cond != null) {
287:                    cond.interrupt();
288:                    cond = null;
289:                }
290:                state = FiberState.ENDED;
291:            }
292:
293:            /**
294:             * Called by the endpoint threads when {@link #cond} becomes active.
295:             */
296:            public synchronized void onActivated(Condition cond) {
297:                assert this .cond == cond;
298:                assert state == FiberState.WAITING;
299:                state = FiberState.RUNNABLE;
300:                queue();
301:            }
302:
303:            // TODO: think about synchronization between hydration and activation
304:            /**
305:             * Called when the state of the {@link FiberImpl} is being moved from the disk.
306:             */
307:            /*package*/void hydrate(PersistedData<T> c) {
308:                assert state != FiberState.RUNNING;
309:                assert execution == null;
310:                assert c != null;
311:                execution = c;
312:            }
313:
314:            /**
315:             * Called when the state of the {@link FiberImpl} is being moved to the disk.
316:             */
317:            /*package*/PersistedData<T> dehydrate() {
318:                assert state == FiberState.RUNNABLE
319:                        || state == FiberState.WAITING
320:                        || state == FiberState.ENDED;
321:                assert execution != null;
322:                PersistedData<T> r = execution;
323:                execution = null;
324:                return r;
325:            }
326:
327:            /**
328:             * Called after the conversation is restored from the disk.
329:             */
330:            /*package*/void onLoad() {
331:                assert execution == null;
332:                if (cond != null)
333:                    cond.onLoad();
334:                assert execution == null;
335:                assert state == FiberState.WAITING
336:                        || state == FiberState.RUNNABLE
337:                        || state == FiberState.ENDED;
338:            }
339:
340:            /**
341:             * Gets the {@link Fiber} that the current thread is executing.
342:             *
343:             * @param mustReturnNonNull
344:             *      if true and the current thread isn't executing any fiber, this method
345:             *      throws an exception.
346:             */
347:            public static FiberImpl<?> currentFiber(boolean mustReturnNonNull) {
348:                FiberImpl f = currentFiber.get();
349:                if (f == null && mustReturnNonNull)
350:                    throw new IllegalStateException(
351:                            "this thread isn't executing a conversation");
352:                return f;
353:            }
354:
355:            /**
356:             * @see Fiber#create(Runnable)
357:             */
358:            public static <T extends Runnable> FiberImpl<T> create(T entryPoint) {
359:                return new FiberImpl<T>(currentFiber(true).owner, entryPoint);
360:            }
361:
362:            private Object writeReplace() {
363:                if (SerializationContext.get().mode == SerializationContext.Mode.CONVERSATION)
364:                    return this ;
365:                else
366:                    return new FiberMoniker(owner, id);
367:            }
368:
369:            private static final class FiberMoniker implements  Serializable {
370:                private final ConversationImpl conv;
371:                private final int id;
372:
373:                public FiberMoniker(ConversationImpl conv, int id) {
374:                    this .conv = conv;
375:                    this .id = id;
376:                }
377:
378:                private Object readResolve() {
379:                    FiberImpl fiber = conv.getFiber(id);
380:                    assert fiber != null;
381:                    return fiber;
382:                }
383:
384:                private static final long serialVersionUID = 1L;
385:            }
386:
387:            private static final long serialVersionUID = 1L;
388:
389:            /**
390:             * Records the currently running {@link FiberImpl} in the thread.
391:             */
392:            private static final ThreadLocal<FiberImpl> currentFiber = new ThreadLocal<FiberImpl>();
393:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.