001: /*
002: * This file is part of the WfMOpen project.
003: * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
004: * All rights reserved.
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * $Id: Sender.java,v 1.3 2007/01/18 09:57:15 drmlipp Exp $
021: *
022: * $Log: Sender.java,v $
023: * Revision 1.3 2007/01/18 09:57:15 drmlipp
024: * Fixed problem with J2EE 1.4 allowing only one session per JMS connection.
025: *
026: * Revision 1.2 2006/09/29 12:32:13 drmlipp
027: * Consistently using WfMOpen as projct name now.
028: *
029: * Revision 1.1.1.2 2004/08/18 15:17:38 drmlipp
030: * Update to 1.2
031: *
032: * Revision 1.18 2004/07/15 08:35:34 lipp
033: * Removed direct invocability again, as there seem to be problem
034: * allocating the topic connection. Will be investigated later.
035: *
036: * Revision 1.17 2004/07/14 14:54:37 lipp
037: * Restrictions for being direct invocable relaxed a bit.
038: *
039: * Revision 1.16 2004/07/02 13:43:37 lipp
040: * Fixed JMS usage.
041: *
042: * Revision 1.15 2004/04/12 19:33:52 lipp
043: * Clarified application invocation interface.
044: *
045: * Revision 1.14 2004/03/31 19:36:20 lipp
046: * Completed implementation of Activity.abandon(String).
047: *
048: * Revision 1.13 2004/03/21 12:37:55 lipp
049: * Fixed bug with self-sending.
050: *
051: * Revision 1.12 2004/03/20 21:08:44 lipp
052: * Added access to requesting processes' channels.
053: *
054: * Revision 1.11 2004/02/25 12:13:08 lipp
055: * Fixed silly bug.
056: *
057: * Revision 1.10 2004/02/25 12:06:06 lipp
058: * Added some debugging info.
059: *
060: * Revision 1.9 2004/02/19 15:52:12 schlue
061: * Changed default for local delivery.
062: *
063: * Revision 1.8 2004/02/13 10:01:34 lipp
064: * Changed result type for result provider to Map which is more
065: * appropriate.
066: *
067: * Revision 1.7 2004/02/13 08:25:29 lipp
068: * Changed channel message data type to Map which is more appropriate.
069: *
070: * Revision 1.6 2004/02/06 13:37:35 lipp
071: * Added channel close notification.
072: *
073: * Revision 1.5 2004/01/28 16:11:38 lipp
074: * Re-implementation of chabacc, Sender working.
075: *
076: * Revision 1.4 2003/10/08 15:15:35 lipp
077: * Updated channel handling.
078: *
079: * Revision 1.3 2003/10/07 15:49:53 lipp
080: * Completed transmission sequence to activity.
081: *
082: * Revision 1.2 2003/10/06 15:25:56 lipp
083: * Using proper complete procedure.
084: *
085: * Revision 1.1 2003/10/05 15:40:18 lipp
086: * Continuing chabacc implementation.
087: *
088: */
089: package de.danet.an.workflow.tools.chabacc;
090:
091: import java.util.ArrayList;
092: import java.util.HashMap;
093: import java.util.List;
094: import java.util.Map;
095:
096: import java.rmi.RemoteException;
097:
098: import de.danet.an.util.CollectionsUtil;
099: import de.danet.an.util.EJBUtil;
100: import de.danet.an.util.ResourceNotAvailableException;
101:
102: import de.danet.an.workflow.omgcore.InvalidDataException;
103:
104: import de.danet.an.workflow.api.Activity;
105: import de.danet.an.workflow.api.ActivityUniqueKey;
106: import de.danet.an.workflow.api.FormalParameter;
107:
108: import de.danet.an.workflow.apix.ExtProcess;
109: import de.danet.an.workflow.ejbs.util.QueuerLocal;
110: import de.danet.an.workflow.ejbs.util.QueuerLocalHome;
111: import de.danet.an.workflow.spis.aii.ApplicationNotStoppedException;
112: import de.danet.an.workflow.spis.aii.CannotExecuteException;
113: import de.danet.an.workflow.spis.aii.ResultProvider;
114: import de.danet.an.workflow.spis.aii.ToolAgent;
115:
116: /**
117: * This class provides a tool that sends messages to a channel. The
118: * first argument must be of type string and identifies the
119: * channel. The second and subsequent arguments may be of any type and
120: * will be send to the channel unmodified.
121: *
122: * @author <a href="mailto:lipp@danet.de">Michael Lipp</a>
123: * @version $Revision: 1.3 $
124: */
125: public class Sender implements ToolAgent, ResultProvider {
126:
127: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
128: .getLog(Sender.class);
129:
130: private boolean localDelivery = false;
131: private Integer originatorIndex = null;
132:
133: private QueuerLocal queuerCache = null;
134:
135: /**
136: * Access cached event queuer.
137: * @return the event queuer
138: */
139: private QueuerLocal queuer() throws ResourceNotAvailableException {
140: if (queuerCache == null) {
141: queuerCache = (QueuerLocal) EJBUtil.createSession(
142: QueuerLocalHome.class, "java:comp/env/ejb/Queuer");
143: }
144: return queuerCache;
145: }
146:
147: /**
148: * Creates an instance of <code>Sender</code>
149: * with all attributes initialized to default values.
150: */
151: public Sender() {
152: }
153:
154: /**
155: * By default, messages are delivered to clients listening on the
156: * channel only. If messages should also be delivered to receiver
157: * tools, this property must be set to <code>true</code>. Note
158: * that in this case the channel cannot be used for bidirectional
159: * communication.
160: *
161: * @param enabled a string denoting a boolean value. Any string
162: * that, ignoring case, equals "true" will enable local delivery.
163: */
164: public void setLocalDelivery(String enabled) {
165: localDelivery = (new Boolean(enabled)).booleanValue();
166: }
167:
168: /**
169: * Normally, the Sender sends data on a channel of the process it
170: * is invoked in. It is, however, also possible to send the data
171: * in a channel of its requesters. See the user manual for
172: * details.
173: * @param index used to determine the originating process, see
174: * user manual
175: */
176: public void setOriginatorIndex(String index) {
177: originatorIndex = new Integer(index);
178: }
179:
180: // Implementation of de.danet.an.workflow.spis.aii.ToolAgent
181:
182: /* Comment copied from interface. */
183: public void invoke(Activity activity,
184: FormalParameter[] formalParameter, Map map)
185: throws CannotExecuteException, RemoteException {
186: ActivityUniqueKey auk = null;
187: String channelName = null;
188: try {
189: auk = activity.uniqueKey();
190: channelName = (String) map.get(formalParameter[0].id());
191: if (logger.isDebugEnabled()) {
192: logger.debug(auk + " sending data to " + channelName);
193: }
194: Map data = new HashMap();
195: for (int i = 1; i < formalParameter.length; i++) {
196: String fid = formalParameter[i].id();
197: data.put(fid, map.get(fid));
198: }
199: if (logger.isDebugEnabled()) {
200: logger.debug("Sending "
201: + CollectionsUtil.toString(data));
202: }
203: ExtProcess proc = (originatorIndex == null ? (ExtProcess) activity
204: .container()
205: : getOriginator(activity));
206: queuer().broadcastChannelMessage(proc.key(), channelName,
207: data);
208: if (localDelivery) {
209: // Loop back message to process for tool-to-tool messages
210: proc.deliverChannelMessage(channelName, data);
211: }
212: if (logger.isDebugEnabled()) {
213: logger.debug(auk + " sent data to "
214: + map.get(formalParameter[0].id()));
215: }
216: } catch (ResourceNotAvailableException e) {
217: logger.error(e.getMessage(), e);
218: throw new CannotExecuteException(e.getMessage());
219: } catch (InvalidDataException e) {
220: logger.error(e.getMessage(), e);
221: throw new CannotExecuteException(e.getMessage());
222: }
223: }
224:
225: private ExtProcess getOriginator(Activity act)
226: throws CannotExecuteException, RemoteException {
227: ExtProcess proc = (ExtProcess) act.container();
228: if (originatorIndex.intValue() > 0) {
229: for (int i = originatorIndex.intValue(); i > 0; i--) {
230: proc = (ExtProcess) proc.requestingProcess();
231: if (proc == null) {
232: throw new CannotExecuteException(act
233: + " does not have " + originatorIndex
234: + " requesting anchestors");
235: }
236: }
237: } else {
238: List procs = new ArrayList();
239: while (true) {
240: procs.add(0, proc);
241: proc = (ExtProcess) proc.requestingProcess();
242: if (proc == null) {
243: break;
244: }
245: }
246: try {
247: proc = (ExtProcess) procs.get(-originatorIndex
248: .intValue());
249: } catch (IndexOutOfBoundsException e) {
250: throw new CannotExecuteException(
251: "Subflow depth from root of " + act
252: + " is less than "
253: + -originatorIndex.intValue());
254: }
255: }
256: return proc;
257: }
258:
259: /* Comment copied from interface. */
260: public void terminate(Activity activity)
261: throws ApplicationNotStoppedException {
262: }
263:
264: /* Comment copied from interface. */
265: public Object result() {
266: return null;
267: }
268:
269: }
|