001: /*
002: * This file is part of the WfMOpen project.
003: * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
004: * All rights reserved.
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * $Id: Receiver.java,v 1.3 2007/05/03 21:58:25 mlipp Exp $
021: *
022: * $Log: Receiver.java,v $
023: * Revision 1.3 2007/05/03 21:58:25 mlipp
024: * Internal refactoring for making better use of local EJBs.
025: *
026: * Revision 1.2 2006/09/29 12:32:13 drmlipp
027: * Consistently using WfMOpen as projct name now.
028: *
029: * Revision 1.1.1.2 2004/08/18 15:17:38 drmlipp
030: * Update to 1.2
031: *
032: * Revision 1.8 2004/04/12 19:33:52 lipp
033: * Clarified application invocation interface.
034: *
035: * Revision 1.7 2004/03/20 21:08:44 lipp
036: * Added access to requesting processes' channels.
037: *
038: * Revision 1.6 2004/02/25 12:06:06 lipp
039: * Added some debugging info.
040: *
041: * Revision 1.5 2004/02/13 08:25:29 lipp
042: * Changed channel message data type to Map which is more appropriate.
043: *
044: * Revision 1.4 2004/02/06 10:25:46 lipp
045: * Finshed Receiver.
046: *
047: * Revision 1.3 2004/01/30 14:36:30 lipp
048: * Partial implementation of message receipt.
049: *
050: * Revision 1.2 2003/10/07 15:49:53 lipp
051: * Completed transmission sequence to activity.
052: *
053: * Revision 1.1 2003/10/06 20:43:22 lipp
054: * Continuing channel communication.
055: *
056: */
057: package de.danet.an.workflow.tools.chabacc;
058:
059: import java.util.ArrayList;
060: import java.util.List;
061: import java.util.Map;
062:
063: import java.rmi.RemoteException;
064:
065: import de.danet.an.util.CollectionsUtil;
066:
067: import de.danet.an.workflow.omgcore.CannotCompleteException;
068: import de.danet.an.workflow.omgcore.InvalidDataException;
069:
070: import de.danet.an.workflow.api.Activity;
071: import de.danet.an.workflow.api.DefaultProcessData;
072: import de.danet.an.workflow.api.FormalParameter;
073:
074: import de.danet.an.workflow.apix.ExtActivity;
075: import de.danet.an.workflow.apix.ExtProcess;
076: import de.danet.an.workflow.spis.aii.ApplicationNotStoppedException;
077: import de.danet.an.workflow.spis.aii.CannotExecuteException;
078: import de.danet.an.workflow.spis.aii.ToolAgent;
079:
080: /**
081: * This class provides a tool that receives messages from a
082: * channel. The first argument must be of type string and identifies
083: * the channel.
084: *
085: * @author <a href="mailto:lipp@danet.de">Michael Lipp</a>
086: * @version $Revision: 1.3 $
087: */
088: public class Receiver implements ToolAgent {
089:
090: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
091: .getLog(Receiver.class);
092:
093: private Integer listenerIndex = null;
094:
095: /**
096: * Creates an instance of <code>Receiver</code>
097: * with all attributes initialized to default values.
098: */
099: public Receiver() {
100: }
101:
102: /**
103: * Normally, the Receiver listens for data on a channel of the
104: * process it is invoked in. It is, however, also possible to send
105: * the data in a channel of its requesters. See the user manual
106: * for details.
107: * @param index used to determine the listener process, see
108: * user manual
109: */
110: public void setListenerIndex(String index) {
111: listenerIndex = new Integer(index);
112: }
113:
114: // Implementation of de.danet.an.workflow.spis.aii.ToolAgent
115:
116: /* Comment copied from interface. */
117: public void invoke(Activity activity,
118: FormalParameter[] formalParameter, Map map)
119: throws CannotExecuteException, RemoteException {
120: String channel = (String) map.get(formalParameter[0].id());
121: if (logger.isDebugEnabled()) {
122: logger.debug(activity + " wants message from channel "
123: + channel);
124: }
125: ExtProcess proc = (listenerIndex == null ? (ExtProcess) activity
126: .container()
127: : getListener(activity));
128: Map recvd = proc.lookForMessage(channel);
129: if (recvd == null) {
130: if (logger.isDebugEnabled()) {
131: logger.debug(activity
132: + " waits for message on channel " + channel);
133: }
134: ((ExtActivity) activity).waitOnChannel(proc.key(), channel);
135: return;
136: }
137: if (logger.isDebugEnabled()) {
138: logger.debug(activity + " found message "
139: + CollectionsUtil.toString(recvd) + " on channel "
140: + channel);
141: }
142: try {
143: activity.setResult(new DefaultProcessData(recvd));
144: activity.complete();
145: } catch (CannotCompleteException e) {
146: logger
147: .error(activity + " receiving from channel "
148: + "but cannot be completed?!: "
149: + e.getMessage(), e);
150: throw new CannotExecuteException(
151: "Receiving activity cannot be completed");
152: } catch (InvalidDataException e) {
153: logger.warn(activity
154: + " received invalid data from channel " + channel
155: + ": " + e.getMessage(), e);
156: throw new CannotExecuteException(e.getMessage());
157: }
158: }
159:
160: private ExtProcess getListener(Activity act)
161: throws CannotExecuteException, RemoteException {
162: ExtProcess proc = (ExtProcess) act.container();
163: if (listenerIndex.intValue() > 0) {
164: for (int i = listenerIndex.intValue(); i > 0; i--) {
165: proc = (ExtProcess) proc.requestingProcess();
166: if (proc == null) {
167: throw new CannotExecuteException(act
168: + " does not have " + listenerIndex
169: + " requesting anchestors");
170: }
171: }
172: } else {
173: List procs = new ArrayList();
174: while (true) {
175: procs.add(0, proc);
176: proc = (ExtProcess) proc.requestingProcess();
177: if (proc == null) {
178: break;
179: }
180: }
181: try {
182: proc = (ExtProcess) procs
183: .get(-listenerIndex.intValue());
184: } catch (IndexOutOfBoundsException e) {
185: throw new CannotExecuteException(
186: "Subflow depth from root of " + act
187: + " is less than "
188: + -listenerIndex.intValue());
189: }
190: }
191: return proc;
192: }
193:
194: /* Comment copied from interface. */
195: public void terminate(Activity activity)
196: throws ApplicationNotStoppedException {
197: }
198:
199: }
|