001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MessageHandlerImpl.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms.handler;
030:
031: import com.sun.jbi.StringTranslator;
032:
033: import com.sun.jbi.binding.jms.EndpointBean;
034: import com.sun.jbi.binding.jms.JMSBindingContext;
035: import com.sun.jbi.binding.jms.JMSBindingResources;
036: import com.sun.jbi.binding.jms.MessageRegistry;
037:
038: import com.sun.jbi.binding.jms.mq.MQDestination;
039: import com.sun.jbi.binding.jms.mq.MQManager;
040: import com.sun.jbi.binding.jms.mq.MQSession;
041:
042: import com.sun.jbi.binding.jms.config.ConfigConstants;
043: import com.sun.jbi.binding.jms.util.UtilBase;
044:
045: import java.util.logging.Logger;
046:
047: import javax.jbi.messaging.DeliveryChannel;
048: import javax.jbi.messaging.ExchangeStatus;
049: import javax.jbi.messaging.MessageExchange;
050: import javax.jbi.messaging.MessagingException;
051:
052: import javax.jbi.servicedesc.ServiceEndpoint;
053:
054: import javax.jms.Message;
055:
056: import javax.xml.namespace.QName;
057:
058: /**
059: * Implementation for a message handler.
060: *
061: * @author Sun Microsystems Inc.
062: */
063: class MessageHandlerImpl extends UtilBase implements MessageHandler,
064: JMSBindingResources {
065: /**
066: * Logger.
067: */
068: protected Logger mLogger;
069: /**
070: * Translator.
071: */
072: protected StringTranslator mStringTranslator;
073: /**
074: * Bean.
075: */
076: private EndpointBean mBean;
077: /**
078: * Message.
079: */
080: private Message mMessage;
081: /**
082: * Message exchange.
083: */
084: private MessageExchange mExchange;
085: /**
086: * Current operation.
087: */
088: private QName mCurrentOperation;
089:
090: /**
091: * Creates a new MessageHandlerImpl object.
092: */
093: public MessageHandlerImpl() {
094: mLogger = JMSBindingContext.getInstance().getLogger();
095: mStringTranslator = JMSBindingContext.getInstance()
096: .getStringTranslator();
097: }
098:
099: /**
100: * Gets the artifact.
101: *
102: * @param id id.
103: *
104: * @return artifact object.
105: */
106: public Object getArtifact(String id) {
107: MessageRegistry reg = JMSBindingContext.getInstance()
108: .getMessageRegistry();
109:
110: return reg.getObject(id);
111: }
112:
113: /**
114: * Setter for property mBean.
115: *
116: * @param mBean New value of property mBean.
117: */
118: public void setBean(EndpointBean mBean) {
119: this .mBean = mBean;
120: }
121:
122: /**
123: * Getter for property mBean.
124: *
125: * @return Value of property mBean.
126: */
127: public EndpointBean getBean() {
128: return mBean;
129: }
130:
131: /**
132: * Sets the current operation.
133: *
134: * @param op operation q name.
135: */
136: public void setCurrentOperation(QName op) {
137: mCurrentOperation = op;
138: }
139:
140: /**
141: * Gets the current operation.
142: *
143: * @return current operation.
144: */
145: public QName getCurrentOperation() {
146: return mCurrentOperation;
147: }
148:
149: /**
150: * Sets the error.
151: *
152: * @param err error string.
153: */
154: public void setError(String err) {
155: super .setError(err);
156: }
157:
158: /**
159: * Gets the error.
160: *
161: * @return error string.
162: */
163: public String getError() {
164: return super .getError();
165: }
166:
167: /**
168: * Sets the exception.
169: *
170: * @param ex exception.
171: */
172: public void setException(Exception ex) {
173: super .setException(ex);
174: }
175:
176: /**
177: * Gets the exception.
178: *
179: * @return exception.
180: */
181: public Exception getException() {
182: return super .getException();
183: }
184:
185: /**
186: * Sets the JMS message.
187: *
188: * @param msg JMSmesage.
189: */
190: public void setJMSMessage(Message msg) {
191: mMessage = msg;
192: }
193:
194: /**
195: * Gets the JMS message.
196: *
197: * @return JMS message.
198: */
199: public Message getJMSMessage() {
200: return mMessage;
201: }
202:
203: /**
204: *Sets the NMR headers.
205: */
206: public void setNMSHeaders() {
207: QName servicename = new QName(mBean
208: .getValue(ConfigConstants.SERVICE_NAMESPACE), mBean
209: .getValue(ConfigConstants.SERVICENAME));
210: mLogger.fine("Namespace "
211: + mBean.getValue(ConfigConstants.SERVICE_NAMESPACE));
212:
213: mLogger.fine("Service :"
214: + mBean.getValue(ConfigConstants.SERVICENAME));
215:
216: mLogger.fine("Endpoint :"
217: + mBean.getValue(ConfigConstants.ENDPOINTNAME));
218:
219: String endpointname = mBean
220: .getValue(ConfigConstants.ENDPOINTNAME);
221: ServiceEndpoint ref = null;
222:
223: try {
224: ref = mBean.getServiceEndpoint();
225:
226: if (ref == null) {
227: setServiceEndpoint();
228: ref = mBean.getServiceEndpoint();
229:
230: if (ref == null) {
231: mExchange = null;
232: mLogger.severe(mStringTranslator.getString(
233: JMS_CANNOT_GET_SERVICE, servicename
234: .toString()));
235: setError(mStringTranslator.getString(
236: JMS_CANNOT_GET_SERVICE, servicename
237: .toString()));
238:
239: return;
240: }
241: }
242:
243: QName iqname = new QName(mBean
244: .getValue(ConfigConstants.INTERFACE_NAMESPACE),
245: mBean.getValue(ConfigConstants.INTERFACE_LOCALNAME));
246: mExchange.setService(mBean.getServiceEndpoint()
247: .getServiceName());
248: mExchange.setEndpoint(ref);
249: mExchange.setInterfaceName(iqname);
250: mExchange.setOperation(getCurrentOperation());
251: } catch (Exception e) {
252: e.printStackTrace();
253: setException(e);
254: }
255: }
256:
257: /**
258: * Sets the NMR message.
259: *
260: * @param exch NMR msg.
261: */
262: public void setNMSMessage(MessageExchange exch) {
263: mExchange = exch;
264: }
265:
266: /**
267: * Gets the nMR message.
268: *
269: * @return NMR msg.
270: */
271: public MessageExchange getNMSMessage() {
272: return mExchange;
273: }
274:
275: /**
276: * Get session.
277: *
278: * @return MQ session.
279: */
280: public MQSession getSession() {
281: MQSession session = null;
282:
283: while (session == null) {
284: //this sshud be wait / notify not buys waiting.
285: // will change it soon
286: session = JMSBindingContext.getInstance().getMQManager()
287: .getSession(false, mBean);
288: }
289:
290: try {
291: session.init();
292: } catch (Exception e) {
293: e.printStackTrace();
294: setException(e);
295: JMSBindingContext.getInstance().getMQManager()
296: .releaseSession(session);
297: session = null;
298: }
299:
300: return session;
301: }
302:
303: /**
304: * Checks if the ME can be terminated.
305: *
306: * @return true if can terminate.
307: */
308: public boolean canTerminateConsumer() {
309: if (mExchange.getPattern().toString().equals(
310: ConfigConstants.IN_ONLY)) {
311: if (mExchange.getStatus() == ExchangeStatus.DONE) {
312: mLogger.fine(mStringTranslator.getString(
313: JMS_INONLY_DONE, mExchange.getExchangeId()));
314: } else {
315: mLogger.fine(mStringTranslator.getString(
316: JMS_INONLY_ERROR, mExchange.getExchangeId()));
317: }
318:
319: //Update message history here
320: return true;
321: }
322:
323: if (mExchange.getPattern().toString().equals(
324: ConfigConstants.ROBUST_IN_ONLY)) {
325: if (mExchange.getStatus() == ExchangeStatus.DONE) {
326: mLogger.fine(mStringTranslator.getString(
327: JMS_ROBUSTINONLY_DONE, mExchange
328: .getExchangeId()));
329:
330: //Update message history here
331: return true;
332: }
333:
334: else {
335: mLogger.fine(mStringTranslator.getString(
336: JMS_ROBUSTINONLY_DONE, mExchange
337: .getExchangeId()));
338:
339: return false;
340: }
341: }
342:
343: if (mExchange.getPattern().toString().equals(
344: ConfigConstants.IN_OPTIONAL_OUT)) {
345: mLogger.info(mStringTranslator.getString(
346: JMS_INOPTOUT_SUPPORT, mExchange.getExchangeId()));
347:
348: return true;
349: }
350:
351: return false;
352: }
353:
354: /**
355: * Can or cannot end a exchange.
356: *
357: * @return true if can terminate.
358: */
359: public boolean canTerminateProvider() {
360: if (mExchange.getPattern().toString().equals(
361: ConfigConstants.ROBUST_IN_ONLY)) {
362: if (mExchange.getStatus() == ExchangeStatus.DONE) {
363: mLogger.fine(mStringTranslator.getString(
364: JMS_ROBUSTINONLY_DONE, mExchange
365: .getExchangeId()));
366:
367: //Update message history here
368: return true;
369: } else if (mExchange.getStatus() == ExchangeStatus.ERROR) {
370: mLogger.fine(mStringTranslator.getString(
371: JMS_ROBUSTINONLY_ERROR, mExchange
372: .getExchangeId()));
373:
374: //Update message history here
375: return true;
376: } else {
377: return false;
378: }
379: }
380:
381: if (mExchange.getPattern().toString().equals(
382: ConfigConstants.IN_OUT)) {
383: if (mExchange.getStatus() == ExchangeStatus.DONE) {
384: mLogger.fine(mStringTranslator.getString(
385: JMS_INOUT_DONE, mExchange.getExchangeId()));
386:
387: //Update message history here
388: return true;
389: } else if (mExchange.getStatus() == ExchangeStatus.ERROR) {
390: mLogger.fine(mStringTranslator.getString(
391: JMS_INOUT_ERROR, mExchange.getExchangeId()));
392:
393: //Update message history here
394: return true;
395: } else {
396: return false;
397: }
398: }
399:
400: if (mExchange.getPattern().toString().equals(
401: ConfigConstants.IN_OPTIONAL_OUT)) {
402: mLogger.info(mStringTranslator.getString(
403: JMS_INOPTOUT_SUPPORT, mExchange.getExchangeId()));
404:
405: return true;
406: }
407:
408: return false;
409: }
410:
411: /**
412: * Clear.
413: */
414: public void clear() {
415: super .clear();
416: }
417:
418: /**
419: * Deregisters the Message.
420: *
421: * @param id message id.
422: */
423: public void deRegisterMessage(String id) {
424: MessageRegistry reg = JMSBindingContext.getInstance()
425: .getMessageRegistry();
426: reg.deregisterExchange(id);
427: }
428:
429: /**
430: *
431: */
432: public void execute() {
433: mLogger.fine(mStringTranslator.getString(JMS_HANDLER_BASE));
434:
435: /* do sanity first
436: */
437: }
438:
439: /**
440: *
441: *
442: * @param id
443: * @param ob
444: */
445: public void registerMessage(String id, Object ob) {
446: MessageRegistry reg = JMSBindingContext.getInstance()
447: .getMessageRegistry();
448: reg.registerExchange(id, ob);
449: }
450:
451: /**
452: * Method that sends any message exchnage on the channel.
453: *
454: * @param msg exchange to be sent
455: *
456: * @return true if success else false
457: */
458: public boolean send(MessageExchange msg) {
459: /*
460: * Send message mSendMessage
461: * register in message registry
462: * start timer -1 indicates no timer
463: */
464: DeliveryChannel channel = JMSBindingContext.getInstance()
465: .getChannel();
466:
467: if (msg == null) {
468: mLogger.severe(mStringTranslator
469: .getString(JMS_NULL_EXCHANGE));
470: setError(getError() + "\n"
471: + mStringTranslator.getString(JMS_NULL_EXCHANGE));
472:
473: return false;
474: }
475:
476: try {
477: channel.send(msg);
478: } catch (MessagingException me) {
479: me.printStackTrace();
480: setException(me);
481:
482: return false;
483: }
484:
485: return true;
486: }
487:
488: /**
489: *
490: *
491: * @param msg
492: * @param dest
493: * @param session
494: *
495: * @return
496: */
497: public boolean send(Message msg, Object dest, MQSession session) {
498: if (dest == null) {
499: mLogger.severe(mStringTranslator
500: .getString(JMS_NO_DESTINATION));
501: setError(mStringTranslator.getString(JMS_NO_DESTINATION));
502: JMSBindingContext.getInstance().getMQManager()
503: .releaseSession(session);
504:
505: return false;
506: }
507: MQDestination destination = null;
508: destination = JMSBindingContext.getInstance().getMQManager()
509: .getDestination(dest);
510:
511: if (!session.sendMessage(msg, destination)) {
512: mLogger.severe(mStringTranslator.getString(JMS_CANNOT_SEND)
513: + session.getError());
514: setError(mStringTranslator.getString(JMS_CANNOT_SEND)
515: + session.getError());
516: JMSBindingContext.getInstance().getMQManager()
517: .releaseSession(session);
518:
519: return false;
520: }
521:
522: JMSBindingContext.getInstance().getMQManager().releaseSession(
523: session);
524: destination = null;
525: return true;
526: }
527:
528: /**
529: *
530: *
531: * @return
532: */
533: public boolean sendJMSMessage() {
534: return false;
535: }
536:
537: /**
538: *
539: *
540: * @return
541: */
542: public boolean sendNMSMessage() {
543: return false;
544: }
545:
546: /**
547: *
548: */
549: public void sendNMSStatus() {
550: Exception exp = getException();
551:
552: try {
553: if (exp != null) {
554: mLogger.fine("**ERROR STATUS**");
555: mExchange.setError(exp);
556: } else {
557: mLogger.fine("**DONE STATUS** "
558: + mExchange.getExchangeId());
559: mExchange.setStatus(ExchangeStatus.DONE);
560: }
561:
562: send(mExchange);
563: }
564:
565: catch (Exception e) {
566: mLogger.severe(mStringTranslator.getString(
567: JMS_CANNOT_SEND_STATUS, mExchange.getExchangeId()));
568: }
569: }
570:
571: /**
572: *
573: */
574: private void setServiceEndpoint() {
575: /**
576: * Always try to get an endpoint only before sending this gives the
577: * maximum probability of finding an endpoint, also we give an engine
578: * enough time to activate the endpoint
579: */
580: try {
581: DeliveryChannel chnl = JMSBindingContext.getInstance()
582: .getChannel();
583: QName qname = new QName(mBean
584: .getValue(ConfigConstants.SERVICE_NAMESPACE), mBean
585: .getValue(ConfigConstants.SERVICENAME));
586: QName iname = new QName(mBean
587: .getValue(ConfigConstants.INTERFACE_NAMESPACE),
588: mBean.getValue(ConfigConstants.INTERFACE_LOCALNAME));
589: ServiceEndpoint[] ref = JMSBindingContext.getInstance()
590: .getContext().getEndpointsForService(qname);
591: ServiceEndpoint tmp = null;
592:
593: if ((ref == null) || (ref.length == 0)) {
594: ref = JMSBindingContext.getInstance().getContext()
595: .getEndpoints(iname);
596: }
597:
598: if ((ref == null) || (ref.length == 0)) {
599: throw new Exception("Cannot locate endpoint " + qname);
600: } else {
601: tmp = ref[0];
602:
603: for (int k = 0; k < ref.length; k++) {
604: if (ref[k]
605: .getEndpointName()
606: .trim()
607: .equals(
608: mBean
609: .getValue(ConfigConstants.ENDPOINTNAME))) {
610: tmp = ref[k];
611: }
612: }
613: }
614:
615: mBean.setServiceEndpoint(tmp);
616: } catch (Exception me) {
617: me.printStackTrace();
618: }
619: }
620: }
|