001: /*
002: * $Id: ProcessControllerImpl.java,v 1.22 2004/12/09 12:34:19 kowap Exp $
003: *
004: * Copyright (c) 2004 Patric Fornasier, Pawel Kowalski
005: * Berne University of Applied Sciences
006: * School of Engineering and Information Technology
007: * All rights reserved.
008: */
009: package bexee.core;
010:
011: import java.util.Collection;
012: import java.util.Iterator;
013: import java.util.List;
014: import java.util.Map;
015:
016: import javax.wsdl.Binding;
017: import javax.wsdl.Definition;
018: import javax.wsdl.Message;
019: import javax.wsdl.Operation;
020: import javax.wsdl.Part;
021: import javax.wsdl.Port;
022: import javax.wsdl.PortType;
023: import javax.wsdl.extensions.ExtensibilityElement;
024: import javax.wsdl.extensions.soap.SOAPAddress;
025: import javax.xml.namespace.QName;
026: import javax.xml.rpc.ParameterMode;
027:
028: import org.apache.axis.Constants;
029: import org.apache.axis.client.Call;
030: import org.apache.axis.client.Service;
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033:
034: import bexee.event.Event;
035: import bexee.model.activity.Activity;
036: import bexee.model.activity.Assign;
037: import bexee.model.activity.Compensate;
038: import bexee.model.activity.Empty;
039: import bexee.model.activity.Flow;
040: import bexee.model.activity.Invoke;
041: import bexee.model.activity.Receive;
042: import bexee.model.activity.Reply;
043: import bexee.model.activity.Sequence;
044: import bexee.model.activity.Switch;
045: import bexee.model.elements.BpelCase;
046: import bexee.model.elements.Copy;
047: import bexee.model.elements.Correlation;
048: import bexee.model.elements.CorrelationPattern;
049: import bexee.model.elements.From;
050: import bexee.model.elements.Link;
051: import bexee.model.elements.PartnerLink;
052: import bexee.model.elements.PartnerLinks;
053: import bexee.model.elements.To;
054: import bexee.model.elements.Variable;
055: import bexee.model.elements.Variables;
056: import bexee.model.process.BPELProcess;
057: import bexee.model.process.Process;
058:
059: /**
060: * The default implementation of the <code>ProcessController</code>.
061: *
062: * @version $Revision: 1.22 $, $Date: 2004/12/09 12:34:19 $
063: * @author Patric Fornasier
064: * @author Pawel Kowalski
065: */
066: public class ProcessControllerImpl implements ProcessController {
067:
068: private static Log log = LogFactory
069: .getLog(ProcessControllerImpl.class);
070:
071: //**************************************************/
072: // processor entry method
073: //**************************************************/
074:
075: public void processMessage(ProcessInstance instance,
076: BexeeMessage message) {
077:
078: log.info("Processing a ProcessInstance");
079:
080: // initialize objects
081: //
082: ProcessContext ctx = instance.getContext();
083: BPELProcess process = instance.getProcess();
084:
085: ctx.setMessage(message);
086:
087: // start processing process elements
088: try {
089: process.getProcess().accept(this , instance);
090: } catch (AwaitMessageException e) {
091: e.printStackTrace();
092: } catch (Exception e) {
093: e.printStackTrace();
094: }
095: }
096:
097: //**************************************************/
098: // process the Process
099: //**************************************************/
100:
101: public void process(Process process, ProcessInstance instance)
102: throws Exception {
103:
104: log.info("Processing a Process");
105:
106: // process all child elements
107: Variables variables = process.getVariables();
108: if (variables != null) {
109: variables.accept(this , instance);
110: }
111:
112: PartnerLinks partnerLinks = process.getPartnerLinks();
113: if (partnerLinks != null) {
114: partnerLinks.accept(this , instance);
115: }
116:
117: Activity activity = process.getActivity();
118: // there must be one root activity of this process
119: if (activity != null) {
120: activity.accept(this , instance);
121: } else {
122: throw new MissingActivityException();
123: }
124: }
125:
126: //**************************************************/
127: // process structured activities
128: //**************************************************/
129:
130: public void process(Sequence sequence, ProcessInstance instance)
131: throws Exception {
132:
133: log(sequence);
134:
135: // process all activities in sequence
136: List activities = sequence.getActivities();
137: for (Iterator iter = activities.iterator(); iter.hasNext();) {
138: Activity activity = (Activity) iter.next();
139: activity.accept(this , instance);
140: }
141: }
142:
143: /**
144: * This is the implementation of the Flow activity which is used for
145: * parallel execution. In this primitive implementation we don't use any
146: * thread pooling mechanisms. In a more sophisticated version of the
147: * ProcessController, we might make use of the
148: * http://jakarta.apache.org/commons/pool/ library for thread pooling.
149: */
150: public void process(Flow flow, ProcessInstance instance)
151: throws Exception {
152:
153: log(flow);
154:
155: List activities = flow.getActivities();
156: int activitiesCount = activities.size();
157: Thread[] flowThreads = new Thread[activitiesCount];
158:
159: // execute activities in separate threads
160: // for parallelity
161: //
162: for (int i = 0; i < activitiesCount; i++) {
163: Activity activity = (Activity) activities.get(i);
164: flowThreads[i] = new FlowThread(this , instance, activity);
165: flowThreads[i].start();
166: }
167:
168: // wait for termination of all started FlowThreads
169: for (int i = 0; i < flowThreads.length; i++) {
170: flowThreads[i].join();
171: }
172:
173: /**
174: * TODO because the run() Thread method is not allowed to throw
175: * Exceptions, it is necessary to check the status of each executed
176: * Thread in order to know wheter it finished its taks correctly or
177: * failed.
178: */
179: }
180:
181: /**
182: * A default implementation of the
183: * <code>process(Switch, ProcessInstance)</code> method.
184: *
185: */
186: public void process(Switch bpelSwitch, ProcessInstance instance)
187: throws Exception {
188:
189: List bpelCases = bpelSwitch.getCases();
190: for (Iterator iter = bpelCases.iterator(); iter.hasNext();) {
191: BpelCase bpelCase = (BpelCase) iter.next();
192: if (bpelCase.getBooleanExpression().evaluate()) {
193: bpelCase.getCaseActivity().accept(this , instance);
194: return;
195: }
196: }
197:
198: Activity otherwise = bpelSwitch.getOtherwise();
199: if (otherwise != null) {
200: otherwise.accept(this , instance);
201: }
202: }
203:
204: //**************************************************/
205: // process atomic activities
206: //**************************************************/
207:
208: public void process(Receive receive, ProcessInstance instance)
209: throws AwaitMessageException {
210:
211: log(receive);
212:
213: // get process context
214: ProcessContext ctx = instance.getContext();
215: BPELProcess process = instance.getProcess();
216:
217: Event event = ctx.getEvent(receive);
218: if (event == null) {
219:
220: BexeeMessage message = ctx.removeMessage();
221:
222: if (message == null) {
223: throw new AwaitMessageException(receive);
224: } else {
225: event = new Event(receive);
226:
227: // copy received value into context
228: Variable var = receive.getVariable();
229: ctx.setVariable(var, message.getParts());
230:
231: ctx.addEvent(event);
232: }
233: }
234: }
235:
236: public void process(Assign assign, ProcessInstance instance)
237: throws Exception {
238:
239: log(assign);
240:
241: BPELProcess process = instance.getProcess();
242: ProcessContext ctx = instance.getContext();
243:
244: Event event = ctx.getEvent(assign);
245: if (event == null) {
246:
247: // event creation
248: event = new Event(assign);
249:
250: // get the copy of this assign
251: Copy copy = assign.getCopy();
252:
253: // get from information
254: From from = copy.getFrom();
255: Variable fromVar = from.getVariable();
256: String fromVarPart = from.getPart();
257:
258: // get to information
259: To to = copy.getTo();
260: Variable toVar = to.getVariable();
261: String toVarPart = to.getPart();
262:
263: // get the from variable part and assign it
264: Object variablePart = ctx.getVariablePart(fromVar,
265: fromVarPart);
266:
267: // this is some magic just for demonstration purposes
268: //
269: String responseString = "";
270:
271: if (assign.getName() != null
272: && assign.getName().indexOf("Response") != -1) {
273: Collection varIds = ctx.getVariablesIdentifiers();
274: for (Iterator iter = varIds.iterator(); iter.hasNext();) {
275: Variable variable = (Variable) iter.next();
276: Collection parts = ctx.getVariable(variable)
277: .values();
278: for (Iterator iterator = parts.iterator(); iterator
279: .hasNext();) {
280: Object element = (Object) iterator.next();
281: if (element != null) {
282: responseString += "} {"
283: + element.toString();
284: }
285: }
286: }
287: variablePart = responseString;
288: }
289:
290: ctx.setVariablePart(toVar, toVarPart, variablePart);
291: ctx.addEvent(event);
292: }
293: }
294:
295: public void process(Invoke invoke, ProcessInstance instance)
296: throws Exception {
297:
298: log(invoke);
299:
300: // initialize objects
301: //
302: ProcessContext ctx = instance.getContext();
303: BPELProcess process = instance.getProcess();
304:
305: Event event = ctx.getEvent(invoke);
306: if (event == null) {
307:
308: // event creation
309: event = new Event(invoke);
310:
311: // get in variable value
312: Variable inVar = invoke.getInputVariable();
313: Map inVarParts = ctx.getVariable(inVar);
314: Object[] variablePartsAsArray = inVarParts.values()
315: .toArray();
316:
317: // get out variable value
318: Variable outVar = invoke.getOutputVariable();
319: Map outVarParts = ctx.getVariable(outVar);
320:
321: // get infos about the web service to be invoked
322: String operationName = invoke.getOperation();
323: QName portTypeName = invoke.getPortType();
324:
325: // traverse definition in order to find information about the call
326: Definition definition = findWSDL(portTypeName, process);
327: PortType portType = definition.getPortType(portTypeName);
328: Operation operationType = portType.getOperation(
329: operationName, null, null);
330: Message inMessage = operationType.getInput().getMessage();
331: Object[] messageParts = inMessage.getParts().values()
332: .toArray();
333:
334: Binding binding = findBinding(definition, portTypeName);
335: Port port = findPort(definition, binding);
336: SOAPAddress soapAddress = findSOAPAddress(port);
337:
338: // build service call
339: Service service = new Service();
340: Call call = (Call) service.createCall();
341: call.setTargetEndpointAddress(soapAddress.getLocationURI());
342: call.setPortName(invoke.getPortType());
343: call.setOperationName(invoke.getOperation());
344:
345: // add parameters to the call
346: for (int i = 0; i < messageParts.length; i++) {
347: Part messagePart = (Part) messageParts[i];
348: call.addParameter(messagePart.getName(), messagePart
349: .getTypeName(), ParameterMode.IN);
350: }
351: call.setReturnType(Constants.XSD_STRING);
352:
353: // this will be the result of the invoke
354: Object result = null;
355:
356: // call synchronously
357: if (invoke.isSynchronous()) {
358: result = call.invoke(variablePartsAsArray);
359: }
360: // call asynchronously
361: else {
362: // TODO asynchronous call in new thread
363: }
364:
365: // assign result to the variable
366: Variable variable = invoke.getOutputVariable();
367: Message outMessage = operationType.getOutput().getMessage();
368: Object[] outparts = outMessage.getParts().values()
369: .toArray();
370: for (int i = 0; i < outparts.length; i++) {
371: Part messagePart = (Part) messageParts[i];
372: ctx.setVariablePart(variable, messagePart.getName(),
373: result);
374: }
375:
376: ctx.addEvent(event);
377: }
378: }
379:
380: private Definition findWSDL(QName portTypeName, BPELProcess process) {
381:
382: Definition definition = null;
383: Binding binding = null;
384:
385: // find the right binding for the portType
386: //
387: Collection wsdlCollection = process.getPartnerWSDL();
388: mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
389: .hasNext();) {
390: definition = (Definition) iter.next();
391: Collection bindings = definition.getBindings().values();
392: for (Iterator iterator = bindings.iterator(); iterator
393: .hasNext();) {
394: binding = (Binding) iterator.next();
395: if (binding.getPortType().getQName().equals(
396: portTypeName)) {
397: break mainloop;
398: }
399: }
400: }
401: return definition;
402: }
403:
404: private Binding findBinding(Definition definition,
405: QName portTypeName) {
406:
407: Binding binding = null;
408:
409: Collection bindings = definition.getBindings().values();
410: for (Iterator iterator = bindings.iterator(); iterator
411: .hasNext();) {
412: binding = (Binding) iterator.next();
413: if (binding.getPortType().getQName().equals(portTypeName)) {
414: break;
415: }
416: }
417: return binding;
418: }
419:
420: private Port findPort(Definition definition, Binding binding) {
421:
422: Port port = null;
423:
424: Collection services = definition.getServices().values();
425: for (Iterator iterator = services.iterator(); iterator
426: .hasNext();) {
427: javax.wsdl.Service service = (javax.wsdl.Service) iterator
428: .next();
429: Collection portTypes = service.getPorts().values();
430: for (Iterator iter2 = portTypes.iterator(); iter2.hasNext();) {
431: port = (Port) iter2.next();
432: if (port.getBinding().equals(binding)) {
433: break;
434: }
435: }
436: }
437: return port;
438: }
439:
440: private SOAPAddress findSOAPAddress(Port port) {
441:
442: SOAPAddress soapAddress = null;
443:
444: // find and return the location
445: //
446: Collection extElems = port.getExtensibilityElements();
447: for (Iterator iter = extElems.iterator(); iter.hasNext();) {
448: ExtensibilityElement element = (ExtensibilityElement) iter
449: .next();
450: if (element instanceof SOAPAddress) {
451: soapAddress = (SOAPAddress) element;
452: break;
453: }
454: }
455: return soapAddress;
456: }
457:
458: private String findPortLocation(QName portTypeName,
459: BPELProcess process) {
460:
461: Binding binding = null;
462: SOAPAddress soapAddress = null;
463: Port port = null;
464:
465: // find the right binding for the portType
466: //
467: Collection wsdlCollection = process.getPartnerWSDL();
468: mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
469: .hasNext();) {
470: Definition defn = (Definition) iter.next();
471: Collection bindings = defn.getBindings().values();
472: for (Iterator iterator = bindings.iterator(); iterator
473: .hasNext();) {
474: binding = (Binding) iterator.next();
475: if (binding.getPortType().getQName().equals(
476: portTypeName)) {
477: break mainloop;
478: }
479: }
480: }
481:
482: // find the right port within the service for the portType
483: //
484: mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
485: .hasNext();) {
486: Definition defn = (Definition) iter.next();
487: Collection services = defn.getServices().values();
488: for (Iterator iterator = services.iterator(); iterator
489: .hasNext();) {
490: javax.wsdl.Service service = (javax.wsdl.Service) iterator
491: .next();
492: Collection portTypes = service.getPorts().values();
493: for (Iterator iter2 = portTypes.iterator(); iter2
494: .hasNext();) {
495: port = (Port) iter2.next();
496: if (port.getBinding().equals(binding)) {
497: break mainloop;
498: }
499: }
500: }
501: }
502:
503: // find and return the location
504: //
505: if (port != null) {
506: Collection extElems = port.getExtensibilityElements();
507: for (Iterator iter = extElems.iterator(); iter.hasNext();) {
508: ExtensibilityElement element = (ExtensibilityElement) iter
509: .next();
510: if (element instanceof SOAPAddress) {
511: soapAddress = (SOAPAddress) element;
512: break;
513: }
514: }
515: return soapAddress.getLocationURI();
516: }
517:
518: // no location found
519: //
520: return null;
521: }
522:
523: public void process(Reply reply, ProcessInstance instance)
524: throws Exception {
525:
526: log(reply);
527:
528: BPELProcess process = instance.getProcess();
529: ProcessContext ctx = instance.getContext();
530:
531: Event event = ctx.getEvent(reply);
532: if (event == null) {
533:
534: event = new Event(reply);
535:
536: // copy variable into output
537: Variable variable = reply.getVariable();
538: Map parts = ctx.getVariable(variable);
539:
540: // we only return the first found part as a return
541: Object result = parts.values().iterator().next();
542: ctx.setResult(result);
543:
544: ctx.addEvent(event);
545: }
546: }
547:
548: public void process(Empty empty, ProcessInstance instance) {
549:
550: log(empty);
551:
552: BPELProcess process = instance.getProcess();
553: ProcessContext ctx = instance.getContext();
554:
555: Event event = ctx.getEvent(empty);
556: if (event == null) {
557: event = new Event(empty);
558:
559: //
560: // this is empty, do nothing
561: //
562:
563: ctx.addEvent(event);
564: }
565: }
566:
567: /**
568: * Process method for all unimplemented activities.
569: */
570: public void process(Activity activity, ProcessInstance instance) {
571: log(activity);
572: BPELProcess process = instance.getProcess();
573: ProcessContext ctx = instance.getContext();
574: Event event = ctx.getEvent(activity);
575: if (event == null) {
576: event = new Event(activity);
577: ctx.addEvent(event);
578: }
579: }
580:
581: //**************************************************/
582: // process Process elements
583: //**************************************************/
584:
585: public void process(Variables variables, ProcessInstance instance)
586: throws Exception {
587:
588: log.info("Processing Variables");
589:
590: // process all child elements
591: List list = variables.getVariables();
592: for (Iterator iter = list.iterator(); iter.hasNext();) {
593: Variable variable = (Variable) iter.next();
594: variable.accept(this , instance);
595: }
596: }
597:
598: public void process(Variable variable, ProcessInstance instance) {
599: log.info("Processing a Variable: " + variable.getName());
600: }
601:
602: public void process(PartnerLinks partnerLinks,
603: ProcessInstance instance) throws Exception {
604:
605: log.info("Processing PartnerLinks");
606:
607: // process all child elements
608: List list = partnerLinks.getPartnerLinks();
609: for (Iterator iter = list.iterator(); iter.hasNext();) {
610: PartnerLink partnerLink = (PartnerLink) iter.next();
611: partnerLink.accept(this , instance);
612: }
613: }
614:
615: public void process(PartnerLink partnerLink,
616: ProcessInstance instance) {
617: log.info("Processing a PartnerLink: " + partnerLink.getName());
618: }
619:
620: //**************************************************/
621: // will not be implemented in the scope of the diploma project
622: //**************************************************/
623:
624: public void process(Link link, ProcessInstance instance) {
625: }
626:
627: public void process(Compensate impl, ProcessInstance instance) {
628: }
629:
630: public void process(Correlation correlation,
631: ProcessInstance instance) {
632: }
633:
634: public void process(CorrelationPattern correlationPattern,
635: ProcessInstance instance) {
636: }
637:
638: //**************************************************/
639: // helper methods
640: //**************************************************/
641:
642: private Receive matchMessage(BexeeMessage message, List receives) {
643: // FIXME For now just return first encountered receive
644: return (Receive) receives.get(0);
645: }
646:
647: private void log(Activity activity) {
648: if (activity != null) {
649: log.info("Processing " + activity.getClass().getName()
650: + " name: " + activity.getName());
651: } else {
652: log.info("Processing null activity");
653: }
654: }
655:
656: public void process(Copy copy, ProcessInstance instance)
657: throws Exception {
658: }
659:
660: }
|