001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.component;
018:
019: import java.io.Serializable;
020: import java.util.Date;
021: import java.util.List;
022: import java.util.Timer;
023: import java.util.TimerTask;
024:
025: import javax.jbi.JBIException;
026: import javax.jbi.component.ComponentContext;
027: import javax.jbi.management.DeploymentException;
028: import javax.jbi.messaging.DeliveryChannel;
029: import javax.jbi.messaging.ExchangeStatus;
030: import javax.jbi.messaging.InOut;
031: import javax.jbi.messaging.MessageExchange;
032: import javax.jbi.messaging.MessagingException;
033: import javax.jbi.messaging.NormalizedMessage;
034: import javax.jbi.messaging.MessageExchange.Role;
035: import javax.jbi.servicedesc.ServiceEndpoint;
036: import javax.jms.JMSException;
037: import javax.jms.Message;
038: import javax.jms.MessageListener;
039: import javax.jms.MessageProducer;
040: import javax.jms.ObjectMessage;
041: import javax.jms.Session;
042: import javax.jms.Topic;
043: import javax.xml.datatype.DatatypeConfigurationException;
044: import javax.xml.datatype.DatatypeFactory;
045: import javax.xml.datatype.Duration;
046:
047: import org.apache.servicemix.common.BaseLifeCycle;
048: import org.apache.servicemix.common.Endpoint;
049: import org.apache.servicemix.common.ExchangeProcessor;
050: import org.bpmscript.BpmScriptException;
051: import org.bpmscript.jms.JmsTemplate;
052: import org.bpmscript.jms.JmsTemplateException;
053: import org.bpmscript.jms.SessionCallback;
054:
055: /**
056: * @org.apache.xbean.XBean element="endpoint"
057: *
058: * TODO: think about start and stop lifecycle
059: */
060: public class JmsAlarmEndpoint extends Endpoint implements
061: ExchangeProcessor, MessageListener {
062:
063: protected final transient org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
064: .getLog(getClass());
065:
066: private ServiceEndpoint activated;
067: private DeliveryChannel channel;
068: private IExchangeStore exchangeStore;
069:
070: /*
071: * (non-Javadoc)
072: *
073: * @see org.apache.servicemix.common.Endpoint#getRole()
074: */
075: public Role getRole() {
076: return Role.PROVIDER;
077: }
078:
079: public void activate() throws Exception {
080: logger = this .serviceUnit.getComponent().getLogger();
081: ComponentContext ctx = getServiceUnit().getComponent()
082: .getComponentContext();
083: channel = ctx.getDeliveryChannel();
084: activated = ctx.activateEndpoint(service, endpoint);
085:
086: try {
087: topic = template.getTopic(topicName);
088:
089: List<IExchange> exchanges = exchangeStore
090: .findAllExchanges();
091:
092: for (IExchange exchange : exchanges) {
093: NormalizedMessage inMessage = exchange.getExchange()
094: .getMessage("in");
095: Number delay = (Number) inMessage.getProperty("delay");
096: Date date = (Date) inMessage.getProperty("date");
097: String duration = (String) inMessage
098: .getProperty("duration");
099: scheduleAlarm(exchange.getExchangeId(), delay, date,
100: duration);
101: }
102:
103: template.setMessageListener(topic, null, this );
104:
105: } catch (JmsTemplateException e) {
106: throw new JBIException(e);
107: }
108:
109: start();
110: }
111:
112: public void deactivate() throws Exception {
113: stop();
114: ServiceEndpoint ep = activated;
115: activated = null;
116: ComponentContext ctx = getServiceUnit().getComponent()
117: .getComponentContext();
118: ctx.deactivateEndpoint(ep);
119: }
120:
121: public ExchangeProcessor getProcessor() {
122: return this ;
123: }
124:
125: public void validate() throws DeploymentException {
126: }
127:
128: protected void send(MessageExchange me) throws MessagingException {
129: if (me.getRole() == MessageExchange.Role.CONSUMER
130: && me.getStatus() == ExchangeStatus.ACTIVE) {
131: BaseLifeCycle lf = (BaseLifeCycle) getServiceUnit()
132: .getComponent().getLifeCycle();
133: lf.sendConsumerExchange(me, (Endpoint) this );
134: } else {
135: channel.send(me);
136: }
137: }
138:
139: protected void done(MessageExchange me) throws MessagingException {
140: me.setStatus(ExchangeStatus.DONE);
141: send(me);
142: }
143:
144: protected void fail(MessageExchange me, Exception error)
145: throws MessagingException {
146: me.setError(error);
147: send(me);
148: }
149:
150: public void start() throws Exception {
151: }
152:
153: public void stop() {
154: }
155:
156: private JmsTemplate template = null;
157: private String topicName = null;
158:
159: private final class AlarmTimerTask extends TimerTask {
160: private final String exchangeId;
161:
162: private AlarmTimerTask(String exchangeId) {
163: super ();
164: this .exchangeId = exchangeId;
165: }
166:
167: @Override
168: public void run() {
169:
170: log.debug("running alarm for " + exchangeId
171: + " with scheduled for "
172: + super .scheduledExecutionTime());
173:
174: try {
175: if (!exchangeStore.remove(exchangeId,
176: new IExchangeCallback() {
177: public void execute(IExchange exchange)
178: throws Exception {
179: InOut inOut = (InOut) exchange
180: .getExchange();
181: try {
182: NormalizedMessage outMessage = inOut
183: .createMessage();
184: inOut.setOutMessage(outMessage);
185: send(inOut);
186: } catch (MessagingException e) {
187: throw e;
188: }
189: }
190: })) {
191: log.warn("no message found for exchangeId "
192: + exchangeId);
193: }
194: } catch (BpmScriptException e) {
195: log.error(e, e);
196: }
197:
198: }
199: }
200:
201: private Timer timer = new Timer();
202: private Topic topic;
203:
204: public void process(final MessageExchange exchange)
205: throws MessagingException {
206:
207: if (exchange.getStatus() == ExchangeStatus.ACTIVE
208: && exchange instanceof InOut) {
209:
210: // validation
211: final InOut inOut = (InOut) exchange;
212: NormalizedMessage inMessage = inOut.getInMessage();
213: Number delay = (Number) inMessage.getProperty("delay");
214: Date date = (Date) inMessage.getProperty("date");
215: String duration = (String) inMessage
216: .getProperty("duration");
217: if (delay == null && date == null && duration == null) {
218: fail(
219: inOut,
220: new Exception(
221: "Either the delay, duration or the date property should be set"));
222: }
223: if (duration != null) {
224: try {
225: DatatypeFactory.newInstance().newDuration(duration);
226: } catch (DatatypeConfigurationException e) {
227: fail(exchange, e);
228: }
229: }
230:
231: try {
232: template.execute(true, Session.AUTO_ACKNOWLEDGE,
233: new SessionCallback() {
234: public Object doInJms(Session session)
235: throws Exception {
236: try {
237:
238: exchangeStore.save(inOut);
239:
240: ObjectMessage message = session
241: .createObjectMessage();
242: message
243: .setObject((Serializable) exchange);
244:
245: MessageProducer topicProducer = session
246: .createProducer(topic);
247:
248: topicProducer.send(message);
249: session.commit();
250: } catch (Exception e) {
251: session.rollback();
252: throw e;
253: } catch (Throwable e) {
254: session.rollback();
255: throw new Exception(e);
256: }
257: return null;
258: }
259: });
260: } catch (JmsTemplateException ex) {
261: fail(exchange, ex);
262: }
263:
264: log.debug("added exchange " + exchange.getExchangeId()
265: + " to queue and topic");
266:
267: }
268: }
269:
270: public synchronized void onMessage(Message message) {
271: InOut inOut;
272: try {
273: inOut = (InOut) ((ObjectMessage) message).getObject();
274: String exchangeId = inOut.getExchangeId();
275: log.debug("adding alarm for " + exchangeId);
276: NormalizedMessage inMessage = inOut.getInMessage();
277: Number delay = (Number) inMessage.getProperty("delay");
278: Date date = (Date) inMessage.getProperty("date");
279: String duration = (String) inMessage
280: .getProperty("duration");
281: scheduleAlarm(exchangeId, delay, date, duration);
282: } catch (JMSException e1) {
283: log.error(e1, e1);
284: }
285: }
286:
287: protected void scheduleAlarm(String exchangeId, Number delay,
288: Date date, String duration) {
289: if (delay != null) {
290: timer.schedule(new AlarmTimerTask(exchangeId), delay
291: .longValue());
292: } else if (date != null) {
293: timer.schedule(new AlarmTimerTask(exchangeId), date);
294: } else if (duration != null) {
295: try {
296: Duration d = DatatypeFactory.newInstance().newDuration(
297: duration);
298: timer.schedule(new AlarmTimerTask(exchangeId), d
299: .getTimeInMillis(new Date()));
300: } catch (DatatypeConfigurationException e) {
301: log.error(e, e);
302: }
303: } else {
304: log
305: .error("Either the delay, date or duration property should be set");
306: }
307: }
308:
309: public void setTopicName(String topicName) {
310: this .topicName = topicName;
311: }
312:
313: public void setTemplate(JmsTemplate template) {
314: this .template = template;
315: }
316:
317: public void setExchangeStore(IExchangeStore exchangeStore) {
318: this.exchangeStore = exchangeStore;
319: }
320:
321: }
|