001: // $Id: JMS.java,v 1.14 2006/06/23 09:01:24 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.View;
009: import org.jgroups.stack.Protocol;
010: import org.jgroups.util.Util;
011:
012: import javax.naming.Context;
013: import javax.naming.InitialContext;
014: import java.io.*;
015: import java.util.Hashtable;
016: import java.util.Properties;
017: import java.util.Vector;
018:
019: /**
020: * Implementation of the transport protocol using the Java Message Service (JMS).
021: * This implementation depends on the JMS server that will distribute messages
022: * published to the specific topic to all topic subscribers.
023: * <p>
024: * Protocol parameters are:
025: * <ul>
026: * <li><code>topicName</code> - (required), full JNDI name of the topic to be
027: * used for message publishing;
028: *
029: * <li><code>cf</code> - (optional), full JNDI name of the topic connection
030: * factory that will create topic connection, default value is
031: * <code>"ConnectionFactory"</code>;
032: *
033: * <li><code>jndiCtx</code> - (optional), value of the
034: * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can
035: * specify it as the JVM system property
036: * <code>-Djava.naming.factory.initial=factory.class.Name</code>;
037: *
038: * <li><code>providerURL</code> - (optional), value of the
039: * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it
040: * as the JVM system property <code>-Djava.naming.provider.url=some_url</code>
041: *
042: * <li><code>ttl</code> - (required), time to live in milliseconds. Default
043: * value is 0, that means that messages will never expire and will be
044: * accumulated by a JMS server.
045: *
046: * </ul>
047: *
048: * <p>
049: * Note, when you are using the JMS protocol, try to avoid using protocols
050: * that open server socket connections, like FD_SOCK. I belive that FD is more
051: * appropriate failure detector for JMS case.
052: *
053: * @author Roman Rokytskyy (rrokytskyy@acm.org)
054: */
055: public class JMS extends Protocol implements javax.jms.MessageListener {
056:
057: public static final String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";
058:
059: public static final String INIT_CONNECTION_FACTORY = "cf";
060:
061: public static final String INIT_TOPIC_NAME = "topicName";
062:
063: public static final String INIT_JNDI_CONTEXT = "jndiCtx";
064:
065: public static final String INIT_PROVIDER_URL = "providerURL";
066:
067: public static final String TIME_TO_LIVE = "ttl";
068:
069: public static final String GROUP_NAME_PROPERTY = "jgroups_group_name";
070:
071: public static final String SRC_PROPERTY = "src";
072:
073: public static final String DEST_PROPERTY = "dest";
074:
075: private final Vector members = new Vector();
076:
077: private javax.jms.TopicConnectionFactory connectionFactory;
078: private javax.jms.Topic topic;
079:
080: private javax.jms.TopicConnection connection;
081:
082: private javax.jms.TopicSession session;
083: private javax.jms.TopicPublisher publisher;
084: private javax.jms.TopicSubscriber subscriber;
085:
086: private String cfName;
087: private String topicName;
088: private String initCtxFactory;
089: private String providerUrl;
090: private long timeToLive;
091:
092: private Context ctx;
093:
094: private String group_addr;
095: private Address local_addr;
096: private Address mcast_addr;
097:
098: private final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(
099: 65535);
100:
101: private static final java.util.Random RND = new java.util.Random();
102:
103: /**
104: * Empty constructor.
105: */
106: public JMS() {
107: }
108:
109: /**
110: * Get the name of the protocol.
111: *
112: * @return always returns the <code>"JMS"</code> string.
113: */
114: public String getName() {
115: return "JMS";
116: }
117:
118: /**
119: * Get the string representation of the protocol.
120: *
121: * @return string representation of the protocol (not very useful though).
122: */
123: public String toString() {
124: return "Protocol JMS(local address: " + local_addr + ')';
125: }
126:
127: /**
128: * Set protocol properties. Properties are:
129: * <ul>
130: * <li><code>topicName</code> - (required), full JNDI name of the topic to be
131: * used for message publishing;
132: *
133: * <li><code>cf</code> - (optional), full JNDI name of the topic connection
134: * factory that will create topic connection, default value is
135: * <code>"ConnectionFactory"</code>;
136: *
137: * <li><code>jndiCtx</code> - (optional), value of the
138: * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can
139: * specify it as the JVM system property
140: * <code>-Djava.naming.factory.initial=factory.class.Name</code>;
141: *
142: * <li><code>providerURL</code> - (optional), value of the
143: * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it
144: * as the JVM system property <code>-Djava.naming.provider.url=some_url</code>
145: * </ul>
146: *
147: */
148: public boolean setProperties(Properties props) {
149: super .setProperties(props);
150: cfName = props.getProperty(INIT_CONNECTION_FACTORY,
151: DEFAULT_CONNECTION_FACTORY);
152:
153: props.remove(INIT_CONNECTION_FACTORY);
154:
155: topicName = props.getProperty(INIT_TOPIC_NAME);
156:
157: if (topicName == null)
158: throw new IllegalArgumentException(
159: "JMS topic has not been specified.");
160:
161: props.remove(INIT_TOPIC_NAME);
162:
163: initCtxFactory = props.getProperty(INIT_JNDI_CONTEXT);
164: props.remove(INIT_JNDI_CONTEXT);
165:
166: providerUrl = props.getProperty(INIT_PROVIDER_URL);
167: props.remove(INIT_PROVIDER_URL);
168:
169: String ttl = props.getProperty(TIME_TO_LIVE);
170:
171: if (ttl == null) {
172: if (log.isErrorEnabled())
173: log.error("ttl property not found.");
174: return false;
175: }
176:
177: props.remove(TIME_TO_LIVE);
178:
179: // try to parse ttl property
180: try {
181:
182: timeToLive = Long.parseLong(ttl);
183:
184: } catch (NumberFormatException nfex) {
185: if (log.isErrorEnabled())
186: log
187: .error("ttl property does not contain numeric value.");
188:
189: return false;
190: }
191:
192: return props.size() == 0;
193: }
194:
195: /**
196: * Implementation of the <code>javax.jms.MessageListener</code> interface.
197: * This method receives the JMS message, checks the destination group name.
198: * If the group name is the same as the group name of this channel, it
199: * checks the destination address. If destination address is either
200: * multicast or is the same as local address then message is unwrapped and
201: * passed up the protocol stack. Otherwise it is ignored.
202: *
203: * @param jmsMessage instance of <code>javax.jms.Message</code>.
204: */
205: public void onMessage(javax.jms.Message jmsMessage) {
206: try {
207: String groupName = jmsMessage
208: .getStringProperty(GROUP_NAME_PROPERTY);
209:
210: // there might be other messages in this topic
211: if (groupName == null)
212: return;
213:
214: if (log.isDebugEnabled())
215: log.debug("Got message for group [" + groupName + ']'
216: + ", my group is [" + group_addr + ']');
217:
218: // not our message, ignore it
219: if (!group_addr.equals(groupName))
220: return;
221:
222: JMSAddress src = jmsMessage.getStringProperty(SRC_PROPERTY) != null ? new JMSAddress(
223: jmsMessage.getStringProperty(SRC_PROPERTY))
224: : null;
225:
226: JMSAddress dest = jmsMessage
227: .getStringProperty(DEST_PROPERTY) != null ? new JMSAddress(
228: jmsMessage.getStringProperty(DEST_PROPERTY))
229: : null;
230:
231: // if message is unicast message and I'm not the destination - ignore
232: if (src != null && dest != null && !dest.equals(local_addr)
233: && !dest.isMulticastAddress())
234: return;
235:
236: if (jmsMessage instanceof javax.jms.ObjectMessage) {
237: byte[] buf = (byte[]) ((javax.jms.ObjectMessage) jmsMessage)
238: .getObject();
239:
240: ByteArrayInputStream inp_stream = new ByteArrayInputStream(
241: buf);
242: ObjectInputStream inp = new ObjectInputStream(
243: inp_stream);
244:
245: Message msg = new Message();
246: msg.readExternal(inp);
247:
248: Event evt = new Event(Event.MSG, msg);
249:
250: // +++ remove
251: if (log.isDebugEnabled())
252: log.debug("Message is " + msg + ", headers are "
253: + msg.getHeaders());
254:
255: /* Because Protocol.Up() is never called by this bottommost layer,
256: * we call Up() directly in the observer. This allows e.g.
257: * PerfObserver to get the time of reception of a message */
258: if (observer != null)
259: observer.up(evt, up_queue.size());
260:
261: passUp(evt);
262: }
263: } catch (javax.jms.JMSException ex) {
264: ex.printStackTrace();
265: if (log.isErrorEnabled())
266: log.error("JMSException : " + ex.toString());
267: } catch (IOException ioex) {
268: ioex.printStackTrace();
269: if (log.isErrorEnabled())
270: log.error("IOException : " + ioex.toString());
271: } catch (ClassNotFoundException cnfex) {
272: cnfex.printStackTrace();
273: if (log.isErrorEnabled())
274: log.error("ClassNotFoundException : "
275: + cnfex.toString());
276: }
277: }
278:
279: /**
280: * Handle down event, if it is not a Event.MSG type.
281: *
282: * @param evt event to handle.
283: */
284: protected void handleDownEvent(Event evt) {
285: switch (evt.getType()) {
286:
287: // we do not need this at present time, maybe in the future
288: case Event.TMP_VIEW:
289: case Event.VIEW_CHANGE:
290: synchronized (members) {
291: members.removeAllElements();
292: Vector tmpvec = ((View) evt.getArg()).getMembers();
293: for (int i = 0; i < tmpvec.size(); i++)
294: members.addElement(tmpvec.elementAt(i));
295: }
296: break;
297:
298: case Event.GET_LOCAL_ADDRESS:
299: // return local address -> Event(SET_LOCAL_ADDRESS, local)
300: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
301: break;
302:
303: case Event.CONNECT:
304: group_addr = (String) evt.getArg();
305: passUp(new Event(Event.CONNECT_OK));
306: break;
307:
308: case Event.DISCONNECT:
309: passUp(new Event(Event.DISCONNECT_OK));
310: break;
311: }
312: }
313:
314: /**
315: * Called by the protocol above this. We check the event type, and if it is
316: * message, we publish it in the topic, otherwise we let the
317: * {@link #handleDownEvent(Event)} take care of it.
318: *
319: * @param evt event to process.
320: */
321: public void down(Event evt) {
322:
323: if (log.isInfoEnabled())
324: log.info("event is " + evt + ", group_addr=" + group_addr
325: + ", time=" + System.currentTimeMillis()
326: + ", hdrs are " + Util.printEvent(evt));
327:
328: // handle all non-message events
329: if (evt.getType() != Event.MSG) {
330: handleDownEvent(evt);
331: return;
332: }
333:
334: // extract message
335: Message msg = (Message) evt.getArg();
336:
337: // Because we don't call Protocol.passDown(), we notify the observer
338: // directly (e.g. PerfObserver).
339: // This way, we still have performance numbers for UDP
340: if (observer != null)
341: observer.passDown(evt);
342:
343: // publish the message to the topic
344: sendMessage(msg);
345:
346: }
347:
348: /**
349: * Publish message in the JMS topic. We set the message source and
350: * destination addresses if they were <code>null</code>.
351: *
352: * @param msg message to publish.
353: */
354: protected void sendMessage(Message msg) {
355: try {
356: if (msg.getSrc() == null)
357: msg.setSrc(local_addr);
358:
359: if (msg.getDest() == null)
360: msg.setDest(mcast_addr);
361:
362: if (log.isInfoEnabled())
363: log.info("msg is " + msg);
364:
365: // convert the message into byte array.
366: out_stream.reset();
367:
368: ObjectOutputStream out = new ObjectOutputStream(out_stream);
369: msg.writeExternal(out);
370: out.flush();
371:
372: byte[] buf = out_stream.toByteArray();
373:
374: javax.jms.ObjectMessage jmsMessage = session
375: .createObjectMessage();
376:
377: // set the payload
378: jmsMessage.setObject(buf);
379:
380: // set the group name
381: jmsMessage.setStringProperty(GROUP_NAME_PROPERTY,
382: group_addr);
383:
384: // if the source is JMSAddress, copy it to the header
385: if (msg.getSrc() instanceof JMSAddress)
386: jmsMessage.setStringProperty(SRC_PROPERTY, msg.getSrc()
387: .toString());
388:
389: // if the destination is JMSAddress, copy it to the header
390: if (msg.getDest() instanceof JMSAddress)
391: jmsMessage.setStringProperty(DEST_PROPERTY, msg
392: .getDest().toString());
393:
394: // publish message
395: publisher.publish(jmsMessage);
396:
397: } catch (javax.jms.JMSException ex) {
398: if (log.isErrorEnabled())
399: log.error("JMSException : " + ex.toString());
400: } catch (IOException ioex) {
401: if (log.isErrorEnabled())
402: log.error("IOException : " + ioex.toString());
403: }
404: }
405:
406: /**
407: * Start the JMS protocol. This method instantiates the JNDI initial context
408: * and looks up the topic connection factory and topic itself. If this step
409: * is successful, it creates a connection to JMS server, opens a session
410: * and obtains publisher and subscriber instances.
411: *
412: * @throws javax.jms.JMSException if something goes wrong with JMS.
413: * @throws javax.naming.NamingException if something goes wrong with JNDI.
414: * @throws IllegalArgumentException if the connection factory or topic
415: * cannot be found under specified names.
416: */
417: public void start() throws Exception {
418: if (initCtxFactory != null && providerUrl != null) {
419: Hashtable env = new Hashtable();
420: env.put(Context.INITIAL_CONTEXT_FACTORY, initCtxFactory);
421: env.put(Context.PROVIDER_URL, providerUrl);
422:
423: ctx = new InitialContext(env);
424: } else
425: ctx = new InitialContext();
426:
427: connectionFactory = (javax.jms.TopicConnectionFactory) ctx
428: .lookup(cfName);
429:
430: if (connectionFactory == null)
431: throw new IllegalArgumentException(
432: "Topic connection factory cannot be found in JNDI.");
433:
434: topic = (javax.jms.Topic) ctx.lookup(topicName);
435:
436: if (topic == null)
437: throw new IllegalArgumentException(
438: "Topic cannot be found in JNDI.");
439:
440: connection = connectionFactory.createTopicConnection();
441:
442: boolean addressAssigned = false;
443:
444: // check if JMS connection contains client ID,
445: // if not, try to assign randomly generated one
446: /*while(!addressAssigned) {
447: if (connection.getClientID() != null)
448: addressAssigned = true;
449: else
450: try {
451: connection.setClientID(generateLocalAddress());
452: addressAssigned = true;
453: } catch(javax.jms.InvalidClientIDException ex) {
454: // duplicate... ok, let's try again
455: }
456: }*/
457:
458: // Patch below submitted by Greg Woolsey
459: // Check if JMS connection contains client ID, if not, try to assign randomly generated one
460: // setClientID() must be the first method called on a new connection, per the JMS spec.
461: // If the client ID is already set, this will throw IllegalStateException and keep the original value.
462: while (!addressAssigned) {
463: try {
464: connection.setClientID(generateLocalAddress());
465: addressAssigned = true;
466: } catch (javax.jms.IllegalStateException e) {
467: // expected if connection already has a client ID.
468: addressAssigned = true;
469: } catch (javax.jms.InvalidClientIDException ex) {
470: // duplicate... OK, let's try again
471: }
472: }
473:
474: local_addr = new JMSAddress(connection.getClientID(), false);
475: mcast_addr = new JMSAddress(topicName, true);
476:
477: session = connection.createTopicSession(false,
478: javax.jms.Session.AUTO_ACKNOWLEDGE);
479:
480: publisher = session.createPublisher(topic);
481: publisher.setTimeToLive(timeToLive);
482:
483: subscriber = session.createSubscriber(topic);
484: subscriber.setMessageListener(this );
485:
486: connection.start();
487:
488: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
489: }
490:
491: /**
492: * Stops the work of the JMS protocol. This method closes JMS session and
493: * connection and deregisters itself from the message notification.
494: */
495: public void stop() {
496:
497: if (log.isInfoEnabled())
498: log.info("finishing JMS transport layer.");
499:
500: try {
501: connection.stop();
502: subscriber.setMessageListener(null);
503: session.close();
504: connection.close();
505: } catch (Throwable ex) {
506: if (log.isErrorEnabled())
507: log.error("exception is " + ex);
508: }
509: }
510:
511: /**
512: * Generate random local address. This method takes host name and appends
513: * it with randomly generated integer.
514: *
515: * @return randomly generated local address.
516: */
517: protected String generateLocalAddress()
518: throws java.net.UnknownHostException {
519: String hostName = java.net.InetAddress.getLocalHost()
520: .getHostName();
521:
522: int rndPort = RND.nextInt(65535);
523:
524: return hostName + ':' + rndPort;
525: }
526:
527: /**
528: * Simple {@link Address} representing the JMS node ID or JMS topic group.
529: */
530: public static class JMSAddress implements Address {
531: private String address;
532: private boolean isMCast;
533:
534: /**
535: * Empty constructor to allow externalization work.
536: */
537: public JMSAddress() {
538: }
539:
540: /**
541: * Create instance of this class for given address string.
542: *
543: * Current implementation uses a hash mark <code>'#'</code> to determine
544: * if the address is a unicast or multicast. Therefore, this character is
545: * considered as reserved and is not allowed in the <code>address</code>
546: * parameter passed to this constructor.
547: *
548: * @param address string representing the address of the node connected
549: * to the JMS topic, usually, a value of
550: * <code>connection.getClientID()</code>, where the connection is
551: * instance of <code>javax.jms.TopicConnection</code>.
552: *
553: * @param isMCast <code>true</code> if the address is multicast address,
554: * otherwise - <code>false</code>.
555: */
556: JMSAddress(String address, boolean isMCast) {
557: this .address = address;
558: this .isMCast = isMCast;
559: }
560:
561: /**
562: * Reconstruct the address from the string representation. If the
563: * <code>str</code> starts with <code>'#'</code>, address is considered
564: * as unicast, and node address is the substring after <code>'#'</code>.
565: * Otherwise, address is multicast and address is <code>str</code>
566: * itself.
567: *
568: * @param str string used to reconstruct the instance.
569: */
570: JMSAddress(String str) {
571: if (str.startsWith("#")) {
572: address = str.substring(1);
573: isMCast = false;
574: } else {
575: address = str;
576: isMCast = true;
577: }
578: }
579:
580: /**
581: * Get the node address.
582: *
583: * @return node address in the form passed to the constructor
584: * {@link #JMS.JMSAddress(String, boolean)}.
585: */
586: public String getAddress() {
587: return address;
588: }
589:
590: /**
591: * Set the node address.
592: *
593: * @param address new node address.
594: */
595: public void setAddress(String address) {
596: this .address = address;
597: }
598:
599: /**
600: * Is the address a multicast address?
601: *
602: * @return <code>true</code> if the address is multicast address.
603: */
604: public boolean isMulticastAddress() {
605: return isMCast;
606: }
607:
608: public int size() {
609: return 22;
610: }
611:
612: /**
613: * Clone the object.
614: */
615: protected Object clone() throws CloneNotSupportedException {
616: return new JMSAddress(address, isMCast);
617: }
618:
619: /**
620: * Compare this object to <code>o</code>. It is possible to compare only
621: * addresses of the same class. Also they both should be either
622: * multicast or unicast addresses.
623: *
624: * @return value compliant with the {@link Comparable#compareTo(Object)}
625: * specififaction.
626: */
627: public int compareTo(Object o) throws ClassCastException {
628: if (!(o instanceof JMSAddress))
629: throw new ClassCastException(
630: "Cannot compare different classes.");
631:
632: JMSAddress that = (JMSAddress) o;
633:
634: if (that.isMCast != this .isMCast)
635: throw new ClassCastException(
636: "Addresses are different: one is multicast, and one is not");
637:
638: return this .address.compareTo(that.address);
639: }
640:
641: /**
642: * Test is this object is equal to <code>obj</code>.
643: *
644: * @return <code>true</code> iff the <code>obj</code> is
645: * <code>JMSAddress</code>, node addresses are equal and they both are
646: * either multicast or unicast addresses.
647: */
648: public boolean equals(Object obj) {
649: if (obj == this )
650: return true;
651:
652: if (!(obj instanceof JMSAddress))
653: return false;
654:
655: JMSAddress that = (JMSAddress) obj;
656:
657: if (this .isMCast)
658: return this .isMCast == that.isMCast;
659: else if (this .address == null || that.address == null)
660: return false;
661: else
662: return this .address.equals(that.address)
663: && this .isMCast == that.isMCast;
664: }
665:
666: /**
667: * Get the hash code of this address.
668: *
669: * @return hash code of this object.
670: */
671: public int hashCode() {
672: return toString().hashCode();
673: }
674:
675: /**
676: * Read object from external input.
677: */
678: public void readExternal(ObjectInput in) throws IOException,
679: ClassNotFoundException {
680: address = (String) in.readObject();
681: isMCast = in.readBoolean();
682: }
683:
684: /**
685: * Get the string representation of the address. The following property
686: * holds: <code>a2.equals(a1)</code> is always <code>true</code>, where
687: * <code>a2</code> is
688: * <code>JMSAddress a2 = new JMSAddress(a1.toString());</code>
689: *
690: * @return string representation of the address.
691: */
692: public String toString() {
693: return !isMCast ? '#' + address : address;
694: }
695:
696: /**
697: * Write the object to external output.
698: */
699: public void writeExternal(ObjectOutput out) throws IOException {
700: out.writeObject(address);
701: out.writeBoolean(isMCast);
702: }
703:
704: public void writeTo(DataOutputStream outstream)
705: throws IOException {
706: outstream.writeUTF(address);
707: outstream.writeBoolean(isMCast);
708: }
709:
710: public void readFrom(DataInputStream instream)
711: throws IOException, IllegalAccessException,
712: InstantiationException {
713: address = instream.readUTF();
714: isMCast = instream.readBoolean();
715: }
716: }
717:
718: }
|