001: package org.syrup.workers;
002:
003: import org.apache.xml.serializer.SerializationHandler;
004: import org.syrup.Data;
005: import org.syrup.LogEntry;
006: import org.syrup.LogEntryTemplate;
007: import org.syrup.PTask;
008: import org.syrup.PTaskTemplate;
009: import org.syrup.WorkSpace;
010: import org.syrup.helpers.DataImpl;
011: import org.syrup.helpers.LogEntryTemplateImpl;
012: import org.syrup.helpers.PTaskTemplateImpl;
013: import org.syrup.helpers.XMLOutput;
014: import org.syrup.sql.SQLWorkSpace;
015:
016: import java.io.ByteArrayOutputStream;
017: import java.io.InputStream;
018: import java.io.OutputStream;
019: import java.util.Hashtable;
020: import java.util.Iterator;
021: import java.util.logging.Level;
022: import java.util.logging.Logger;
023:
024: import javax.naming.InitialContext;
025:
026: /**
027: * The default Worker implementation that executes PTasks taken from a
028: * WorkSpace.
029: *
030: * @author Robbert van Dalen
031: */
032: public class DefaultWorker {
033: static final String COPYRIGHT = "Copyright 2005 Robbert van Dalen."
034: + "At your option, you may copy, distribute, or make derivative works under "
035: + "the terms of The Artistic License. This License may be found at "
036: + "http://www.opensource.org/licenses/artistic-license.php. "
037: + "THERE IS NO WARRANTY; USE THIS PRODUCT AT YOUR OWN RISK.";
038:
039: private final static Logger logger = Logger
040: .getLogger("org.syrup.workers.DefaultWorker");
041:
042: /**
043: * The main entry to start the Worker as a daemon, or to execute a Syrup
044: * command.
045: *
046: * @param args
047: * The command + qualifier.
048: * @param i
049: * The InputStream to read input from.
050: * @param o
051: * The OutputStream to write output to.
052: * @return The number of results (matched/stopped PTasks, matched
053: * LogEntries)
054: */
055: public static int operate(String[] args, InputStream i,
056: OutputStream o) throws Exception {
057: WorkSpace sp = null;
058:
059: try {
060: sp = (WorkSpace) (new InitialContext())
061: .lookup("syrupWorkSpace");
062: } catch (Exception e) {
063: logger
064: .log(
065: Level.INFO,
066: "Did not get syrupWorkSpace key via JNDI. Reverted to default SQLWorkSpace implementation");
067: sp = new SQLWorkSpace();
068: }
069:
070: if (args.length > 0) {
071:
072: // The first argument is the command.
073: if (args[0].equals("reset")) {
074: sp.reset();
075: } else if (args[0].equals("in1")) {
076: sp.set_in_1(read(i));
077: } else if (args[0].equals("in2")) {
078: sp.set_in_2(read(i));
079: } else if (args[0].equals("out1")) {
080: write(sp.get_out_1(), o);
081: } else if (args[0].equals("out2")) {
082: write(sp.get_out_2(), o);
083: } else if (args[0].equals("match")) {
084: PTaskTemplate template = new PTaskTemplateImpl(args);
085: return match(sp, template, o);
086: } else if (args[0].equals("get")) {
087:
088: PTaskTemplate template = new PTaskTemplateImpl(args);
089: return get(sp, template, o);
090: } else if (args[0].equals("execute")) {
091:
092: PTaskTemplate template = new PTaskTemplateImpl(args);
093: execute(sp, template, -1, 10000);
094: } else if (args[0].equals("step")) {
095:
096: PTaskTemplate template = new PTaskTemplateImpl(args);
097: execute(sp, template, 1, 0);
098: } else if (args[0].equals("stop")) {
099: PTaskTemplate template = new PTaskTemplateImpl(args);
100: return stop(sp, template);
101: } else if (args[0].equals("get-log")) {
102: // Parses the arguments into a LogEntryTemplate, used by the
103: // command.
104: LogEntryTemplate template = new LogEntryTemplateImpl(
105: args);
106: return match(sp, template, o);
107: } else {
108: throw new Exception("operation '" + args[0]
109: + "' not supported");
110: }
111: }
112:
113: return 0;
114: }
115:
116: /**
117: * Start the Worker as a daemon or to execute a Syrup command using
118: * System.in and System.out for input and output.
119: *
120: * @param args
121: * The command + qualifier.
122: */
123: public static void main(String[] args) throws Exception {
124: operate(args, System.in, System.out);
125: }
126:
127: /**
128: * Return PTasks that match a PTaskTemplate by writing them out in XML
129: * format.
130: *
131: * @param sp
132: * The WorkSpace to be used.
133: * @param template
134: * The PTaskTemplate to be matched.
135: * @param out
136: * The OutputStream to write PTasks to.
137: * @return The number of matched PTasks.
138: */
139: public static int match(WorkSpace sp, PTaskTemplate template,
140: OutputStream out) throws Exception {
141:
142: XMLOutput o = new XMLOutput();
143: SerializationHandler h = o.wrap(out);
144:
145: PTask p[] = sp.match(template);
146:
147: // Outputs the fetched PTasks to the OutputStream in XML format.
148: o.startDocument("match", h);
149:
150: for (int i = 0; i < p.length; i++) {
151: o.output(p[i], h);
152: }
153:
154: o.endDocument("match", h);
155:
156: return p.length;
157: }
158:
159: /**
160: * Return LogEntries that match a LogEntryTemplate by writing them out in
161: * XML format.
162: *
163: * @param sp
164: * The WorkSpace to be used.
165: * @param template
166: * The LogEntryTemplate to be matched.
167: * @param out
168: * The OutputStream to write LogEntries to.
169: * @return The number of matched LogEntries.
170: */
171:
172: public static int match(WorkSpace sp, LogEntryTemplate template,
173: OutputStream out) throws Exception {
174:
175: XMLOutput o = new XMLOutput();
176: SerializationHandler h = o.wrap(out);
177:
178: LogEntry l[] = sp.match(template);
179:
180: // Outputs the fetched PTasks to the OutputStream in XML format.
181: o.startDocument("log", h);
182:
183: for (int i = 0; i < l.length; i++) {
184: o.output(l[i], h);
185: }
186:
187: o.endDocument("log", h);
188:
189: return l.length;
190: }
191:
192: /**
193: */
194: private static void output(Hashtable tree, XMLOutput xmlout,
195: SerializationHandler handler) throws Exception {
196: Iterator keys = tree.keySet().iterator();
197: while (keys.hasNext()) {
198: String key = (String) keys.next();
199: if (!key.equals("_parent")) {
200: Object o = tree.get(key);
201: if (o instanceof Hashtable) {
202: Hashtable subtree = (Hashtable) o;
203: org.syrup.Context c = (org.syrup.Context) subtree
204: .get("_parent");
205: xmlout.start(c, handler);
206: output(subtree, xmlout, handler);
207: xmlout.end(c, handler);
208: } else {
209: xmlout.output((org.syrup.Context) o, handler);
210: }
211: }
212: }
213: }
214:
215: /**
216: * Get the Contexts that match a PTaskTemplate by writing them out in XML
217: * format.
218: *
219: * @param sp
220: * The WorkSpace to be used.
221: * @param template
222: * The PTaskTemplate to be matched.
223: * @param out
224: * The OutputStream to write Contexts to.
225: * @return The number of matched Contexts.
226: */
227: public static int get(WorkSpace sp, PTaskTemplate template,
228: OutputStream out) throws Exception {
229: XMLOutput o = new XMLOutput();
230: SerializationHandler h = o.wrap(out);
231:
232: org.syrup.Context c[] = sp.get(template);
233:
234: Hashtable parents = new Hashtable();
235:
236: // Make a parent Task table
237: for (int i = 0; i < c.length; i++) {
238: org.syrup.Context pc = c[i];
239: if (pc.task().isParent()) {
240: Hashtable pn = new Hashtable();
241: pn.put("_parent", pc);
242: parents.put(pc.task().key(), pn);
243: }
244: }
245:
246: // Put the child Tasks underneath the parent Tasks.
247: for (int i = 0; i < c.length; i++) {
248: org.syrup.Context ct = c[i];
249: if (!ct.task().isParent()) {
250: Hashtable pn = (Hashtable) parents.get(ct.task()
251: .parentKey());
252: if (pn != null) {
253: pn.put(ct.task().key(), ct);
254: } else {
255: parents.put(ct.task().key(), ct);
256: }
257: }
258: }
259:
260: Hashtable cl = new Hashtable(parents);
261: Iterator keys = cl.keySet().iterator();
262:
263: // Build hierarchy
264: while (keys.hasNext()) {
265: String k = (String) keys.next();
266: Object ph = (Object) cl.get(k);
267: if (ph instanceof Hashtable) {
268: Hashtable phh = (Hashtable) ph;
269: org.syrup.Context ct = (org.syrup.Context) phh
270: .get("_parent");
271:
272: Hashtable oo = (Hashtable) cl
273: .get(ct.task().parentKey());
274:
275: if (oo != null) {
276: parents.remove(k);
277: oo.put(k, phh);
278: }
279: }
280: }
281:
282: // Outputs the fetched Contexts to the OutputStream in XML format.
283: o.startDocument("get", h);
284: output(parents, o, h);
285: o.endDocument("get", h);
286:
287: return c.length;
288: }
289:
290: /**
291: * Executes PTasks that match a PTaskTemplate by writing them out in XML
292: * format. This method will continue to execute forever, waiting for new
293: * PTasks to be executed until the calling Thread or JVM is stopped.
294: *
295: * @param sp
296: * The WorkSpace to be used.
297: * @param template
298: * The PTaskTemplate to be matched.
299: */
300: public static void execute(WorkSpace sp, PTaskTemplate template,
301: long totalIterations, long pollInterval) throws Exception {
302:
303: try {
304: while (totalIterations > 0 || totalIterations < 0) {
305: PTask p[] = sp.match(template);
306:
307: int k = 1;
308: int i = 0;
309: int executionRuns = 0;
310:
311: // Go through all matching PTasks sequentially.
312: while (i < p.length) {
313: logger.log(Level.INFO, "starting " + p[i], p[i]);
314:
315: PTask p2 = null;
316: int ii = 0;
317:
318: // Retry execution (5 times) upon failure.
319: while (ii++ < 5) {
320: try {
321: p2 = sp.execute(p[i]);
322: executionRuns++;
323: break;
324: } catch (InterruptedException ie) {
325: throw ie;
326: } catch (Exception e) {
327: logger.log(Level.INFO, "execution failed "
328: + p[i], e);
329: }
330:
331: // This block is only entered when execution fails.
332: try {
333: sp
334: .stop(new PTaskTemplateImpl(
335: new String[] { "stop",
336: "-key=equal",
337: " " + p[i] }));
338: } catch (Exception e) {
339: logger.log(Level.INFO, "stopping failed "
340: + p[i], e);
341: }
342: logger.log(Level.INFO,
343: "... retrying execution " + p[i]);
344: }
345:
346: // Indicates that execution has failed 5 times
347: if (p2 == null) {
348: // Bail out to top level caller.
349: throw new Exception(
350: "execution cannot continue - exhausted all retries");
351: }
352: // Indicates that the execution was succesful.
353: if (p2 != p[i]) {
354: logger.log(Level.INFO, "executed " + p2, p2);
355: }
356: // Indicates that the execution was not succesful.
357: else {
358: logger.log(Level.INFO, "dropped " + p2, p2);
359: }
360: // Indicates that the PTask was already taken by another
361: // Worker.
362: if (p[i].modifications() == p2.modifications()) {
363: // Increase the step taken through the fetched list.
364: // This will lower the chance of hitting a PTask
365: // that has been taken by another Worker.
366: k += 1;
367: logger.log(Level.INFO, "increasing step to "
368: + k);
369: }
370: // Instead of stepping x time, make it x+1, reducing the
371: // chance of colliding with another Worker.
372: // Ideally, the selectopm of executable Tasks from the list
373: // should be random, but this alternative interleaving
374: // scheme is nearly as efficient [TODO: prove this!]
375: i += k;
376: }
377: try {
378: if (executionRuns == 0) {
379: // No more PTasks to be executed. Wait for a while.
380: logger.log(Level.INFO, "sleeping "
381: + pollInterval);
382: Thread.sleep(pollInterval);
383: }
384: } catch (InterruptedException ie) {
385: throw ie;
386: } catch (Exception e) {
387: logger.log(Level.SEVERE, Thread.currentThread()
388: .toString(), e);
389: }
390:
391: if (totalIterations > 0) {
392: totalIterations--;
393: }
394: }
395: } catch (InterruptedException ie) {
396: logger.log(Level.WARNING, "interrupted ", ie);
397: } catch (Throwable e) {
398: logger.log(Level.SEVERE, Thread.currentThread().toString(),
399: e);
400: }
401: }
402:
403: /**
404: * Stops a non-progressing PTasks matching the PTaskTemplate.
405: *
406: * @param sp
407: * The WorkSpace that is used.
408: * @param template
409: * The PTaskTemplate to be matched.
410: * @return The number of stopped PTasks.
411: */
412: private static int stop(WorkSpace sp, PTaskTemplate template)
413: throws Exception {
414: int stopped = 0;
415:
416: PTask p[] = sp.match(template);
417:
418: // Stops the matching PTasks sequentially and one by one.
419: for (int i = 0; i < p.length; i++) {
420: PTask pp = sp.stop(p[i]);
421: if (pp != p[i]) {
422: logger.log(Level.INFO, "stopped " + pp);
423: stopped++;
424: }
425: }
426:
427: return stopped;
428: }
429:
430: /**
431: * Encapsulates the data read from an InputStream with a Data object.
432: *
433: * @param i
434: * The InputStream to be encapsulated.
435: * @return The encapsulated InputStream.
436: */
437: private final static Data read(InputStream i) throws Exception {
438: byte b[] = new byte[8192];
439: ByteArrayOutputStream o = new ByteArrayOutputStream(8192);
440: int l = 0;
441:
442: while ((l = i.read(b)) >= 0) {
443: o.write(b, 0, l);
444: }
445: return new DataImpl(o.toByteArray());
446: }
447:
448: /**
449: * Writes the data to an OutputStream using a Data object.
450: *
451: * @param d
452: * The Data object to be written.
453: * @param o
454: * The OutputStream to be written to.
455: */
456: private final static void write(Data d, OutputStream o)
457: throws Exception {
458: if (d != null) {
459: byte[] b = d.bytes();
460: o.write(b, 0, b.length);
461: }
462: }
463: }
|