001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.nmr.flow;
018:
019: import java.util.concurrent.locks.ReadWriteLock;
020: import java.util.concurrent.locks.ReentrantReadWriteLock;
021:
022: import javax.jbi.JBIException;
023: import javax.jbi.management.LifeCycleMBean;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.messaging.MessageExchange.Role;
026: import javax.jbi.messaging.MessagingException;
027: import javax.jbi.servicedesc.ServiceEndpoint;
028: import javax.management.JMException;
029: import javax.management.MBeanAttributeInfo;
030: import javax.management.ObjectName;
031:
032: import org.apache.commons.logging.Log;
033: import org.apache.commons.logging.LogFactory;
034: import org.apache.servicemix.JbiConstants;
035: import org.apache.servicemix.executors.ExecutorFactory;
036: import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
037: import org.apache.servicemix.jbi.framework.ComponentNameSpace;
038: import org.apache.servicemix.jbi.management.AttributeInfoHelper;
039: import org.apache.servicemix.jbi.management.BaseLifeCycle;
040: import org.apache.servicemix.jbi.messaging.ExchangePacket;
041: import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
042: import org.apache.servicemix.jbi.nmr.Broker;
043: import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
044:
045: /**
046: * A simple Straight through flow
047: *
048: * @version $Revision: 564607 $
049: */
050: public abstract class AbstractFlow extends BaseLifeCycle implements
051: Flow {
052:
053: protected final Log log = LogFactory.getLog(getClass());
054: protected Broker broker;
055: protected ExecutorFactory executorFactory;
056: private ReadWriteLock lock = new ReentrantReadWriteLock();
057: private Thread suspendThread;
058: private String name;
059:
060: /**
061: * Initialize the Region
062: *
063: * @param br
064: * @throws JBIException
065: */
066: public void init(Broker br) throws JBIException {
067: this .broker = br;
068: this .executorFactory = br.getContainer().getExecutorFactory();
069: // register self with the management context
070: ObjectName objectName = br.getContainer()
071: .getManagementContext().createObjectName(this );
072: try {
073: br.getContainer().getManagementContext().registerMBean(
074: objectName, this , LifeCycleMBean.class);
075: } catch (JMException e) {
076: throw new JBIException(
077: "Failed to register MBean with the ManagementContext",
078: e);
079: }
080: }
081:
082: /**
083: * start the flow
084: * @throws JBIException
085: */
086: public void start() throws JBIException {
087: super .start();
088: }
089:
090: /**
091: * stop the flow
092: * @throws JBIException
093: */
094: public void stop() throws JBIException {
095: if (log.isDebugEnabled()) {
096: log.debug("Called Flow stop");
097: }
098: if (suspendThread != null) {
099: suspendThread.interrupt();
100: }
101: super .stop();
102: }
103:
104: /**
105: * shutDown the flow
106: * @throws JBIException
107: */
108: public void shutDown() throws JBIException {
109: if (log.isDebugEnabled()) {
110: log.debug("Called Flow shutdown");
111: }
112: broker.getContainer().getManagementContext().unregisterMBean(
113: this );
114: super .shutDown();
115: }
116:
117: /**
118: * Distribute an ExchangePacket
119: * @param packet
120: * @throws JBIException
121: */
122: public void send(MessageExchange me) throws JBIException {
123: if (log.isDebugEnabled()) {
124: log.debug("Called Flow send");
125: }
126: // do send
127: try {
128: lock.readLock().lock();
129: doSend((MessageExchangeImpl) me);
130: } finally {
131: lock.readLock().unlock();
132: }
133: }
134:
135: /**
136: * suspend the flow to prevent any message exchanges
137: */
138: public synchronized void suspend() {
139: if (log.isDebugEnabled()) {
140: log.debug("Called Flow suspend");
141: }
142: lock.writeLock().lock();
143: suspendThread = Thread.currentThread();
144: }
145:
146: /**
147: * resume message exchange processing
148: */
149: public synchronized void resume() {
150: if (log.isDebugEnabled()) {
151: log.debug("Called Flow resume");
152: }
153: lock.writeLock().unlock();
154: suspendThread = null;
155: }
156:
157: /**
158: * Do the Flow specific routing
159: * @param packet
160: * @throws JBIException
161: */
162: protected abstract void doSend(MessageExchangeImpl me)
163: throws JBIException;
164:
165: /**
166: * Distribute an ExchangePacket
167: *
168: * @param packet
169: * @throws MessagingException
170: */
171: protected void doRouting(MessageExchangeImpl me)
172: throws MessagingException {
173: ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me
174: .getDestinationId() : me.getSourceId();
175: //As the MessageExchange could come from another container - ensure we get the local Component
176: ComponentMBeanImpl lcc = broker.getContainer().getRegistry()
177: .getComponent(id.getName());
178: if (lcc != null) {
179: if (lcc.getDeliveryChannel() != null) {
180: lcc.getDeliveryChannel().processInBound(me);
181: } else {
182: throw new MessagingException("Component "
183: + id.getName() + " is shut down");
184: }
185: } else {
186: throw new MessagingException("No component named "
187: + id.getName()
188: + " - Couldn't route MessageExchange " + me);
189: }
190: }
191:
192: /**
193: * Get an array of MBeanAttributeInfo
194: *
195: * @return array of AttributeInfos
196: * @throws JMException
197: */
198: public MBeanAttributeInfo[] getAttributeInfos() throws JMException {
199: AttributeInfoHelper helper = new AttributeInfoHelper();
200: helper.addAttribute(getObjectToManage(), "description",
201: "The type of flow");
202: return AttributeInfoHelper.join(super .getAttributeInfos(),
203: helper.getAttributeInfos());
204: }
205:
206: /**
207: * Check if the given packet should be persisted or not.
208: * @param packet
209: * @return
210: */
211: protected boolean isPersistent(MessageExchange me) {
212: ExchangePacket packet = ((MessageExchangeImpl) me).getPacket();
213: if (packet.getPersistent() != null) {
214: return packet.getPersistent().booleanValue();
215: } else {
216: return broker.getContainer().isPersistent();
217: }
218: }
219:
220: protected boolean isTransacted(MessageExchange me) {
221: return me
222: .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
223: }
224:
225: protected boolean isSynchronous(MessageExchange me) {
226: Boolean sync = (Boolean) me.getProperty(JbiConstants.SEND_SYNC);
227: return sync != null && sync.booleanValue();
228: }
229:
230: protected boolean isClustered(MessageExchange me) {
231: MessageExchangeImpl mei = (MessageExchangeImpl) me;
232: if (mei.getDestinationId() == null) {
233: ServiceEndpoint se = me.getEndpoint();
234: if (se instanceof InternalEndpoint) {
235: return ((InternalEndpoint) se).isClustered();
236: // Unknown: assume this is not clustered
237: } else {
238: return false;
239: }
240: } else {
241: String destination = mei.getDestinationId()
242: .getContainerName();
243: String source = mei.getSourceId().getContainerName();
244: return !source.equals(destination);
245: }
246: }
247:
248: public Broker getBroker() {
249: return broker;
250: }
251:
252: /**
253: * Get the type of the item
254: * @return the type
255: */
256: public String getType() {
257: return "Flow";
258: }
259:
260: /**
261: * Get the name of the item
262: * @return the name
263: */
264: public String getName() {
265: if (this .name == null) {
266: String n = super .getName();
267: if (n.endsWith("Flow")) {
268: n = n.substring(0, n.length() - 4);
269: }
270: return n;
271: } else {
272: return this .name;
273: }
274: }
275:
276: public void setName(String name) {
277: this .name = name;
278: }
279:
280: /**
281: * @return the executorFactory
282: */
283: public ExecutorFactory getExecutorFactory() {
284: return executorFactory;
285: }
286:
287: }
|