001: /*
002: * This file is part of the WfMOpen project.
003: * Copyright (C) 2001-2006 Danet GmbH (www.danet.de), BU BTS.
004: * All rights reserved.
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * $Id: WfXmlAuditHandler.java,v 1.10 2007/03/01 12:32:57 schnelle Exp $
021: *
022: * $Log: WfXmlAuditHandler.java,v $
023: * Revision 1.10 2007/03/01 12:32:57 schnelle
024: * Enhanced Instance.SetProperties to process ContextData.
025: *
026: * Revision 1.9 2007/02/17 21:19:48 mlipp
027: * Workflow service caching redone.
028: *
029: * Revision 1.8 2007/02/06 08:35:34 schnelle
030: * Started automatic generation of wsdl description.
031: *
032: * Revision 1.7 2007/01/31 22:55:36 mlipp
033: * Some more refactoring and fixes of problems introduced by refactoring.
034: *
035: * Revision 1.6 2007/01/31 14:53:06 schnelle
036: * Small corrections wvaluating the resource reference.
037: *
038: * Revision 1.5 2007/01/31 12:24:07 drmlipp
039: * Design revisited.
040: *
041: * Revision 1.4 2007/01/30 21:04:17 mlipp
042: * Fixed package/process id retrieval.
043: *
044: * Revision 1.3 2007/01/30 13:11:37 schnelle
045: * Corrected check to decide the sending of a completed message.
046: *
047: * Revision 1.2 2007/01/30 11:56:14 drmlipp
048: * Merged Wf-XML branch.
049: *
050: * Revision 1.1.2.10 2007/01/29 15:04:20 schnelle
051: * Renaming of Observer to ObserverRegistry and URIDecoder to ResourceReference.
052: *
053: * Revision 1.1.2.9 2007/01/29 13:40:32 schnelle
054: * Storing of the sender base in the servlet context.
055: *
056: * Revision 1.1.2.8 2007/01/26 15:50:29 schnelle
057: * Added encoding for process id and package id.
058: *
059: * Revision 1.1.2.7 2007/01/24 14:22:39 schnelle
060: * Observer handler starts on servlet startup.
061: *
062: * Revision 1.1.2.6 2007/01/24 10:56:50 schnelle
063: * Prepared return of a result for aobservers.
064: *
065: * Revision 1.1.2.5 2007/01/16 11:05:42 schnelle
066: * Refactoring: Moved subscription handling methods to own class.
067: *
068: * Revision 1.1.2.4 2007/01/11 11:37:10 schnelle
069: * Added subscription if an oberver key is given in the creation of a process.
070: *
071: * Revision 1.1.2.3 2007/01/11 10:41:53 schnelle
072: * Sending notification messages to all observers.
073: *
074: * Revision 1.1.2.2 2007/01/11 10:23:52 schnelle
075: * Creation of StateChanged notifications.
076: *
077: * Revision 1.1.2.1 2007/01/10 13:41:28 schnelle
078: * Implemented subscribe.
079: *
080: */
081: package de.danet.an.workflow.clients.wfxml;
082:
083: import java.io.IOException;
084: import java.net.MalformedURLException;
085: import java.net.URL;
086: import java.rmi.RemoteException;
087: import java.sql.SQLException;
088: import java.text.DateFormat;
089: import java.text.SimpleDateFormat;
090: import java.util.Collection;
091: import java.util.Date;
092: import java.util.Iterator;
093:
094: import javax.security.auth.callback.Callback;
095: import javax.security.auth.callback.CallbackHandler;
096: import javax.security.auth.callback.NameCallback;
097: import javax.security.auth.callback.PasswordCallback;
098: import javax.security.auth.callback.TextOutputCallback;
099: import javax.security.auth.callback.UnsupportedCallbackException;
100: import javax.security.auth.login.LoginContext;
101: import javax.security.auth.login.LoginException;
102: import javax.xml.soap.SOAPConnection;
103: import javax.xml.soap.SOAPConnectionFactory;
104: import javax.xml.soap.SOAPElement;
105: import javax.xml.soap.SOAPException;
106: import javax.xml.soap.SOAPMessage;
107: import javax.xml.transform.TransformerException;
108:
109: import org.xml.sax.SAXException;
110:
111: import de.danet.an.workflow.api.EventSubscriber;
112: import de.danet.an.workflow.api.FactoryConfigurationError;
113: import de.danet.an.workflow.api.ProcessClosedAuditEvent;
114: import de.danet.an.workflow.api.SAXEventBuffer;
115: import de.danet.an.workflow.api.WorkflowService;
116: import de.danet.an.workflow.api.WorkflowServiceFactory;
117: import de.danet.an.workflow.clients.wfxml.ObserverRegistry.ObserverInfo;
118: import de.danet.an.workflow.omgcore.InvalidPerformerException;
119: import de.danet.an.workflow.omgcore.ProcessData;
120: import de.danet.an.workflow.omgcore.ResultNotAvailableException;
121: import de.danet.an.workflow.omgcore.WfAuditEvent;
122: import de.danet.an.workflow.omgcore.WfAuditHandler;
123: import de.danet.an.workflow.omgcore.WfStateAuditEvent;
124:
125: /**
126: * This class is an event handler foe all audit events of the workflow engine.
127: * All events are collected and distributed to the subscribed observers.
128: *
129: * @author Dirk Schnelle
130: *
131: * TODO: Check if this is better implemented as an mbean.
132: * TODO: Move the functionality of message creation to the response generators.
133: */
134: class WfXmlAuditHandler implements WfAuditHandler {
135: /** Logger instance. */
136: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
137: .getLog(WfAuditHandler.class);
138:
139: /** Formatter to produce xsd time stamps. */
140: protected static DateFormat xsdDateTimeFormat = new SimpleDateFormat(
141: "yyyy-MM-dd'T'HH:mm:ss");
142:
143: /** The workflow service. */
144: private WorkflowService workflowService = null;
145:
146: /** the observer registry. */
147: private ObserverRegistry observerRegistry;
148:
149: /**
150: * Constructs a new object.
151: *
152: * @param wfs the workflow service
153: * @param obs the observer registry.
154: */
155: public WfXmlAuditHandler(WorkflowService wfs, ObserverRegistry obs) {
156: workflowService = wfs;
157: this .observerRegistry = obs;
158: }
159:
160: /* (non-Javadoc)
161: * @see de.danet.an.workflow.omgcore.WfAuditHandler#receiveEvent(de.danet.an.workflow.omgcore.WfAuditEvent)
162: */
163: public void receiveEvent(WfAuditEvent event)
164: throws InvalidPerformerException, RemoteException {
165: String type = event.eventType();
166: if (!type.equals(WfAuditEvent.PROCESS_STATE_CHANGED)) {
167: return;
168: }
169:
170: WfStateAuditEvent stateEvent = (WfStateAuditEvent) event;
171:
172: String mgrName = stateEvent.processMgrName();
173: String[] ids = mgrName.split("/");
174: String packageId = ids[0];
175: String processId = ids[1];
176: String processKey = stateEvent.processKey();
177:
178: Collection observers;
179: try {
180: observers = observerRegistry.getObservers(packageId,
181: processId, processKey);
182: } catch (SQLException sqle) {
183: logger.warn("error retrieving observer list", sqle);
184: return;
185: }
186:
187: for (Iterator iterator = observers.iterator(); iterator
188: .hasNext();) {
189: ObserverInfo observerInfo = (ObserverInfo) iterator.next();
190: ResourceReference resRef = new ResourceReference(
191: observerInfo.getSenderBase(), packageId, processId,
192: event.processKey());
193: InstanceResponseGenerator irg = new InstanceResponseGenerator(
194: observerRegistry, workflowService, resRef);
195: try {
196: sendStateChanged(observerInfo.getObserverKey(), irg,
197: stateEvent);
198: if (stateEvent instanceof ProcessClosedAuditEvent) {
199: ProcessData result = ((ProcessClosedAuditEvent) stateEvent)
200: .result();
201: sendCompleted(observerInfo.getObserverKey(), irg,
202: result);
203: }
204: } catch (IllegalStateException e) {
205: logger.warn("Problem sending event to "
206: + observerInfo.getObserverKey() + ": "
207: + e.getMessage(), e);
208: try {
209: observerRegistry.unsubscribe(observerInfo
210: .getObserverKey());
211: } catch (SQLException ee) {
212: logger.warn("error removing observer", ee);
213: return;
214: }
215: }
216: }
217:
218: // Do some cleanup if the process is no more available.
219: if (stateEvent.newState().startsWith("closed")) {
220: try {
221: observerRegistry.unsubscribe(packageId, processId,
222: processKey);
223: } catch (SQLException e) {
224: logger.warn("error unsubscribing observer", e);
225: return;
226: }
227: }
228: }
229:
230: /**
231: * Sends a {@link Consts.STATE_CHANGE_REQUEST} message to all observers.
232: * @param observers observers.
233: * @param newState new state.
234: * @param oldState old state.
235: */
236: private void sendStateChanged(String observer,
237: InstanceResponseGenerator irg, WfStateAuditEvent event) {
238: String newState = StateMapper.omg2asapState(event.newState());
239: String oldState = StateMapper.omg2asapState(event.oldState());
240: try {
241: SOAPMessage message = irg.createStateChangedMessage(
242: observer, newState, oldState);
243: call(message, observer);
244: } catch (MalformedURLException e) {
245: throw (IllegalStateException) (new IllegalStateException(e
246: .getMessage()).initCause(e));
247: } catch (SOAPException e) {
248: throw (IllegalStateException) (new IllegalStateException(e
249: .getMessage()).initCause(e));
250: }
251: }
252:
253: /**
254: * Sends a {@link Consts.STATE_CHANGE_REQUEST} message to all observers.
255: * @param obs observer persistence.
256: * @param observers observers.
257: * @param result result of the process.
258: * @throws SAXException
259: * @throws TransformerException
260: * @throws ResultNotAvailableException
261: * @throws RemoteException
262: */
263: private void sendCompleted(String observer,
264: InstanceResponseGenerator irg, ProcessData result)
265: throws RemoteException {
266: try {
267: SOAPMessage message = irg.createCompletedMessage(observer,
268: result);
269: call(message, observer);
270: } catch (MalformedURLException e) {
271: throw (IllegalStateException) (new IllegalStateException(e
272: .getMessage()).initCause(e));
273: } catch (SOAPException e) {
274: throw (IllegalStateException) (new IllegalStateException(e
275: .getMessage()).initCause(e));
276: }
277: }
278:
279: /**
280: * Calls the default endpoint with the given message.
281: * @param request the SOAP request.
282: * @return SOAP response.
283: * @throws SOAPException
284: * @throws MalformedURLException
285: */
286: private void call(SOAPMessage message, String observer)
287: throws SOAPException, MalformedURLException {
288: SOAPConnectionFactory factory = SOAPConnectionFactory
289: .newInstance();
290: SOAPConnection connection = factory.createConnection();
291:
292: URL endpoint = new URL(observer);
293: connection.call(message, endpoint);
294: }
295:
296: }
|