001: /*
002: * ====================================================================
003: *
004: * XFLOW - Process Management System
005: * Copyright (C) 2003 Rob Tan
006: * All rights reserved.
007: *
008: * Redistribution and use in source and binary forms, with or without
009: * modification, are permitted provided that the following conditions
010: * are met:
011: *
012: * 1. Redistributions of source code must retain the above copyright
013: * notice, this list of conditions, and the following disclaimer.
014: *
015: * 2. Redistributions in binary form must reproduce the above copyright
016: * notice, this list of conditions, and the disclaimer that follows
017: * these conditions in the documentation and/or other materials
018: * provided with the distribution.
019: *
020: * 3. The name "XFlow" must not be used to endorse or promote products
021: * derived from this software without prior written permission. For
022: * written permission, please contact rcktan@yahoo.com
023: *
024: * 4. Products derived from this software may not be called "XFlow", nor
025: * may "XFlow" appear in their name, without prior written permission
026: * from the XFlow Project Management (rcktan@yahoo.com)
027: *
028: * In addition, we request (but do not require) that you include in the
029: * end-user documentation provided with the redistribution and/or in the
030: * software itself an acknowledgement equivalent to the following:
031: * "This product includes software developed by the
032: * XFlow Project (http://xflow.sourceforge.net/)."
033: * Alternatively, the acknowledgment may be graphical using the logos
034: * available at http://xflow.sourceforge.net/
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE XFLOW AUTHORS OR THE PROJECT
040: * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: *
049: * ====================================================================
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the XFlow Project and was originally
052: * created by Rob Tan (rcktan@yahoo.com)
053: * For more information on the XFlow Project, please see:
054: * <http://xflow.sourceforge.net/>.
055: * ====================================================================
056: */
057: package xflow.events;
058:
059: import java.io.*;
060: import java.sql.ResultSet;
061: import java.sql.Statement;
062: import java.util.Properties;
063:
064: import org.w3c.dom.*;
065: import org.xml.sax.InputSource;
066:
067: import javax.jms.*;
068: import javax.naming.*;
069: import javax.sql.*;
070: import javax.xml.parsers.*;
071:
072: import xflow.common.XflowException;
073: import xflow.messaging.JMSSubscriber;
074: import xflow.messaging.JMSTopicConnection;
075: import xflow.util.Db;
076: import xflow.common.XflowConstants;
077:
078: /**
079: * @author xzma
080: *
081: * The event handler will receive the event messages asynchronously
082: * using a JMS topicsubscription and save the event infomation into
083: * database.
084: */
085: public class EventsHandler implements MessageListener {
086:
087: private JMSSubscriber subscriber;
088:
089: /**
090: * Constructor. Properties file is used for getting DB info when EventsHandler is
091: * running standalone.
092: */
093: public EventsHandler(Properties props) {
094: try {
095: JMSTopicConnection.initialize();
096: EventsPersistence.init(props);
097: subscriber = new JMSSubscriber(this ,
098: XflowConstants.XFLOW_EVENT_TOPIC, null);
099: } catch (XflowException e) {
100: e.printStackTrace();
101: System.out.println("Can't set up JMS Subscription");
102: } catch (JMSException e) {
103: e.printStackTrace();
104: }
105: }
106:
107: /**
108: * the method defined from MessageListener interdace, it monitors
109: * possible incoming message events
110: */
111: public void onMessage(Message evt) {
112: String evtXML = null;
113: System.out.println("Got a message...");
114: try {
115: if (evt instanceof TextMessage) {
116: evtXML = ((TextMessage) evt).getText();
117: System.out.println(evtXML);
118: } else {
119: System.out.println("Message not recognized.");
120: return;
121: }
122: } catch (JMSException e) {
123: e.printStackTrace();
124: System.out
125: .println("Cannot get text message from Received message");
126: }
127:
128: try {
129: //namespace of SOAP-ENV
130: String docNS = "http://schemas.xmlsoap.org/soap/envelope/";
131:
132: DocumentBuilderFactory factory = DocumentBuilderFactory
133: .newInstance();
134: factory.setNamespaceAware(true);
135: DocumentBuilder builder = factory.newDocumentBuilder();
136:
137: StringReader sreader = new StringReader(evtXML);
138: InputSource is = new InputSource(sreader);
139: Document doc = builder.parse(is);
140:
141: NodeList els = doc.getElementsByTagNameNS(docNS, "Body");
142: Element body = (Element) els.item(0); //assumming certain format
143:
144: els = body.getChildNodes();
145: Element event;
146: els = body.getElementsByTagName("WorkflowSuspendedEvent");
147: if (els != null && els.item(0) != null) {
148: event = (Element) els.item(0);
149: insertSupendedEvent(event); //insert into DB
150: } else if ((els = body
151: .getElementsByTagName("ProcessTimedOutEvent")) != null
152: && els.item(0) != null) {
153: event = (Element) els.item(0);
154: insertTimeOutEvent(event);
155: } else if ((els = body
156: .getElementsByTagName("ModelDeployedEvent")) != null
157: && els.item(0) != null) {
158: event = (Element) els.item(0);
159: insertDeployedEvent(event);
160: } else if ((els = body
161: .getElementsByTagName("VariableUpdatedEvent")) != null
162: && els.item(0) != null) {
163: event = (Element) els.item(0);
164: insertUpdatedEvent(event);
165: } else if ((els = body
166: .getElementsByTagName("WorkflowAbortedEvent")) != null
167: && els.item(0) != null) {
168: event = (Element) els.item(0);
169: insertAbortedEvent(event);
170: } else if ((els = body
171: .getElementsByTagName("WorkflowResumedEvent")) != null
172: && els.item(0) != null) {
173: event = (Element) els.item(0);
174: insertResumedEvent(event);
175: } else if ((els = body
176: .getElementsByTagName("WorkflowCompletedEvent")) != null
177: && els.item(0) != null) {
178: event = (Element) els.item(0);
179: insertCompletedEvent(event);
180: } else if ((els = body
181: .getElementsByTagName("WorkflowStartedEvent")) != null
182: && els.item(0) != null) {
183: event = (Element) els.item(0);
184: insertStartedEvent(event);
185: } else if ((els = body
186: .getElementsByTagName("NodeTransitionEvent")) != null
187: && els.item(0) != null) {
188: event = (Element) els.item(0);
189: insertNodeTransitionEvent(event);
190: } else {
191: System.err.println("unknown event types");
192: }
193:
194: } catch (Exception e) {
195: e.printStackTrace();
196: }
197: }
198:
199: /**
200: * inserts the following parameters into NodeTransitionEvent Table in DB
201: * @param eventId
202: * @param fname
203: * @param ftype
204: * @param tname
205: * @param ttype
206: */
207: private void insertNodeTransitionEventTable(int eventId,
208: String fname, String ftype, String tname, String ttype) {
209: String query = "INSERT INTO evt_NodeTransitionEvent VALUES("
210: + eventId + ",'" + fname + "','" + ftype + "','"
211: + tname + "','" + ttype + "')";
212: System.out.println(query);
213: executeQuery(query);
214: }
215:
216: /**
217: *
218: * @param eventId
219: * @param workItemInternalId: if = -1, auto-incremented
220: * @param workItemId
221: * @param payloadType
222: * @param payload
223: * @return workItemInternalId, if it is -1 when passed in,
224: * return last modified value(by auto-increment) in db
225: */
226: private int insertEventWorkItemTable(int eventId,
227: int workItemInternalId, int workItemId, String payloadType,
228: String payload) {
229:
230: Integer internalId = new Integer(workItemInternalId);
231:
232: String query = "INSERT INTO evt_EventWorkItem VALUES("
233: + eventId
234: + ","
235: + ((workItemInternalId == -1) ? "null" : internalId
236: .toString()) + "," + workItemId + ",'"
237: + payloadType + "','" + payload + "')";
238: return executeQuery(query, "workItemInternalId",
239: "evt_EventWorkItem");
240:
241: }
242:
243: /**
244: * insert work item properties into WorkItemPropertiesTable
245: * @param workItenInternalId
246: * @param pname
247: * @param ptype
248: * @param pvalue
249: */
250: private void insertEventWorkItemPropertiesTable(
251: int workItenInternalId, String pname, String ptype,
252: String pvalue) {
253: String query = "INSERT INTO evt_EventWorkItemProperties VALUES("
254: + workItenInternalId
255: + ",'"
256: + pname
257: + "','"
258: + ptype
259: + "','" + pvalue + "')";
260: executeQuery(query);
261: }
262:
263: /**
264: * @param event
265: */
266: private void insertUpdatedEvent(Element event) {
267: int eventId;
268: Element var = (Element) event.getElementsByTagName("Variable")
269: .item(0);
270:
271: String name = var.getAttribute("name");
272: String type = var.getAttribute("type");
273: String value = var.getFirstChild().getNodeValue();
274:
275: eventId = insertEventTable(event);
276:
277: insertVariableUpdateEventTable(eventId, name, type, value);
278: }
279:
280: /**
281: * @param eventId
282: * @param name
283: * @param type
284: * @param value
285: */
286: private void insertVariableUpdateEventTable(int eventId,
287: String name, String type, String value) {
288: String query = "INSERT INTO evt_VariableUpdateEvent VALUES("
289: + eventId + ",'" + name + "','" + type + "','" + value
290: + "')";
291: executeQuery(query);
292: }
293:
294: /**
295: * insert timeout event into DB, first insert event info into event table,
296: * get the assigned eventId, then insert timeout info into timeout table
297: * @param event
298: */
299: private void insertTimeOutEvent(Element event) {
300: int eventId;
301: String processName = event.getElementsByTagName("ProcessName")
302: .item(0).getFirstChild().getNodeValue();
303:
304: eventId = insertEventTable(event);
305: insertProcessTimedOutTable(eventId, processName);
306: }
307:
308: /**
309: *
310: * @param eventId
311: * @param processName
312: */
313: private void insertProcessTimedOutTable(int eventId,
314: String processName) {
315: String query = "INSERT INTO evt_ProcessTimedOutEvent VALUES("
316: + eventId + ",'" + processName + "')";
317: executeQuery(query);
318: }
319:
320: /**
321: * @param event
322: */
323: private void insertCompletedEvent(Element event) {
324: insertEventTable(event);
325: }
326:
327: /**
328: *
329: * @param event
330: */
331: private void insertResumedEvent(Element event) {
332: insertEventTable(event);
333: }
334:
335: /**
336: *
337: * @param event
338: */
339: private void insertDeployedEvent(Element event) {
340: insertEventTable(event);
341: }
342:
343: /**
344: * @param event
345: */
346: private void insertAbortedEvent(Element event) {
347: insertEventTable(event);
348: }
349:
350: /**
351: *
352: * @param event
353: */
354: private void insertSupendedEvent(Element event) {
355: insertEventTable(event);
356: }
357:
358: /**
359: * insert event information into data base
360: * @param event
361: * @return eventId, this id is used by may operation
362: */
363: private int insertEventTable(Element event) {
364: String eventId = null;
365: String eventType, workflowName, user;
366: String timestamp;
367: int workflowVersion, workflowInstanceId, parentWorkflowInstanceId;
368: String[] info;
369:
370: info = retriveEventInfo(event);
371:
372: for (int i = 0; i < info.length; i++) {
373: System.out.println("in info " + info[i]);
374: }
375: eventType = info[0];
376: timestamp = info[1];
377: workflowName = info[2];
378: workflowVersion = Integer.parseInt(info[3]);
379: workflowInstanceId = (info[4] == null) ? -1 : Integer
380: .parseInt(info[4]);
381: parentWorkflowInstanceId = (info[5] == null) ? -1 : Integer
382: .parseInt(info[5]);
383: if (info[6] == null) {
384: user = "system";
385: } else {
386: user = info[6];
387: }
388:
389: String query = "INSERT INTO evt_event VALUES(null, " + "'"
390: + eventType + "','" + timestamp + "','" + workflowName
391: + "'," + workflowVersion + "," + workflowInstanceId
392: + "," + parentWorkflowInstanceId + ","
393: + ((user == null) ? "null)" : ("'" + user + "')"));
394:
395: System.out.println(query);
396:
397: java.sql.Connection conn = null;
398: try {
399: //now create a query and update db
400: conn = Db.getConnection();
401: Statement st = conn.createStatement();
402: st.execute(query);
403: ResultSet rs = st
404: .executeQuery("SELECT max(eventId), eventId from evt_event");
405: while (rs.next()) {
406: try {
407: eventId = rs.getString("eventId").trim();
408: System.out.println(eventId);
409: rs.next();
410: } catch (NumberFormatException e) {
411: System.err.println("bad event ID");
412: }
413: }
414: rs.close();
415: st.close();
416: } catch (Exception e) {
417: System.out.println(e.getMessage());
418: } finally {
419: if (conn != null) {
420: Db.returnConnection(conn);
421: }
422: }
423: System.out.println("eventId is " + eventId);
424: return Integer.parseInt(eventId);
425: }
426:
427: /**
428: * given the query string, execute the query on DB
429: * @param query
430: */
431: private void executeQuery(String query) {
432: System.out.println(query);
433: java.sql.Connection conn = null;
434: try {
435: //now create a query and update db
436: conn = Db.getConnection();
437: Statement st = conn.createStatement();
438: st.execute(query);
439: st.close();
440: } catch (Exception e) {
441: System.out.println(e.getMessage());
442: } finally {
443: if (conn != null) {
444: Db.returnConnection(conn);
445: }
446: }
447: }
448:
449: /**
450: * execute a query by given query string on table "tableName", return the largest
451: * index for specific colume. It can be use to get the last inserted entry if the
452: * column is auto-incremented
453: * @param query
454: * @param columnName
455: * @param tableName
456: * @return
457: */
458: private int executeQuery(String query, String columnName,
459: String tableName) {
460: System.out.println(query);
461: java.sql.Connection conn = null;
462: String lastMod = null;
463: try {
464: //now create a query and update db
465: conn = Db.getConnection();
466: Statement st = conn.createStatement();
467: st.execute(query);
468:
469: ResultSet rs = st.executeQuery("SELECT max(" + columnName
470: + ")," + columnName + " from " + tableName);
471: while (rs.next()) { // actually only has one row
472: try {
473: lastMod = rs.getString(columnName).trim();
474: } catch (NumberFormatException e) {
475: System.err.println("bad event ID");
476: }
477: }
478: rs.close();
479: st.close();
480: } catch (Exception e) {
481: System.out.println(e.getMessage());
482: } finally {
483: if (conn != null) {
484: Db.returnConnection(conn);
485: }
486: }
487: return Integer.parseInt(lastMod);
488: }
489:
490: /**
491: * get information for event from soap body element--event
492: * @param event
493: * @return
494: */
495: private String[] retriveEventInfo(Element event) {
496: String info[] = new String[7];
497:
498: info[0] = event.getLocalName();
499: info[1] = event.getElementsByTagName("Timestamp").item(0)
500: .getFirstChild().getNodeValue();
501: info[2] = event.getElementsByTagName("WorkflowName").item(0)
502: .getFirstChild().getNodeValue();
503: info[3] = event.getElementsByTagName("WorkflowVersion").item(0)
504: .getFirstChild().getNodeValue();
505:
506: if (event.getElementsByTagName("WorkflowInstanceId").item(0) != null) {
507: info[4] = event.getElementsByTagName("WorkflowInstanceId")
508: .item(0).getFirstChild().getNodeValue();
509: } else {
510: info[4] = "-1";
511: }
512:
513: Node pwi, user;
514: if ((pwi = event.getElementsByTagName(
515: "ParentWorkflowInstanceId").item(0)) != null
516: && pwi.getFirstChild() != null) {
517: info[5] = pwi.getFirstChild().getNodeValue();
518: } else {
519: info[5] = null;
520: }
521:
522: if ((user = event.getElementsByTagName("User").item(0)) != null
523: && user.getFirstChild() != null) {
524: info[6] = user.getFirstChild().getNodeValue();
525: } else {
526: info[6] = null;
527: }
528: return info;
529: }
530:
531: /**
532: * involves inserting information into 3 tables:event, eventworkitem, and
533: * eventworkitemproperties table.
534: * @param event
535: */
536: private void insertNodeTransitionEvent(Element event) {
537: int eventId, workItemInternalId;
538: Element workItem = (Element) event.getElementsByTagName(
539: "WorkItem").item(0);
540: String[] info = retriveWorkItemInfo(workItem);
541:
542: int workItemId = Integer.parseInt(info[0]);
543: String payloadType = info[1];
544: String payload = info[2];
545:
546: eventId = insertEventTable(event);
547: workItemInternalId = insertEventWorkItemTable(eventId, -1,
548: workItemId, payloadType, payload);
549: info = retrieveTransitionInfo(event);
550: String fromName = info[0];
551: String fromType = info[1];
552: String toName = info[2];
553: String toType = info[3];
554: insertNodeTransitionEventTable(eventId, fromName, fromType,
555: toName, toType);
556:
557: Element ps = (Element) workItem.getElementsByTagName(
558: "Properties").item(0);
559: NodeList proplist = ps.getElementsByTagName("Property");
560: for (int i = 0; i < proplist.getLength(); i++) {
561: String pname, ptype, pvalue;
562: Element p = (Element) proplist.item(i);
563: info = retriveWorkItemPropertyInfo(p);
564: pname = info[0];
565: ptype = info[1];
566: pvalue = info[2];
567:
568: insertEventWorkItemPropertiesTable(workItemInternalId,
569: pname, ptype, pvalue);
570:
571: }
572: }
573:
574: /**
575: * involves inserting information into 2 tables: event and eventworkitem
576: * @param event
577: */
578: private void insertStartedEvent(Element event) {
579: int eventId, workItemInternalId;
580: Element workItem = (Element) event.getElementsByTagName(
581: "WorkItem").item(0);
582: String[] info = retriveWorkItemInfo(workItem);
583:
584: int workItemId = Integer.parseInt(info[0]);
585: String payloadType = info[1];
586: String payload = info[2];
587:
588: eventId = insertEventTable(event);
589:
590: workItemInternalId = insertEventWorkItemTable(eventId, -1,
591: workItemId, payloadType, payload);
592: Element ps = (Element) workItem.getElementsByTagName(
593: "Properties").item(0);
594: NodeList proplist = ps.getElementsByTagName("Property");
595: for (int i = 0; i < proplist.getLength(); i++) {
596: String pname, ptype, pvalue;
597: Element p = (Element) proplist.item(i);
598: info = retriveWorkItemPropertyInfo(p);
599: pname = info[0];
600: ptype = info[1];
601: pvalue = info[2];
602:
603: insertEventWorkItemPropertiesTable(workItemInternalId,
604: pname, ptype, pvalue);
605:
606: }
607: }
608:
609: /**
610: * get information from property node
611: * @param property
612: * @return
613: */
614: private String[] retriveWorkItemPropertyInfo(Element property) {
615: String info[] = new String[3];
616: info[0] = property.getElementsByTagName("Name").item(0)
617: .getFirstChild().getNodeValue();
618: info[1] = property.getElementsByTagName("Type").item(0)
619: .getFirstChild().getNodeValue();
620: info[2] = property.getElementsByTagName("Value").item(0)
621: .getFirstChild().getNodeValue();
622: return info;
623: }
624:
625: /**
626: * get information from workitem node
627: * @param workItem
628: * @return
629: */
630: private String[] retriveWorkItemInfo(Element workItem) {
631: String info[] = new String[3];
632:
633: info[0] = workItem.getElementsByTagName("WorkItemId").item(0)
634: .getFirstChild().getNodeValue();
635: Element pld = (Element) workItem
636: .getElementsByTagName("Payload").item(0);
637:
638: info[1] = pld.getAttribute("type");
639:
640: NodeList plist = pld.getChildNodes();
641: info[2] = "";
642: for (int i = 0; i < plist.getLength(); i++) {
643: info[2] += plist.item(i).toString();
644: }
645: return info;
646: }
647:
648: /**
649: * get transistion information from event node
650: * @param event
651: * @return
652: */
653: private String[] retrieveTransitionInfo(Element event) {
654: String info[] = new String[4];
655:
656: Element from = (Element) event.getElementsByTagName("From")
657: .item(0);
658: Element to = (Element) event.getElementsByTagName("To").item(0);
659:
660: info[0] = from.getAttribute("nodeName");
661: info[1] = from.getAttribute("nodeType");
662:
663: info[2] = to.getAttribute("nodeName");
664: info[3] = to.getAttribute("nodeType");
665:
666: return info;
667: }
668:
669: public static void main(String[] args) throws XflowException,
670: JMSException {
671:
672: String propFileName = args[0];
673: if (propFileName == null) {
674: System.out
675: .println("Usage: xflow.events.EventsHandler <properties file name>");
676: System.exit(0);
677: }
678:
679: Properties props = new Properties();
680: try {
681: FileInputStream fi = new FileInputStream(propFileName);
682: props.load(fi);
683: fi.close();
684: } catch (FileNotFoundException fx) {
685: System.out.print("Property file not found: "
686: + fx.getMessage());
687: return;
688: } catch (IOException e) {
689: System.out.print("Failed to read property file: "
690: + e.getMessage());
691: return;
692: }
693:
694: new EventsHandler(props);
695: }
696: }
|