001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: * $Header:$
018: */
019: package org.apache.beehive.controls.system.jms.impl;
020:
021: import java.io.Serializable;
022: import java.lang.reflect.Method;
023: import java.lang.reflect.InvocationTargetException;
024: import java.util.Iterator;
025: import java.util.Map;
026: import java.util.HashMap;
027:
028: import javax.jms.JMSException;
029: import javax.jms.MapMessage;
030: import javax.jms.MessageProducer;
031: import javax.jms.Queue;
032: import javax.jms.QueueConnection;
033: import javax.jms.QueueConnectionFactory;
034: import javax.jms.ConnectionFactory;
035: import javax.jms.QueueSession;
036: import javax.jms.Session;
037: import javax.jms.BytesMessage;
038: import javax.jms.TopicConnection;
039: import javax.jms.TopicConnectionFactory;
040: import javax.jms.Topic;
041: import javax.jms.TopicPublisher;
042: import javax.jms.TopicSession;
043:
044: import org.apache.beehive.controls.api.ControlException;
045: import org.apache.beehive.controls.api.events.EventHandler;
046: import org.apache.beehive.controls.api.context.ControlBeanContext;
047: import org.apache.beehive.controls.api.context.Context;
048: import org.apache.beehive.controls.api.context.ResourceContext;
049: import org.apache.beehive.controls.api.bean.ControlImplementation;
050: import org.apache.beehive.controls.api.bean.Extensible;
051: import org.apache.beehive.controls.api.bean.Control;
052: import org.apache.beehive.controls.system.jms.JMSControl;
053: import org.apache.beehive.controls.system.jndi.JndiControlBean;
054:
/**
 * <p>
 * Implementation of the {@link JMSControl}.
 * </p>
 */
