Source Code Cross Referenced for DefaultWorker.java in Workflow Engines » syrup » org » syrup » workers

package org.syrup.workers;

import org.apache.xml.serializer.SerializationHandler;
import org.syrup.Data;
import org.syrup.LogEntry;
import org.syrup.LogEntryTemplate;
import org.syrup.PTask;
import org.syrup.PTaskTemplate;
import org.syrup.WorkSpace;
import org.syrup.helpers.DataImpl;
import org.syrup.helpers.LogEntryTemplateImpl;
import org.syrup.helpers.PTaskTemplateImpl;
import org.syrup.helpers.XMLOutput;
import org.syrup.sql.SQLWorkSpace;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.naming.InitialContext;

/**
 * The default Worker implementation that executes PTasks taken from a
 * WorkSpace.
 *
 * @author Robbert van Dalen
 */
public class DefaultWorker {
    static final String COPYRIGHT = "Copyright 2005 Robbert van Dalen. "
            + "At your option, you may copy, distribute, or make derivative works under "
            + "the terms of The Artistic License. This License may be found at "
            + "http://www.opensource.org/licenses/artistic-license.php. "
            + "THERE IS NO WARRANTY; USE THIS PRODUCT AT YOUR OWN RISK.";

    private final static Logger logger = Logger
            .getLogger("org.syrup.workers.DefaultWorker");

    /**
     * The main entry to start the Worker as a daemon, or to execute a Syrup
     * command.
     *
     * @param args
     *            The command + qualifier.
     * @param i
     *            The InputStream to read input from.
     * @param o
     *            The OutputStream to write output to.
     * @return The number of results (matched/stopped PTasks, matched
     *         LogEntries)
     */
    public static int operate(String[] args, InputStream i,
            OutputStream o) throws Exception {
        WorkSpace sp = null;

        // Prefer a WorkSpace bound in JNDI; fall back to the SQL implementation.
        try {
            sp = (WorkSpace) (new InitialContext())
                    .lookup("syrupWorkSpace");
        } catch (Exception e) {
            logger.log(Level.INFO,
                    "Did not get syrupWorkSpace key via JNDI. Reverted to default SQLWorkSpace implementation");
            sp = new SQLWorkSpace();
        }

        if (args.length > 0) {

            // The first argument is the command.
            if (args[0].equals("reset")) {
                sp.reset();
            } else if (args[0].equals("in1")) {
                sp.set_in_1(read(i));
            } else if (args[0].equals("in2")) {
                sp.set_in_2(read(i));
            } else if (args[0].equals("out1")) {
                write(sp.get_out_1(), o);
            } else if (args[0].equals("out2")) {
                write(sp.get_out_2(), o);
            } else if (args[0].equals("match")) {
                PTaskTemplate template = new PTaskTemplateImpl(args);
                return match(sp, template, o);
            } else if (args[0].equals("get")) {
                PTaskTemplate template = new PTaskTemplateImpl(args);
                return get(sp, template, o);
            } else if (args[0].equals("execute")) {
                // Run as a daemon: iterate indefinitely, polling every 10000 ms.
                PTaskTemplate template = new PTaskTemplateImpl(args);
                execute(sp, template, -1, 10000);
            } else if (args[0].equals("step")) {
                // Run a single pass without sleeping.
                PTaskTemplate template = new PTaskTemplateImpl(args);
                execute(sp, template, 1, 0);
            } else if (args[0].equals("stop")) {
                PTaskTemplate template = new PTaskTemplateImpl(args);
                return stop(sp, template);
            } else if (args[0].equals("get-log")) {
                // Parses the arguments into a LogEntryTemplate, used by the
                // command.
                LogEntryTemplate template = new LogEntryTemplateImpl(
                        args);
                return match(sp, template, o);
            } else {
                throw new Exception("operation '" + args[0]
                        + "' not supported");
            }
        }

        return 0;
    }

    /**
     * Starts the Worker as a daemon, or executes a Syrup command, using
     * System.in and System.out for input and output.
     *
     * @param args
     *            The command + qualifier.
     */
    public static void main(String[] args) throws Exception {
        operate(args, System.in, System.out);
    }

    /**
     * Return PTasks that match a PTaskTemplate by writing them out in XML
     * format.
     *
     * @param sp
     *            The WorkSpace to be used.
     * @param template
     *            The PTaskTemplate to be matched.
     * @param out
     *            The OutputStream to write PTasks to.
     * @return The number of matched PTasks.
     */
    public static int match(WorkSpace sp, PTaskTemplate template,
            OutputStream out) throws Exception {

        XMLOutput o = new XMLOutput();
        SerializationHandler h = o.wrap(out);

        PTask p[] = sp.match(template);

        // Outputs the fetched PTasks to the OutputStream in XML format.
        o.startDocument("match", h);

        for (int i = 0; i < p.length; i++) {
            o.output(p[i], h);
        }

        o.endDocument("match", h);

        return p.length;
    }

    /**
     * Return LogEntries that match a LogEntryTemplate by writing them out in
     * XML format.
     *
     * @param sp
     *            The WorkSpace to be used.
     * @param template
     *            The LogEntryTemplate to be matched.
     * @param out
     *            The OutputStream to write LogEntries to.
     * @return The number of matched LogEntries.
     */
    public static int match(WorkSpace sp, LogEntryTemplate template,
            OutputStream out) throws Exception {

        XMLOutput o = new XMLOutput();
        SerializationHandler h = o.wrap(out);

        LogEntry l[] = sp.match(template);

        // Outputs the fetched LogEntries to the OutputStream in XML format.
        o.startDocument("log", h);

        for (int i = 0; i < l.length; i++) {
            o.output(l[i], h);
        }

        o.endDocument("log", h);

        return l.length;
    }

    /**
     * Recursively writes a tree of Contexts in XML format. A nested Hashtable
     * represents a parent Task: its own Context is stored under the
     * "_parent" key and its children under their Task keys.
     *
     * @param tree
     *            The (sub)tree to be written.
     * @param xmlout
     *            The XMLOutput used to write the Contexts.
     * @param handler
     *            The SerializationHandler to write to.
     */
    private static void output(Hashtable tree, XMLOutput xmlout,
            SerializationHandler handler) throws Exception {
        Iterator keys = tree.keySet().iterator();
        while (keys.hasNext()) {
            String key = (String) keys.next();
            if (!key.equals("_parent")) {
                Object o = tree.get(key);
                if (o instanceof Hashtable) {
                    // A parent Task: wrap its children in start/end elements.
                    Hashtable subtree = (Hashtable) o;
                    org.syrup.Context c = (org.syrup.Context) subtree
                            .get("_parent");
                    xmlout.start(c, handler);
                    output(subtree, xmlout, handler);
                    xmlout.end(c, handler);
                } else {
                    xmlout.output((org.syrup.Context) o, handler);
                }
            }
        }
    }

    /**
     * Get the Contexts that match a PTaskTemplate by writing them out in XML
     * format.
     *
     * @param sp
     *            The WorkSpace to be used.
     * @param template
     *            The PTaskTemplate to be matched.
     * @param out
     *            The OutputStream to write Contexts to.
     * @return The number of matched Contexts.
     */
    public static int get(WorkSpace sp, PTaskTemplate template,
            OutputStream out) throws Exception {
        XMLOutput o = new XMLOutput();
        SerializationHandler h = o.wrap(out);

        org.syrup.Context c[] = sp.get(template);

        Hashtable parents = new Hashtable();

        // Make a parent Task table
        for (int i = 0; i < c.length; i++) {
            org.syrup.Context pc = c[i];
            if (pc.task().isParent()) {
                Hashtable pn = new Hashtable();
                pn.put("_parent", pc);
                parents.put(pc.task().key(), pn);
            }
        }

        // Put the child Tasks underneath the parent Tasks.
        for (int i = 0; i < c.length; i++) {
            org.syrup.Context ct = c[i];
            if (!ct.task().isParent()) {
                Hashtable pn = (Hashtable) parents.get(ct.task()
                        .parentKey());
                if (pn != null) {
                    pn.put(ct.task().key(), ct);
                } else {
                    // The parent was not matched: keep the child at top level.
                    parents.put(ct.task().key(), ct);
                }
            }
        }

        // Iterate over a copy so that entries can be removed from parents.
        Hashtable cl = new Hashtable(parents);
        Iterator keys = cl.keySet().iterator();

        // Build hierarchy
        while (keys.hasNext()) {
            String k = (String) keys.next();
            Object ph = (Object) cl.get(k);
            if (ph instanceof Hashtable) {
                Hashtable phh = (Hashtable) ph;
                org.syrup.Context ct = (org.syrup.Context) phh
                        .get("_parent");

                Hashtable oo = (Hashtable) cl
                        .get(ct.task().parentKey());

                if (oo != null) {
                    // Move this parent Task underneath its own parent.
                    parents.remove(k);
                    oo.put(k, phh);
                }
            }
        }

        // Outputs the fetched Contexts to the OutputStream in XML format.
        o.startDocument("get", h);
        output(parents, o, h);
        o.endDocument("get", h);

        return c.length;
    }

    /**
     * Executes PTasks that match a PTaskTemplate. When totalIterations is
     * negative, this method polls forever, waiting for new PTasks to execute
     * until the calling Thread or JVM is stopped; otherwise it performs that
     * many match-and-execute passes.
     *
     * @param sp
     *            The WorkSpace to be used.
     * @param template
     *            The PTaskTemplate to be matched.
     * @param totalIterations
     *            The number of passes to run; a negative value means run
     *            indefinitely.
     * @param pollInterval
     *            The time in milliseconds to sleep after a pass in which no
     *            PTask was executed.
     */
    public static void execute(WorkSpace sp, PTaskTemplate template,
            long totalIterations, long pollInterval) throws Exception {

        try {
            while (totalIterations > 0 || totalIterations < 0) {
                PTask p[] = sp.match(template);

                int k = 1;
                int i = 0;
                int executionRuns = 0;

                // Go through all matching PTasks sequentially.
                while (i < p.length) {
                    logger.log(Level.INFO, "starting " + p[i], p[i]);

                    PTask p2 = null;
                    int ii = 0;

                    // Attempt execution up to 5 times, retrying upon failure.
                    while (ii++ < 5) {
                        try {
                            p2 = sp.execute(p[i]);
                            executionRuns++;
                            break;
                        } catch (InterruptedException ie) {
                            throw ie;
                        } catch (Exception e) {
                            logger.log(Level.INFO, "execution failed "
                                    + p[i], e);
                        }

                        // This block is only entered when execution fails.
                        try {
                            sp.stop(new PTaskTemplateImpl(new String[] {
                                    "stop", "-key=equal", " " + p[i] }));
                        } catch (Exception e) {
                            logger.log(Level.INFO, "stopping failed "
                                    + p[i], e);
                        }
                        logger.log(Level.INFO,
                                "... retrying execution " + p[i]);
                    }

                    // Indicates that execution has failed 5 times
                    if (p2 == null) {
                        // Bail out to top level caller.
                        throw new Exception(
                                "execution cannot continue - exhausted all retries");
                    }
                    // Indicates that the execution was successful.
                    if (p2 != p[i]) {
                        logger.log(Level.INFO, "executed " + p2, p2);
                    }
                    // Indicates that the execution was not successful.
                    else {
                        logger.log(Level.INFO, "dropped " + p2, p2);
                    }
                    // Indicates that the PTask was already taken by another
                    // Worker.
                    if (p[i].modifications() == p2.modifications()) {
                        // Increase the step taken through the fetched list.
                        // This will lower the chance of hitting a PTask
                        // that has been taken by another Worker.
                        k += 1;
                        logger.log(Level.INFO, "increasing step to "
                                + k);
                    }
                    // Instead of stepping by x, step by x+1, reducing the
                    // chance of colliding with another Worker.
                    // Ideally, the selection of executable Tasks from the list
                    // should be random, but this alternative interleaving
                    // scheme is nearly as efficient [TODO: prove this!]
                    i += k;
                }
                try {
                    if (executionRuns == 0) {
                        // No more PTasks to be executed. Wait for a while.
                        logger.log(Level.INFO, "sleeping "
                                + pollInterval);
                        Thread.sleep(pollInterval);
                    }
                } catch (InterruptedException ie) {
                    throw ie;
                } catch (Exception e) {
                    logger.log(Level.SEVERE, Thread.currentThread()
                            .toString(), e);
                }

                if (totalIterations > 0) {
                    totalIterations--;
                }
            }
        } catch (InterruptedException ie) {
            logger.log(Level.WARNING, "interrupted ", ie);
        } catch (Throwable e) {
            logger.log(Level.SEVERE, Thread.currentThread().toString(),
                    e);
        }
    }

    /**
     * Stops non-progressing PTasks matching the PTaskTemplate.
     *
     * @param sp
     *            The WorkSpace that is used.
     * @param template
     *            The PTaskTemplate to be matched.
     * @return The number of stopped PTasks.
     */
    private static int stop(WorkSpace sp, PTaskTemplate template)
            throws Exception {
        int stopped = 0;

        PTask p[] = sp.match(template);

        // Stops the matching PTasks sequentially and one by one.
        for (int i = 0; i < p.length; i++) {
            PTask pp = sp.stop(p[i]);
            if (pp != p[i]) {
                logger.log(Level.INFO, "stopped " + pp);
                stopped++;
            }
        }

        return stopped;
    }

    /**
     * Reads an InputStream to its end and encapsulates the bytes in a Data
     * object.
     *
     * @param i
     *            The InputStream to be read.
     * @return A Data object holding the bytes read.
     */
    private final static Data read(InputStream i) throws Exception {
        byte b[] = new byte[8192];
        ByteArrayOutputStream o = new ByteArrayOutputStream(8192);
        int l = 0;

        while ((l = i.read(b)) >= 0) {
            o.write(b, 0, l);
        }
        return new DataImpl(o.toByteArray());
    }

    /**
     * Writes the bytes of a Data object to an OutputStream. Nothing is
     * written when the Data object is null.
     *
     * @param d
     *            The Data object to be written.
     * @param o
     *            The OutputStream to be written to.
     */
    private final static void write(Data d, OutputStream o)
            throws Exception {
        if (d != null) {
            byte[] b = d.bytes();
            o.write(b, 0, b.length);
        }
    }
}
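
The interleaving scheme in execute (the index i advances by k, and k grows by one whenever a PTask comes back with an unchanged modifications() count) may be easier to follow in isolation. The following is a minimal sketch and not part of Syrup: the class StrideSketch, the method visitedIndices and the taken set are hypothetical names introduced for illustration. The taken set stands in for the PTasks that another Worker is assumed to have claimed first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the stepping scheme used in DefaultWorker.execute.
class StrideSketch {

    /**
     * Returns the list indices a Worker would attempt, given which indices
     * were (by assumption) already taken by another Worker.
     */
    public static List<Integer> visitedIndices(int length, Set<Integer> taken) {
        List<Integer> visited = new ArrayList<>();
        int k = 1; // current step size
        int i = 0; // current position in the fetched list
        while (i < length) {
            visited.add(i);
            if (taken.contains(i)) {
                // Collision: widen the step to move away from the other Worker.
                k += 1;
            }
            i += k;
        }
        return visited;
    }

    public static void main(String[] args) {
        // No collisions: every index is attempted in order.
        System.out.println(visitedIndices(10, Set.of()));
        // Collisions at 0 and 2: the step grows to 2, then 3.
        System.out.println(visitedIndices(10, Set.of(0, 2))); // prints [0, 2, 5, 8]
    }
}
```

Indices skipped in one pass are not lost: the outer loop of execute calls sp.match(template) again on the next pass, so PTasks that are still executable are fetched anew.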