001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)InboundMessageHandler.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms.handler;
030:
031: import com.sun.jbi.binding.jms.EndpointBean;
032: import com.sun.jbi.binding.jms.JMSBindingContext;
033: import com.sun.jbi.wsdl11wrapper.Wsdl11WrapperHelper;
034: import com.sun.jbi.binding.jms.mq.MQManager;
035: import com.sun.jbi.binding.jms.mq.MQSession;
036: import com.sun.jbi.binding.jms.config.ConfigConstants;
037:
038: import javax.jbi.messaging.ExchangeStatus;
039: import javax.jbi.messaging.MessageExchange;
040:
041: import javax.jms.JMSException;
042: import javax.jms.Message;
043:
044: import javax.xml.namespace.QName;
045:
046: /**
047: * This class handles only In MEPS.
048: *
049: * @author Sun Microsystems Inc.
050: */
051: public class InboundMessageHandler extends MessageHandlerImpl {
052: /**
053: * Creates a new InboundMessageHandler object.
054: */
055: public InboundMessageHandler() {
056: super ();
057: }
058:
059: /**
060: * Sanity.
061: */
062: public void check() {
063: EndpointBean eb = getBean();
064: Message msg = getJMSMessage();
065: MessageExchange exch = getNMSMessage();
066: QName operation = null;
067: String oplocal = null;
068: String opnamespace = null;
069:
070: if (msg != null) {
071: try {
072: oplocal = msg
073: .getStringProperty(MessageProperties.JBI_OPERATION_NAME);
074: opnamespace = msg
075: .getStringProperty(MessageProperties.JBI_OPERATION_NAMESPACE);
076: } catch (JMSException je) {
077: ;
078: }
079:
080: if (oplocal != null) {
081: operation = new QName(opnamespace, oplocal);
082: }
083:
084: if ((operation == null)
085: || (operation.toString().trim().equals(""))) {
086: operation = eb.getDefaultOperation();
087: }
088:
089: String mep = eb.getMEP(operation.getLocalPart());
090: }
091:
092: if (exch != null) {
093: operation = exch.getOperation();
094: }
095:
096: setCurrentOperation(operation);
097: }
098:
099: /**
100: * Handler contract.
101: */
102: public void execute() {
103: super .execute();
104: mLogger.fine(mStringTranslator.getString(JMS_INBOUND_HANDLER));
105:
106: /* do sanity first
107: */
108: check();
109:
110: if (!isValid()) {
111: mLogger.severe(mStringTranslator
112: .getString(JMS_INVALID_MESSAGE));
113:
114: return;
115: }
116:
117: if (getNMSMessage() != null) {
118: processNMSMessage();
119:
120: if (!isValid()) {
121: if (getNMSMessage().getStatus() == ExchangeStatus.ACTIVE) {
122: sendNMSStatus();
123: }
124: }
125: } else if (getJMSMessage() != null) {
126: processJMSMessage();
127:
128: if (!isValid()) {
129: sendJMSError();
130: }
131: } else {
132: mLogger.severe(mStringTranslator
133: .getString(JMS_INVALID_MESSAGE));
134:
135: return;
136: }
137: }
138:
139: /**
140: * Main task for JMS message processing is done here.
141: */
142: private void processJMSMessage() {
143: Message msg = getJMSMessage();
144: EndpointBean eb = getBean();
145: MessageAdaptor adaptor = MessageAdaptorFactory.getAdaptor(msg,
146: eb.getDeploymentType());
147:
148: if (adaptor == null) {
149: mLogger.severe(mStringTranslator
150: .getString(JMS_UNSUPPORTED_MESSAGE));
151: setError(mStringTranslator
152: .getString(JMS_UNSUPPORTED_MESSAGE));
153:
154: return;
155: }
156:
157: Wsdl11WrapperHelper wrapperhelper;
158: if (eb.getDeploymentType().trim().equals("WSDL11")) {
159: wrapperhelper = new Wsdl11WrapperHelper(eb
160: .getWsdlDefinition());
161: adaptor.setEpilogueProcessor(wrapperhelper);
162: }
163:
164: String inputtype = eb.getInputType(getCurrentOperation()
165: .getLocalPart());
166:
167: if (inputtype == null) {
168: mLogger.severe(mStringTranslator
169: .getString(JMS_UNSUPPORTED_MESSAGE));
170: setError(mStringTranslator
171: .getString(JMS_UNSUPPORTED_MESSAGE));
172:
173: return;
174: }
175:
176: if (!adaptor.getName().equals(inputtype.trim())) {
177: mLogger.severe(mStringTranslator
178: .getString(JMS_INCONSISTENT_MESSAGE_TYPE));
179: setError(mStringTranslator
180: .getString(JMS_INCONSISTENT_MESSAGE_TYPE));
181:
182: //send errorr
183: return;
184: }
185:
186: mLogger.fine("CURRENT OPERATION "
187: + getCurrentOperation().getLocalPart());
188: mLogger.fine("CURRENT MEP "
189: + getBean()
190: .getMEP(getCurrentOperation().getLocalPart()));
191:
192: MessageExchange ex = MessageExchangeHelper
193: .createExchange(getBean().getMEP(
194: getCurrentOperation().getLocalPart()));
195: setNMSMessage(ex);
196: setNMSHeaders();
197:
198: if (getNMSMessage() == null) {
199: mLogger.severe(mStringTranslator
200: .getString(JMS_NMS_HEADERS_FAILED));
201: setError(mStringTranslator
202: .getString(JMS_NMS_HEADERS_FAILED));
203:
204: //free up any resources here. set them to null
205: return;
206: }
207:
208: adaptor
209: .convertJMStoNMSMessage(getJMSMessage(),
210: getNMSMessage());
211:
212: if (ex == null) {
213: mLogger.severe(mStringTranslator.getString(
214: JMS_JMS_TO_NMS_FAILED, adaptor.getError()));
215: setError(mStringTranslator.getString(JMS_JMS_TO_NMS_FAILED,
216: adaptor.getError()));
217:
218: //free up any resources here. set them to null
219: return;
220: }
221:
222: Object replyto = MessageExchangeHelper.getDestinationName(
223: adaptor.getJMSReplyTo(), eb);
224: registerMessage(ex.getExchangeId(), replyto);
225:
226: if (!send(ex)) {
227: mLogger
228: .severe(mStringTranslator
229: .getString(JMS_CANNOT_SEND));
230: setError(mStringTranslator.getString(JMS_CANNOT_SEND)
231: + "\n" + getError());
232: deRegisterMessage(ex.getExchangeId());
233:
234: return;
235: }
236:
237: return;
238: }
239:
240: /**
241: * Processes the NMR message.
242: */
243: private void processNMSMessage() {
244: MessageExchange exch = getNMSMessage();
245:
246: if (canTerminateConsumer()) {
247: return;
248: }
249:
250: String oper = getCurrentOperation().getLocalPart();
251: mLogger.fine("OPERATION " + oper);
252:
253: EndpointBean eb = getBean();
254: mLogger.fine("OUTPUT TYPE" + eb.getOutputType(oper));
255:
256: MessageAdaptor adaptor = MessageAdaptorFactory.getAdaptor(eb
257: .getOutputType(oper), eb.getDeploymentType());
258: MQSession session = getSession();
259:
260: if (session == null) {
261: return;
262: }
263:
264: Message msg = session.createJMSMessage(eb.getOutputType(oper));
265:
266: if (adaptor == null) {
267: mLogger.severe(mStringTranslator
268: .getString(JMS_UNSUPPORTED_MESSAGE));
269: setError(mStringTranslator
270: .getString(JMS_UNSUPPORTED_MESSAGE));
271:
272: return;
273: }
274:
275: Wsdl11WrapperHelper wrapperhelper = null;
276: if (eb.getDeploymentType().trim().equals("WSDL11")) {
277: wrapperhelper = new Wsdl11WrapperHelper(eb
278: .getWsdlDefinition());
279: adaptor.setEpilogueProcessor(wrapperhelper);
280: }
281:
282: adaptor.setEpilogueProcessor(wrapperhelper);
283: adaptor.convertNMStoJMSMessage(exch, msg);
284:
285: MQManager man = JMSBindingContext.getInstance().getMQManager();
286: Object lookup = getArtifact(exch.getExchangeId());
287:
288: if (!send(msg, lookup, session)) {
289: deRegisterMessage(exch.getExchangeId());
290:
291: return;
292: }
293:
294: deRegisterMessage(exch.getExchangeId());
295: clear();
296:
297: if (exch.getStatus() == ExchangeStatus.ACTIVE) {
298: sendNMSStatus();
299: }
300: }
301:
302: /**
303: * Sends an error to JMS.
304: */
305: private void sendJMSError() {
306: EndpointBean eb = getBean();
307:
308: String mep = eb.getMEP(getCurrentOperation().getLocalPart());
309:
310: if ((mep == null)
311: || (mep.trim().equals(ConfigConstants.IN_ONLY))) {
312: return;
313: }
314:
315: MessageAdaptor adaptor = MessageAdaptorFactory.getAdaptor(eb
316: .getOutputType(getCurrentOperation().getLocalPart()),
317: "");
318:
319: if (adaptor == null) {
320: mLogger.severe(mStringTranslator
321: .getString(JMS_IRRECOVERABLE_ERROR));
322:
323: return;
324: }
325:
326: MQSession session = getSession();
327:
328: if (session == null) {
329: mLogger.severe(mStringTranslator
330: .getString(JMS_IRRECOVERABLE_ERROR));
331:
332: return;
333: }
334:
335: Message msg = session.createJMSMessage(eb
336: .getOutputType(getCurrentOperation().getLocalPart()));
337: adaptor.updateJMSErrorMessage(msg, getError());
338:
339: MessageExchange exch = getNMSMessage();
340: Object replyto = MessageExchangeHelper.getDestinationName(
341: adaptor.getJMSReplyTo(), eb);
342:
343: if (!send(msg, replyto, session)) {
344: ;
345: }
346: }
347: }
|