001: /*
002: * This file is part of the WfMOpen project.
003: * Copyright (C) 2001-2004 Danet GmbH (www.danet.de), GS-AN.
004: * All rights reserved.
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * $Id: ChannelImpl.java,v 1.6 2007/02/17 21:22:56 mlipp Exp $
021: *
022: * $Log: ChannelImpl.java,v $
023: * Revision 1.6 2007/02/17 21:22:56 mlipp
024: * Improved service caching and topic connection handling.
025: *
026: * Revision 1.5 2007/02/16 21:43:23 mlipp
027: * Improved.
028: *
029: * Revision 1.4 2007/02/15 13:52:37 drmlipp
030: * Fixed channel release problem.
031: *
032: * Revision 1.3 2007/02/07 16:49:42 mlipp
033: * Added receiving with timeout.
034: *
035: * Revision 1.2 2006/09/29 12:32:09 drmlipp
036: * Consistently using WfMOpen as projct name now.
037: *
038: * Revision 1.1.1.1 2004/08/18 15:17:38 drmlipp
039: * Update to 1.2
040: *
041: * Revision 1.1 2004/02/21 21:31:00 lipp
042: * Some more refactoring to resolve cyclic dependencies.
043: *
044: * Revision 1.5 2004/02/19 15:36:36 lipp
045: * Fixed race condition.
046: *
047: * Revision 1.4 2004/02/13 08:25:29 lipp
048: * Changed channel message data type to Map which is more appropriate.
049: *
050: * Revision 1.3 2004/02/12 13:49:07 lipp
051: * Renamed sendChannelMessage to submitChannelMessage to emphasize
052: * difference to deliverChannelMessage.
053: *
054: * Revision 1.2 2004/01/30 14:36:30 lipp
055: * Partial implementation of message receipt.
056: *
057: * Revision 1.1 2004/01/28 16:11:38 lipp
058: * Re-implementation of chabacc, Sender working.
059: *
060: */
061: package de.danet.an.workflow.ejbs.client;
062:
063: import java.util.Map;
064:
065: import java.rmi.RemoteException;
066:
067: import javax.jms.JMSException;
068: import javax.jms.Message;
069: import javax.jms.ObjectMessage;
070: import javax.jms.Session;
071: import javax.jms.TopicConnection;
072: import javax.jms.TopicSession;
073: import javax.jms.TopicSubscriber;
074:
075: import de.danet.an.workflow.omgcore.InvalidDataException;
076: import de.danet.an.workflow.omgcore.WfExecutionObject.State;
077:
078: import de.danet.an.workflow.api.Channel;
079: import de.danet.an.workflow.api.InvalidKeyException;
080: import de.danet.an.workflow.api.Process;
081:
082: import de.danet.an.workflow.apix.ExtProcess;
083:
084: /**
085: * This class provides an implementation of <code>Channel</code>. Note
086: * that instances "live" in the client.
087: *
088: * @author <a href="mailto:lipp@danet.de">Michael Lipp</a>
089: * @version $Revision: 1.6 $
090: */
091:
092: public class ChannelImpl implements Channel {
093:
094: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
095: .getLog(ChannelImpl.class);
096:
097: private StandardWorkflowService workflowService = null;
098: private String channelName = null;
099: private String processMgr = null;
100: private String processKey = null;
101: private boolean sendOnly;
102:
103: private TopicConnection subsConn = null;
104: private TopicSession subsSess = null;
105: private TopicSubscriber subscriber = null;
106:
107: private boolean processIsOpen;
108:
109: /**
110: * Creates an instance of <code>ChannelImpl</code>
111: * with all attributes initialized to the given values.
112: * @param wfs the workflow service
113: * @param process the process this channel belongs to
114: * @param name the channel name
115: * @throws RemoteException if a system-level error occurs.
116: */
117: public ChannelImpl(StandardWorkflowService wfs, Process process,
118: String name, boolean sendOnly) throws RemoteException {
119: workflowService = wfs;
120: // Thou shan't cache remote interfaces in the client
121: processMgr = process.manager().name();
122: processKey = process.key();
123: channelName = name;
124: this .sendOnly = sendOnly;
125: if (!sendOnly) {
126: // Initialize subscriber now, so we do not loose messages.
127: initSubscriber();
128: }
129:
130: processIsOpen = (process.workflowState() == State.OPEN);
131: }
132:
133: private void initSubscriber() throws RemoteException {
134: try {
135: subsConn = workflowService.topicConnection();
136: subsSess = subsConn.createTopicSession(false,
137: Session.AUTO_ACKNOWLEDGE);
138: subscriber = subsSess.createSubscriber(workflowService
139: .channelMessageOutTopic(), "processKey = '"
140: + processKey + "'"
141: + " AND ((messageType = 'CLOSE_NOTIFICATION')"
142: + " OR (messageType = 'DATA' "
143: + " AND channelName = '" + channelName
144: + "'))", true);
145: } catch (JMSException e) {
146: logger
147: .error("Problem creating topic session. May be system "
148: + "misconfiguration or temporary failure. Assuming "
149: + "latter, mapped to RemoteException: "
150: + e.getMessage());
151: throw new RemoteException(e.getMessage());
152: }
153: }
154:
155: /* Comment copied from interface. */
156: public Process process() throws RemoteException,
157: InvalidKeyException {
158: return workflowService.processDirectory().lookupProcess(
159: processMgr, processKey);
160: }
161:
162: /* Comment copied from interface. */
163: public String name() throws RemoteException {
164: return channelName;
165: }
166:
167: /* Comment copied from interface. */
168: public Map receiveMessage() throws RemoteException {
169: return receiveMessage(0);
170: }
171:
172: /* Comment copied from interface. */
173: public Map receiveMessage(long timeout) throws RemoteException {
174: if (sendOnly) {
175: throw new IllegalStateException(
176: "Channel has been created for sending only.");
177: }
178: try {
179: if (!processIsOpen) {
180: return null;
181: }
182: while (true) {
183: long startedAt = System.currentTimeMillis();
184: Message msg = subscriber.receive(timeout);
185: if (msg == null) {
186: return null;
187: }
188: if (msg.getStringProperty("messageType").equals(
189: "CLOSE_NOTIFICATION")) {
190: processIsOpen = false;
191: return null;
192: }
193: if (msg instanceof ObjectMessage) {
194: if (logger.isDebugEnabled()) {
195: logger.debug("Received message from process "
196: + processKey + ", channel "
197: + channelName);
198: }
199: return (Map) ((ObjectMessage) msg).getObject();
200: }
201: if (timeout != 0) {
202: // reduce timeout by time spent to handle "wrong" message
203: timeout -= (System.currentTimeMillis() - startedAt);
204: if (timeout <= 0) {
205: return null;
206: }
207: }
208: }
209: } catch (JMSException e) {
210: logger.warn("JMSException (mapped to RemoteException): "
211: + e.getMessage(), e);
212: throw new RemoteException(e.getMessage());
213: }
214: }
215:
216: /* Comment copied from interface. */
217: public void sendMessage(Map data) throws InvalidKeyException,
218: InvalidDataException, RemoteException {
219: if (logger.isDebugEnabled()) {
220: logger.debug("Sending message to channel " + channelName
221: + " of process " + processKey);
222: }
223: ((ExtProcess) process())
224: .submitChannelMessage(channelName, data);
225: }
226:
227: /**
228: * Free any allocated resources.
229: */
230: public void release() {
231: if (subsSess != null) {
232: try {
233: subscriber.close();
234: subsSess.close();
235: subsConn.close();
236: } catch (JMSException e) {
237: logger.warn("Cannot close session (ignored): "
238: + e.getMessage());
239: }
240: subscriber = null;
241: subsSess = null;
242: subsConn = null;
243: }
244: }
245: }
|