001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.component;
018:
019: import java.util.Date;
020: import java.util.Timer;
021: import java.util.TimerTask;
022:
023: import javax.jbi.component.ComponentContext;
024: import javax.jbi.management.DeploymentException;
025: import javax.jbi.messaging.DeliveryChannel;
026: import javax.jbi.messaging.ExchangeStatus;
027: import javax.jbi.messaging.InOut;
028: import javax.jbi.messaging.MessageExchange;
029: import javax.jbi.messaging.MessagingException;
030: import javax.jbi.messaging.NormalizedMessage;
031: import javax.jbi.messaging.MessageExchange.Role;
032: import javax.jbi.servicedesc.ServiceEndpoint;
033: import javax.xml.datatype.DatatypeConfigurationException;
034: import javax.xml.datatype.DatatypeFactory;
035: import javax.xml.datatype.Duration;
036:
037: import org.apache.servicemix.common.BaseLifeCycle;
038: import org.apache.servicemix.common.Endpoint;
039: import org.apache.servicemix.common.ExchangeProcessor;
040:
041: /**
042: * @org.apache.xbean.XBean element="endpoint"
043: */
044: public class AlarmEndpoint extends Endpoint implements
045: ExchangeProcessor {
046:
047: private final class AlarmTimerTask extends TimerTask {
048: private final InOut out;
049:
050: private AlarmTimerTask(InOut out) {
051: super ();
052: this .out = out;
053: }
054:
055: @Override
056: public void run() {
057: try {
058: NormalizedMessage outMessage = out.createMessage();
059: out.setOutMessage(outMessage);
060: send(out);
061: } catch (MessagingException e) {
062: try {
063: fail(out, e);
064: } catch (MessagingException e1) {
065: logger.error(e1, e1);
066: }
067: }
068: }
069: }
070:
071: private ServiceEndpoint activated;
072: private DeliveryChannel channel;
073: // private MessageExchangeFactory exchangeFactory;
074: private Timer timer = new Timer();
075:
076: /* (non-Javadoc)
077: * @see org.apache.servicemix.common.Endpoint#getRole()
078: */
079: public Role getRole() {
080: return Role.PROVIDER;
081: }
082:
083: public void activate() throws Exception {
084: logger = this .serviceUnit.getComponent().getLogger();
085: ComponentContext ctx = getServiceUnit().getComponent()
086: .getComponentContext();
087: channel = ctx.getDeliveryChannel();
088: // exchangeFactory = channel.createExchangeFactory();
089: activated = ctx.activateEndpoint(service, endpoint);
090: start();
091: }
092:
093: public void deactivate() throws Exception {
094: stop();
095: ServiceEndpoint ep = activated;
096: activated = null;
097: ComponentContext ctx = getServiceUnit().getComponent()
098: .getComponentContext();
099: ctx.deactivateEndpoint(ep);
100: }
101:
102: public ExchangeProcessor getProcessor() {
103: return this ;
104: }
105:
106: public void validate() throws DeploymentException {
107: }
108:
109: protected void send(MessageExchange me) throws MessagingException {
110: if (me.getRole() == MessageExchange.Role.CONSUMER
111: && me.getStatus() == ExchangeStatus.ACTIVE) {
112: BaseLifeCycle lf = (BaseLifeCycle) getServiceUnit()
113: .getComponent().getLifeCycle();
114: lf.sendConsumerExchange(me, (Endpoint) this );
115: } else {
116: channel.send(me);
117: }
118: }
119:
120: protected void done(MessageExchange me) throws MessagingException {
121: me.setStatus(ExchangeStatus.DONE);
122: send(me);
123: }
124:
125: protected void fail(MessageExchange me, Exception error)
126: throws MessagingException {
127: me.setError(error);
128: send(me);
129: }
130:
131: public void start() throws Exception {
132: }
133:
134: public void stop() {
135: }
136:
137: public void process(MessageExchange exchange) throws Exception {
138: if (exchange.getStatus() == ExchangeStatus.ACTIVE
139: && exchange instanceof InOut) {
140: final InOut inOut = (InOut) exchange;
141: NormalizedMessage inMessage = inOut.getInMessage();
142: Number delay = (Number) inMessage.getProperty("delay");
143: Date date = (Date) inMessage.getProperty("date");
144: String duration = (String) inMessage
145: .getProperty("duration");
146: if (delay != null) {
147: timer.schedule(new AlarmTimerTask(inOut), delay
148: .longValue());
149: } else if (date != null) {
150: timer.schedule(new AlarmTimerTask(inOut), date);
151: } else if (duration != null) {
152: try {
153: Duration d = DatatypeFactory.newInstance()
154: .newDuration(duration);
155: timer.schedule(new AlarmTimerTask(inOut), d
156: .getTimeInMillis(new Date()));
157: } catch (DatatypeConfigurationException e) {
158: fail(exchange, e);
159: }
160: } else {
161: fail(
162: inOut,
163: new Exception(
164: "Either the delay or the date property should be set"));
165: }
166: }
167: }
168:
169: }
|