001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.eip.patterns;
018:
019: import java.util.Iterator;
020:
021: import javax.jbi.management.DeploymentException;
022: import javax.jbi.messaging.ExchangeStatus;
023: import javax.jbi.messaging.InOnly;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.messaging.NormalizedMessage;
026:
027: import org.apache.servicemix.JbiConstants;
028: import org.apache.servicemix.eip.EIPEndpoint;
029: import org.apache.servicemix.eip.support.ExchangeTarget;
030: import org.apache.servicemix.jbi.util.MessageUtil;
031: import org.apache.servicemix.store.Store;
032:
033: /**
034: *
035: * A WireTap component can be used to forward a copy of the input message to a listener.
036: * This component implements the
037: * <a href="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a>
038: * pattern.
039: * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
040: * In addition, this component is fully asynchronous and uses an exchange store to provide
041: * full HA and recovery for clustered / persistent flows.
042: *
043: * @author gnodet
044: * @version $Revision: 376451 $
045: * @org.apache.xbean.XBean element="wire-tap"
046: * description="A WireTap"
047: */
048: public class WireTap extends EIPEndpoint {
049:
050: /**
051: * The main target destination which will receive the exchange
052: */
053: private ExchangeTarget target;
054: /**
055: * The listener destination for in messages
056: */
057: private ExchangeTarget inListener;
058: /**
059: * The listener destination for out messages
060: */
061: private ExchangeTarget outListener;
062: /**
063: * The listener destination for fault messages
064: */
065: private ExchangeTarget faultListener;
066: /**
067: * The correlation property used by this component
068: */
069: private String correlation;
070: /**
071: * If copyProperties is <code>true</code>, properties
072: * on the in message will be copied to the out / fault
073: * message before it is sent.
074: */
075: private boolean copyProperties;
076:
077: /**
078: * @return Returns the target.
079: */
080: public ExchangeTarget getTarget() {
081: return target;
082: }
083:
084: /**
085: * @param target The target to set.
086: */
087: public void setTarget(ExchangeTarget target) {
088: this .target = target;
089: this .wsdlExchangeTarget = target;
090: }
091:
092: /**
093: * @return Returns the faultListener.
094: */
095: public ExchangeTarget getFaultListener() {
096: return faultListener;
097: }
098:
099: /**
100: * @param faultListener The faultListener to set.
101: */
102: public void setFaultListener(ExchangeTarget faultListener) {
103: this .faultListener = faultListener;
104: }
105:
106: /**
107: * @return Returns the inListener.
108: */
109: public ExchangeTarget getInListener() {
110: return inListener;
111: }
112:
113: /**
114: * @param inListener The inListener to set.
115: */
116: public void setInListener(ExchangeTarget inListener) {
117: this .inListener = inListener;
118: }
119:
120: /**
121: * @return Returns the outListener.
122: */
123: public ExchangeTarget getOutListener() {
124: return outListener;
125: }
126:
127: /**
128: * @param outListener The outListener to set.
129: */
130: public void setOutListener(ExchangeTarget outListener) {
131: this .outListener = outListener;
132: }
133:
134: /**
135: * @return the copyProperties
136: */
137: public boolean isCopyProperties() {
138: return copyProperties;
139: }
140:
141: /**
142: * @param copyProperties the copyProperties to set
143: */
144: public void setCopyProperties(boolean copyProperties) {
145: this .copyProperties = copyProperties;
146: }
147:
148: /* (non-Javadoc)
149: * @see org.apache.servicemix.eip.EIPEndpoint#validate()
150: */
151: public void validate() throws DeploymentException {
152: super .validate();
153: // Check target
154: if (target == null) {
155: throw new IllegalArgumentException(
156: "target should be set to a valid ExchangeTarget");
157: }
158: // Create correlation property
159: correlation = "WireTap.Correlation." + getService() + "."
160: + getEndpoint();
161: }
162:
163: /* (non-Javadoc)
164: * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
165: */
166: protected void processSync(MessageExchange exchange)
167: throws Exception {
168: // Create exchange for target
169: MessageExchange tme = getExchangeFactory().createExchange(
170: exchange.getPattern());
171: target.configureTarget(tme, getContext());
172: sendSyncToListenerAndTarget(exchange, tme, inListener, "in",
173: false);
174: if (tme.getStatus() == ExchangeStatus.DONE) {
175: done(exchange);
176: } else if (tme.getStatus() == ExchangeStatus.ERROR) {
177: fail(exchange, tme.getError());
178: } else if (tme.getFault() != null) {
179: sendSyncToListenerAndTarget(tme, exchange, faultListener,
180: "fault", isCopyProperties());
181: done(tme);
182: } else if (tme.getMessage("out") != null) {
183: sendSyncToListenerAndTarget(tme, exchange, outListener,
184: "out", isCopyProperties());
185: done(tme);
186: } else {
187: done(tme);
188: throw new IllegalStateException("Exchange status is "
189: + ExchangeStatus.ACTIVE
190: + " but has no Out nor Fault message");
191: }
192: }
193:
194: /* (non-Javadoc)
195: * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
196: */
197: protected void processAsync(MessageExchange exchange)
198: throws Exception {
199: if (exchange.getRole() == MessageExchange.Role.PROVIDER
200: && exchange.getProperty(correlation) == null) {
201: // Create exchange for target
202: MessageExchange tme = getExchangeFactory().createExchange(
203: exchange.getPattern());
204: if (store.hasFeature(Store.CLUSTERED)) {
205: exchange.setProperty(JbiConstants.STATELESS_PROVIDER,
206: Boolean.TRUE);
207: tme.setProperty(JbiConstants.STATELESS_CONSUMER,
208: Boolean.TRUE);
209: }
210: target.configureTarget(tme, getContext());
211: // Set correlations
212: exchange.setProperty(correlation, tme.getExchangeId());
213: tme.setProperty(correlation, exchange.getExchangeId());
214: // Put exchange to store
215: store.store(exchange.getExchangeId(), exchange);
216: // Send in to listener and target
217: sendToListenerAndTarget(exchange, tme, inListener, "in",
218: false);
219: // Mimic the exchange on the other side and send to needed listener
220: } else {
221: String id = (String) exchange.getProperty(correlation);
222: if (id == null) {
223: if (exchange.getRole() == MessageExchange.Role.CONSUMER
224: && exchange.getStatus() != ExchangeStatus.ACTIVE) {
225: // This must be a listener status, so ignore
226: return;
227: }
228: throw new IllegalStateException(correlation
229: + " property not found");
230: }
231: MessageExchange org = (MessageExchange) store.load(id);
232: if (org == null) {
233: throw new IllegalStateException(
234: "Could not load original exchange with id "
235: + id);
236: }
237: // Reproduce DONE status to the other side
238: if (exchange.getStatus() == ExchangeStatus.DONE) {
239: done(org);
240: // Reproduce ERROR status to the other side
241: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
242: fail(org, exchange.getError());
243: // Reproduce faults to the other side and listeners
244: } else if (exchange.getFault() != null) {
245: store.store(exchange.getExchangeId(), exchange);
246: sendToListenerAndTarget(exchange, org, faultListener,
247: "fault", isCopyProperties());
248: // Reproduce answers to the other side
249: } else if (exchange.getMessage("out") != null) {
250: store.store(exchange.getExchangeId(), exchange);
251: sendToListenerAndTarget(exchange, org, outListener,
252: "out", isCopyProperties());
253: } else {
254: throw new IllegalStateException("Exchange status is "
255: + ExchangeStatus.ACTIVE
256: + " but has no Out nor Fault message");
257: }
258: }
259: }
260:
261: private void sendToListenerAndTarget(MessageExchange source,
262: MessageExchange dest, ExchangeTarget listener,
263: String message, boolean copy) throws Exception {
264: if (listener != null) {
265: NormalizedMessage msg = MessageUtil.copy(source
266: .getMessage(message));
267: InOnly lme = getExchangeFactory().createInOnlyExchange();
268: if (store.hasFeature(Store.CLUSTERED)) {
269: lme.setProperty(JbiConstants.STATELESS_CONSUMER,
270: Boolean.TRUE);
271: }
272: listener.configureTarget(lme, getContext());
273: MessageUtil.transferToIn(msg, lme);
274: send(lme);
275: MessageUtil.transferTo(msg, dest, message);
276: if (copy) {
277: copyExchangeProperties(dest, "in", message);
278: }
279: send(dest);
280: } else {
281: MessageUtil.transferTo(source, dest, message);
282: if (copy) {
283: copyExchangeProperties(dest, "in", message);
284: }
285: send(dest);
286: }
287: }
288:
289: private void sendSyncToListenerAndTarget(MessageExchange source,
290: MessageExchange dest, ExchangeTarget listener,
291: String message, boolean copy) throws Exception {
292: if (listener != null) {
293: NormalizedMessage msg = MessageUtil.copy(source
294: .getMessage(message));
295: InOnly lme = getExchangeFactory().createInOnlyExchange();
296: if (store.hasFeature(Store.CLUSTERED)) {
297: lme.setProperty(JbiConstants.STATELESS_CONSUMER,
298: Boolean.TRUE);
299: }
300: listener.configureTarget(lme, getContext());
301: MessageUtil.transferToIn(msg, lme);
302: sendSync(lme);
303: MessageUtil.transferTo(msg, dest, message);
304: if (copy) {
305: copyExchangeProperties(dest, "in", message);
306: }
307: sendSync(dest);
308: } else {
309: MessageUtil.transferTo(source, dest, message);
310: if (copy) {
311: copyExchangeProperties(dest, "in", message);
312: }
313: sendSync(dest);
314: }
315: }
316:
317: /**
318: * A utility method to copy properties from the input of the original
319: * exchange to the output of the original exchange.
320: *
321: * @param exchange
322: * @param srcMessage
323: * @param @dstMessage
324: * @throws Exception
325: */
326: private void copyExchangeProperties(MessageExchange exchange,
327: String srcMessage, String dstMessage) {
328: NormalizedMessage src = exchange.getMessage(srcMessage);
329: NormalizedMessage dst = exchange.getMessage(dstMessage);
330: for (Iterator iter = src.getPropertyNames().iterator(); iter
331: .hasNext();) {
332: String name = (String) iter.next();
333: if (dst.getProperty(name) == null) {
334: Object prop = src.getProperty(name);
335: dst.setProperty(name, prop);
336: }
337: }
338: }
339:
340: }
|