060: @ControlImplementation
061: public class JMSControlImpl implements JMSControl, Extensible,
062: java.io.Serializable {
063:
064: private static Class XMLOBJ_CLASS = null;
065:
066: static {
067: try {
068: XMLOBJ_CLASS = Class
069: .forName("org.apache.xmlbeans.XmlObject");
070: } catch (ClassNotFoundException e) {
071: // NOOP if apache xml beans not present
072: }
073: }
074:
075: /**
076: * Implementation of the {@link org.apache.beehive.controls.system.jms.JMSControl#getSession()} method.
077: *
078: * @return the {@link Session}
079: * @throws ControlException when an error occurs trying to create a JMS session
080: */
081: public Session getSession() throws ControlException {
082: if (_session == null) {
083: try {
084: switch (getDestinationType()) {
085: case Auto:
086: determineDestination();
087: return getSession();
088: case Topic:
089: createTopicSession();
090: break;
091: case Queue:
092: createQueueSession();
093: break;
094: }
095: } catch (JMSException e) {
096: throw new ControlException(
097: "Failure to get JMS connection or session", e);
098: }
099: }
100: return _session;
101: }
102:
103: /**
104: * Get the JMS {@link javax.jms.Destination}. Implementation of the
105: * {@link org.apache.beehive.controls.system.jms.JMSControl#getDestination()} method.
106: */
107: public javax.jms.Destination getDestination()
108: throws ControlException {
109: if (_destination == null) {
110: _destination = (javax.jms.Destination) getJndiControl()
111: .getResource(
112: getDestinationProperties().sendJndiName(),
113: javax.jms.Destination.class);
114: }
115: return _destination;
116: }
117:
118: /**
119: * Get the JMS {@link javax.jms.Connection}. Implementation of the
120: * {@link JMSControl#getConnection()}.
121: */
122: public javax.jms.Connection getConnection() throws ControlException {
123: getSession();
124: return _connection;
125: }
126:
127: /**
128: * @see JMSControl#setHeaders
129: */
130: public void setHeaders(Map headers) {
131: if (headers == null)
132: return;
133:
134: HashMap<JMSControl.HeaderType, Object> map = new HashMap<JMSControl.HeaderType, Object>();
135: for (Object name : headers.keySet()) {
136:
137: Object value = headers.get(name);
138:
139: HeaderType type = null;
140: /*
141: * Allow for string valued or HeaderType valued
142: * map entries.
143: */
144: if (name instanceof HeaderType)
145: type = (HeaderType) name;
146: else {
147: if (name.equals(HeaderType.JMSCorrelationID.toString()))
148: type = HeaderType.JMSCorrelationID;
149: else if (name.equals(HeaderType.JMSDeliveryMode
150: .toString()))
151: type = HeaderType.JMSDeliveryMode;
152: else if (name.equals(HeaderType.JMSExpiration
153: .toString()))
154: type = HeaderType.JMSExpiration;
155: else if (name
156: .equals(HeaderType.JMSMessageID.toString()))
157: type = HeaderType.JMSMessageID;
158: else if (name.equals(HeaderType.JMSPriority.toString()))
159: type = HeaderType.JMSPriority;
160: else if (name.equals(HeaderType.JMSRedelivered
161: .toString()))
162: type = HeaderType.JMSRedelivered;
163: else if (name
164: .equals(HeaderType.JMSTimestamp.toString()))
165: type = HeaderType.JMSTimestamp;
166: else if (name.equals(HeaderType.JMSType.toString()))
167: type = HeaderType.JMSType;
168: else
169: throw new IllegalArgumentException(
170: "Invalid JMS header type '" + name + "'");
171: }
172: map.put(type, value);
173: }
174:
175: for (HeaderType key : map.keySet())
176: setHeader(key, map.get(key));
177: }
178:
179: /**
180: * @see JMSControl#setHeader
181: */
182: public void setHeader(JMSControl.HeaderType type, Object value) {
183: if (_headers == null)
184: _headers = new HashMap<HeaderType, Object>();
185:
186: _headers.put(type, value);
187: }
188:
189: /**
190: * @see JMSControl#setProperties
191: */
192: public void setProperties(Map properties) {
193: if (properties == null)
194: return;
195:
196: /* todo: this is making an implicit assumption that the Map contains Strings */
197: Iterator<String> i = properties.keySet().iterator();
198: while (i.hasNext()) {
199: String name = i.next();
200: Object value = properties.get(name);
201: setProperty(name, value);
202: }
203: }
204:
205: /**
206: * @see JMSControl#setProperty
207: */
208: public void setProperty(String name, Object value) {
209: if (_properties == null)
210: _properties = new HashMap<String, Object>();
211:
212: _properties.put(name, value);
213: }
214:
215: /**
216: * Implementation of the {@link Extensible#invoke(java.lang.reflect.Method, Object[])} method. This
217: * method allows extension by interface.
218: */
219: public Object invoke(Method method, Object[] args)
220: throws ControlException {
221: assert (method != null && args != null);
222: javax.jms.Message m = null;
223: boolean isBody = false;
224: try {
225: Destination props = getDestinationProperties();
226: Session session = getSession();
227: String jmsType = null;
228: String correlationId = null;
229: /*
230: * Set the deliveryMode, priority and expiration.
231: */
232: int deliveryMode = getProducer().getDeliveryMode();
233: int priority = getProducer().getPriority();
234: long expiration = getProducer().getTimeToLive();
235: Object mode = getHeader(HeaderType.JMSDeliveryMode);
236: if (mode != null)
237: deliveryMode = deliveryModeToJmsMode(mode);
238:
239: Integer v = getHeaderAsInteger(HeaderType.JMSPriority);
240: if (v != null)
241: priority = v.intValue();
242:
243: Long l = getHeaderAsLong(HeaderType.JMSExpiration);
244: if (l != null)
245: expiration = l.longValue();
246:
247: /* Get the body of the message. */
248: Object body = null;
249:
250: // Check to see if any parameter has annotation. If it doesn't then don't bother checking them.
251: boolean hasAnnotation = method.getParameterAnnotations().length > 0;
252: for (int i = 0; i < args.length; i++) {
253: if (hasAnnotation) {
254: if (_context.getParameterPropertySet(method, i,
255: Priority.class) != null)
256: continue;
257: if (_context.getParameterPropertySet(method, i,
258: Property.class) != null)
259: continue;
260: if (_context.getParameterPropertySet(method, i,
261: Expiration.class) != null)
262: continue;
263: if (_context.getParameterPropertySet(method, i,
264: Delivery.class) != null)
265: continue;
266: if (_context.getParameterPropertySet(method, i,
267: Type.class) != null)
268: continue;
269: if (_context.getParameterPropertySet(method, i,
270: CorrelationId.class) != null)
271: continue;
272: }
273:
274: if (isBody)
275: throw new IllegalArgumentException(
276: "At most one parameter may be defined as the body of the JMS message");
277:
278: body = args[i];
279: isBody = true;
280: }
281: /*
282: * Get the method level annotation properties.
283: */
284: Priority pri = _context.getMethodPropertySet(method,
285: Priority.class);
286: if (pri != null && pri.value() != -1)
287: priority = pri.value();
288:
289: Expiration exp = _context.getMethodPropertySet(method,
290: Expiration.class);
291: if (exp != null && exp.value() != -1L)
292: expiration = exp.value();
293:
294: Delivery del = _context.getMethodPropertySet(method,
295: Delivery.class);
296: if (del != null && del.value() != DeliveryMode.Auto)
297: deliveryMode = deliveryModeToJmsMode(del.value());
298:
299: Type t = _context.getMethodPropertySet(method, Type.class);
300: if (t != null && t.value().length() != 0)
301: jmsType = t.value();
302:
303: CorrelationId id = _context.getMethodPropertySet(method,
304: CorrelationId.class);
305: if (id != null && id.value().length() != 0)
306: correlationId = id.value();
307:
308: /* Create a message of the appropriate type and set the body. */
309: JMSControl.Message mess = _context.getMethodPropertySet(
310: method, JMSControl.Message.class);
311: MessageType type = MessageType.Auto;
312: if (mess != null)
313: type = mess.value();
314:
315: if (type.equals(MessageType.Auto)) {
316: if (body instanceof byte[])
317: type = MessageType.Bytes;
318: else if (body instanceof Map || !isBody)
319: type = MessageType.Map;
320: else if (body instanceof String)
321: type = MessageType.Text;
322: else if (XMLOBJ_CLASS != null
323: && XMLOBJ_CLASS.isAssignableFrom(body
324: .getClass()))
325: type = MessageType.Text;
326: else if (body instanceof javax.jms.Message)
327: type = MessageType.JMSMessage;
328: else if (body instanceof Serializable)
329: type = MessageType.Object;
330: else
331: throw new ControlException(
332: "Cannot determine message type from body");
333: }
334: switch (type) {
335: case Object:
336: checkBody(body, Serializable.class);
337: m = session.createObjectMessage((Serializable) body);
338: break;
339: case Bytes:
340: BytesMessage sm;
341: checkBody(body, byte[].class);
342: m = sm = session.createBytesMessage();
343: sm.writeBytes((byte[]) body);
344: break;
345: case Text:
346: if (XMLOBJ_CLASS != null
347: && XMLOBJ_CLASS.isAssignableFrom(body
348: .getClass())) {
349: try {
350: Method xmlText = XMLOBJ_CLASS.getMethod(
351: "xmlText", new Class[] {});
352: body = xmlText.invoke(body, new Object[] {});
353: } catch (NoSuchMethodException e) {
354: throw new ControlException(e.getMessage(), e);
355: } catch (IllegalAccessException e) {
356: throw new ControlException(e.getMessage(), e);
357: } catch (InvocationTargetException e) {
358: throw new ControlException(e.getMessage(), e);
359: }
360: }
361: checkBody(body, String.class);
362: m = session.createTextMessage((String) body);
363: break;
364: case Map:
365: MapMessage mm;
366: checkBody(body, Map.class);
367: m = mm = session.createMapMessage();
368: Map map = (Map) body;
369: Iterator iter = map.keySet().iterator();
370: while (iter.hasNext()) {
371: String key = (String) iter.next();
372: mm.setObject(key, map.get(key));
373: }
374: break;
375: case JMSMessage:
376: checkBody(body, javax.jms.Message.class);
377: m = (javax.jms.Message) body;
378: break;
379: }
380: /*
381: * Set the correlation id if given.
382: */
383: String correlationProp = props.sendCorrelationProperty();
384: if (correlationProp != null
385: && correlationProp.length() == 0)
386: correlationProp = null;
387:
388: if (correlationId == null)
389: correlationId = getHeaderAsString(HeaderType.JMSCorrelationID);
390:
391: Properties jmsProps = (Properties) _context
392: .getMethodPropertySet(method, Properties.class);
393: if (jmsProps != null && jmsProps.value() != null) {
394: PropertyValue[] jprops = jmsProps.value();
395: for (int i = 0; i < jprops.length; i++) {
396: PropertyValue jprop = jprops[i];
397: Class cls = jprop.type();
398: if (cls.equals(String.class))
399: m
400: .setStringProperty(jprop.name(), jprop
401: .value());
402: else if (cls.equals(Integer.class))
403: m.setIntProperty(jprop.name(),
404: valueAsInteger(jprop.value()));
405: else if (cls.equals(Long.class))
406: m.setLongProperty(jprop.name(),
407: valueAsLong(jprop.value()));
408: else
409: throw new IllegalArgumentException(
410: "Invalid type for property-value");
411: }
412: }
413: /*
414: * Set the properties/headers of the message from
415: * the parameters to the invoke.
416: */
417: if (hasAnnotation) {
418: for (int i = 0; i < args.length; i++) {
419: Property jmsProp = (Property) _context
420: .getParameterPropertySet(method, i,
421: Property.class);
422: if (jmsProp != null)
423: m.setObjectProperty(jmsProp.name(), args[i]);
424:
425: Priority jmsPriority = (Priority) _context
426: .getParameterPropertySet(method, i,
427: Priority.class);
428: if (jmsPriority != null) {
429: Integer p = valueAsInteger(args[i]);
430: if (p != null)
431: priority = p.intValue();
432: }
433: Expiration jmsExpiration = (Expiration) _context
434: .getParameterPropertySet(method, i,
435: Expiration.class);
436: if (jmsExpiration != null) {
437: Long e = valueAsLong(args[i]);
438: if (e != null)
439: expiration = e.longValue();
440: }
441: Delivery jmsDelivery = (Delivery) _context
442: .getParameterPropertySet(method, i,
443: Delivery.class);
444: if (jmsDelivery != null && args[i] != null) {
445: deliveryMode = deliveryModeToJmsMode(args[i]);
446: }
447: t = (Type) _context.getParameterPropertySet(method,
448: i, Type.class);
449: if (t != null && args[i] != null)
450: jmsType = (String) args[i];
451:
452: CorrelationId jmsId = (CorrelationId) _context
453: .getParameterPropertySet(method, i,
454: CorrelationId.class);
455: if (jmsId != null && args[i] != null)
456: correlationId = (String) args[i];
457: }
458: }
459: if (correlationProp != null)
460: m.setStringProperty(correlationProp, correlationId);
461: else
462: m.setJMSCorrelationID(correlationId);
463:
464: /* Set the headers and properties from maps provided by setProperties() and setHeaders() */
465: m.setJMSExpiration(expiration);
466: m.setJMSDeliveryMode(deliveryMode);
467: m.setJMSPriority(priority);
468: setMessageHeaders(m);
469: setMessageProperties(m);
470: expiration = m.getJMSExpiration();
471: deliveryMode = m.getJMSDeliveryMode();
472: priority = m.getJMSPriority();
473:
474: _headers = null;
475: _properties = null;
476:
477: /* Send the message. */
478: switch (getDestinationType()) {
479: case Topic:
480: ((TopicPublisher) getProducer()).publish(m,
481: deliveryMode, priority, expiration);
482: break;
483: case Queue:
484: getProducer().send(m, deliveryMode, priority,
485: expiration);
486: break;
487: }
488: } catch (JMSException e) {
489: throw new ControlException("Error in sending message", e);
490: }
491: return m;
492: }
493:
494: @EventHandler(field="_resourceContext",eventSet=ResourceContext.ResourceEvents.class,eventName="onAcquire")
495: public void onAcquire() {
496: }
497:
498: /*
499: * The onRelease event handler for the associated context This method will
500: * release all resource acquired by onAcquire.
501: */
502: @EventHandler(field="_resourceContext",eventSet=ResourceContext.ResourceEvents.class,eventName="onRelease")
503: public void onRelease() {
504: close();
505: }
506:
507: /**
508: * Release any JMS related resources.
509: */
510: protected void close() {
511: try {
512: if (_producer != null) {
513: _producer.close();
514: _producer = null;
515: }
516: if (_session != null) {
517: _session.close();
518: _session = null;
519: }
520: if (_connection != null) {
521: _connection.close();
522: _connection = null;
523: }
524: } catch (JMSException e) {
525: throw new ControlException(
526: "Unable to release JMS resource", e);
527: }
528: }
529:
530: /**
531: * Deterimine whether we are working with a queue or a topic.
532: */
533: protected void determineDestination() {
534: Destination props = getDestinationProperties();
535: String factory = props.jndiConnectionFactory();
536: ConnectionFactory cfactory = (ConnectionFactory) getJndiControl()
537: .getResource(factory, ConnectionFactory.class);
538:
539: if (cfactory instanceof QueueConnectionFactory
540: && cfactory instanceof TopicConnectionFactory) {
541: javax.jms.Destination dest = getDestination();
542: if (dest instanceof Queue && dest instanceof Topic) {
543: /* Try to create a topic producer...if fail then assume that it is a queue */
544: try {
545: createTopicSession();
546: _producer = ((TopicSession) getSession())
547: .createPublisher((Topic) getDestination());
548: } catch (Exception e) {
549: close();
550: _destinationType = DestinationType.Queue;
551: return;
552: }
553: _destinationType = DestinationType.Topic;
554: } else {
555: if (dest instanceof javax.jms.Queue)
556: _destinationType = DestinationType.Queue;
557: if (dest instanceof javax.jms.Topic)
558: _destinationType = DestinationType.Topic;
559: }
560: } else {
561: if (cfactory instanceof QueueConnectionFactory)
562: _destinationType = DestinationType.Queue;
563: if (cfactory instanceof TopicConnectionFactory)
564: _destinationType = DestinationType.Topic;
565: }
566: }
567:
568: /**
569: * Get the queue/topic producer.
570: *
571: * @return the JMS producer.
572: */
573: protected MessageProducer getProducer() {
574: if (_producer == null) {
575: // Acquire the publisher/sender.
576: try {
577: javax.jms.Session sess = getSession();
578: switch (getDestinationType()) {
579: case Auto:
580: try {
581: _producer = ((QueueSession) sess)
582: .createSender((Queue) getDestination());
583: } catch (Exception e) {
584: /* todo: should never be catching an exception and running code like this */
585: _producer = ((TopicSession) sess)
586: .createPublisher((Topic) getDestination());
587: }
588: break;
589: case Topic:
590: _producer = ((TopicSession) sess)
591: .createPublisher((Topic) getDestination());
592: break;
593: case Queue:
594: _producer = ((QueueSession) sess)
595: .createSender((Queue) getDestination());
596: break;
597: }
598: } catch (JMSException e) {
599: throw new ControlException(
600: "Unable to acquire JMS resource", e);
601: }
602: }
603: return _producer;
604: }
605:
606: /**
607: * Creates a topic session.
608: */
609: protected void createTopicSession() throws JMSException {
610: Destination props = getDestinationProperties();
611: String factory = props.jndiConnectionFactory();
612: boolean transacted = props.transacted();
613:
614: AcknowledgeMode ackMode = props.acknowledgeMode();
615: TopicConnectionFactory connFactory = (TopicConnectionFactory) getJndiControl()
616: .getResource(factory, TopicConnectionFactory.class);
617:
618: _connection = connFactory.createTopicConnection();
619: _session = ((TopicConnection) _connection).createTopicSession(
620: transacted, modeToJmsMode(ackMode));
621: }
622:
623: /**
624: * Creates a queue session.
625: */
626: protected void createQueueSession() throws JMSException {
627: Destination props = getDestinationProperties();
628: String factory = props.jndiConnectionFactory();
629: boolean transacted = props.transacted();
630: AcknowledgeMode ackMode = props.acknowledgeMode();
631: QueueConnectionFactory connFactory = (QueueConnectionFactory) getJndiControl()
632: .getResource(factory, QueueConnectionFactory.class);
633: _connection = connFactory.createQueueConnection();
634: _session = ((QueueConnection) _connection).createQueueSession(
635: transacted, modeToJmsMode(ackMode));
636: }
637:
638: /**
639: * Convert the enum to the JMS ack mode.
640: *
641: * @param mode the enum mode.
642: * @return the JMS mode.
643: */
644: protected int modeToJmsMode(AcknowledgeMode mode) {
645: if (mode == AcknowledgeMode.Auto)
646: return Session.AUTO_ACKNOWLEDGE;
647: else if (mode == AcknowledgeMode.Client)
648: return Session.CLIENT_ACKNOWLEDGE;
649: else
650: return Session.DUPS_OK_ACKNOWLEDGE;
651: }
652:
653: /**
654: * Convert the object to the JMS delivery mode.
655: *
656: * @param value a integer valued string, integer or DeliveryMode.
657: * @return the JMS delivery mode.
658: */
659: protected int deliveryModeToJmsMode(Object value) {
660: if (value instanceof DeliveryMode) {
661: DeliveryMode mode = (DeliveryMode) value;
662: switch (mode) {
663: case Persistent:
664: return javax.jms.DeliveryMode.PERSISTENT;
665: case NonPersistent:
666: return javax.jms.DeliveryMode.NON_PERSISTENT;
667: }
668: }
669:
670: if (value instanceof Number)
671: return ((Number) value).intValue();
672: else if (value instanceof String)
673: return Integer.parseInt((String) value);
674: else
675: throw new IllegalArgumentException(
676: "Invalid delivery mode value");
677: }
678:
679: /**
680: * Check the given value to see if is appropriate for the given message value class.
681: *
682: * @param value the body.
683: * @param cls the expected class.
684: */
685: protected void checkBody(Object value, Class cls)
686: throws ControlException {
687:
688: if (!cls.isInstance(value))
689: throw new ControlException(
690: "Message body is not of correct type expected "
691: + cls.getName());
692: }
693:
694: /**
695: * Get the destination annotation info for the message.
696: *
697: * @return the destination method annotation.
698: */
699: protected Destination getDestinationProperties() {
700: return _context.getControlPropertySet(Destination.class);
701: }
702:
703: /**
704: * Get the destination type as specified via the
705: * {@link org.apache.beehive.controls.system.jms.JMSControl.Destination#sendType()}
706: * annotation.
707: */
708: protected DestinationType getDestinationType()
709: throws ControlException {
710:
711: /*
712: when previously unset, obtain from the value of the annotation.
713: this value tends to be overridden later based on the configuration
714: of the control instance.
715: */
716: if (_destinationType == null) {
717: Destination d = getDestinationProperties();
718: assert d != null : "Found a null Destination annotation";
719: _destinationType = d.sendType();
720: }
721:
722: return _destinationType;
723: }
724:
725: /**
726: * Set the properties of the given message.
727: *
728: * @param msg the message.
729: */
730: protected void setMessageProperties(javax.jms.Message msg)
731: throws ControlException {
732:
733: if (_properties == null)
734: return;
735:
736: Iterator i = _properties.keySet().iterator();
737: while (i.hasNext()) {
738: String name = (String) i.next();
739: Object value = _properties.get(name);
740: try {
741: msg.setObjectProperty(name, value);
742: } catch (JMSException e) {
743: throw new ControlException("Cannot set property '"
744: + name + "' into JMS message");
745: }
746: }
747: }
748:
749: /**
750: * Set the headers. Accessing msg may throw exception.
751: *
752: * @param msg the message.
753: */
754: protected void setMessageHeaders(javax.jms.Message msg)
755: throws ControlException {
756:
757: if (_headers == null)
758: return;
759:
760: for (HeaderType name : _headers.keySet()) {
761: Object value = _headers.get(name);
762: setMessageHeader(msg, name, value);
763: }
764: }
765:
766: /**
767: * Set the header value of the given message.
768: *
769: * @param msg the message.
770: * @param type the header type.
771: * @param value the value.
772: */
773: protected void setMessageHeader(javax.jms.Message msg,
774: HeaderType type, Object value) {
775: switch (type) {
776: case JMSCorrelationID:
777: try {
778: if (value instanceof byte[])
779: msg.setJMSCorrelationIDAsBytes((byte[]) value);
780: else if (value instanceof String)
781: msg.setJMSCorrelationID((String) value);
782: } catch (javax.jms.JMSException e) {
783: throw new ControlException(
784: "Error setting JMSCorrelationID for message", e);
785: }
786: break;
787: case JMSPriority:
788: try {
789: if (value instanceof Integer)
790: msg.setJMSPriority((Integer) value);
791: else if (value instanceof String)
792: msg.setJMSPriority(Integer
793: .getInteger((String) value));
794: } catch (javax.jms.JMSException e) {
795: throw new ControlException(
796: "Error setting JMSPriority for message", e);
797: }
798: break;
799: case JMSExpiration:
800: try {
801: if (value instanceof Long)
802: msg.setJMSExpiration((Long) value);
803: else if (value instanceof String)
804: msg
805: .setJMSExpiration(Long
806: .parseLong((String) value));
807: } catch (javax.jms.JMSException e) {
808: throw new ControlException(
809: "Error setting JMSExpiration for message", e);
810: }
811: break;
812: case JMSType:
813: try {
814: msg.setJMSType((String) value);
815: } catch (javax.jms.JMSException e) {
816: throw new ControlException(
817: "Error setting JMSType for message", e);
818: }
819: break;
820: }
821: }
822:
823: /**
824: * Get the header type value.
825: *
826: * @param name the header type.
827: * @return the value or null.
828: */
829: protected Object getHeader(HeaderType name) {
830: assert name != null : "Can't get header with null name";
831:
832: if (_headers == null)
833: return null;
834: else
835: return _headers.get(name.toString());
836: }
837:
838: /**
839: * Get the header type value as a string.
840: *
841: * @param name the header type.
842: * @return the string value or null.
843: */
844: protected String getHeaderAsString(HeaderType name) {
845: Object obj = getHeader(name);
846: return obj != null ? obj.toString() : null;
847: }
848:
849: /**
850: * Get the header type value as a int.
851: *
852: * @param name the header type.
853: * @return the int value or null.
854: */
855: protected Integer getHeaderAsInteger(HeaderType name) {
856: return valueAsInteger(getHeader(name));
857: }
858:
859: /**
860: * Return the given value as an integer.
861: *
862: * @param obj a string or number object.
863: * @return the integer value or null.
864: */
865: protected Integer valueAsInteger(Object obj) {
866: if (obj instanceof String)
867: return Integer.parseInt((String) obj);
868: else if (obj instanceof Integer)
869: return (Integer) obj;
870: else if (obj instanceof Number)
871: return new Integer(((Number) obj).intValue());
872: return null;
873: }
874:
875: /**
876: * Get the header type value as a long.
877: *
878: * @param name the header type.
879: * @return the long value or null.
880: */
881: protected Long getHeaderAsLong(HeaderType name) {
882: return valueAsLong(getHeader(name));
883: }
884:
885: /**
886: * Return the given value as a long.
887: *
888: * @param obj a string or number object.
889: * @return the long value or null.
890: */
891: protected Long valueAsLong(Object obj) {
892: if (obj instanceof String)
893: return Long.parseLong((String) obj);
894: else if (obj instanceof Long)
895: return (Long) obj;
896: else if (obj instanceof Number)
897: return new Long(((Number) obj).longValue());
898: return null;
899: }
900:
901: /**
902: * Get the JNDI control instance.
903: *
904: * @return the jndi-control.
905: */
906: protected JndiControlBean getJndiControl() {
907: if (!_jndiInitialized) {
908:
909: _jndiInitialized = true;
910: Destination dest = getDestinationProperties();
911:
912: String contextFactory = nullIfEmpty(dest
913: .jndiContextFactory());
914: if (contextFactory != null)
915: _jndiControl.setFactory(contextFactory);
916:
917: String providerURL = nullIfEmpty(dest.jndiProviderURL());
918: if (providerURL != null)
919: _jndiControl.setUrl(providerURL);
920:
921: String userName = nullIfEmpty(dest.jndiUsername());
922: if (userName != null)
923: _jndiControl.setJndiSecurityPrincipal(userName);
924:
925: String password = nullIfEmpty(dest.jndiPassword());
926: if (password != null)
927: _jndiControl.setJndiSecurityCredentials(password);
928: }
929: return _jndiControl;
930: }
931:
932: /**
933: * Return null if the given string is null or an empty string.
934: *
935: * @param str a string.
936: * @return null or the string.
937: */
938: protected String nullIfEmpty(String str) {
939: if (str == null || str.trim().length() == 0) {
940: return null;
941: }
942: return str;
943: }
944:
945: @Context
946: private ResourceContext _resourceContext;
947:
948: @Context
949: private ControlBeanContext _context;
950:
951: @Control
952: private JndiControlBean _jndiControl;
953:
954: /** The destination */
955: private transient javax.jms.Destination _destination;
956:
957: private transient DestinationType _destinationType = null;
958:
959: /** The JMS connection. */
960: private transient javax.jms.Connection _connection;
961:
962: /** The JMS session */
963: private transient Session _session;
964:
965: /** The message producer. */
966: private transient MessageProducer _producer;
967:
968: /** The JNDI control has been initialized */
969: private boolean _jndiInitialized = false;
970:
971: /** The headers to set in the next message to be sent. */
972: private Map<HeaderType, Object> _headers;
973:
974: /** The properties to set in the next message to be sent. */
975: private Map<String, Object> _properties;
976: }
|