001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2006 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.mts.jms;
027:
028: import java.net.URI;
029: import java.net.URISyntaxException;
030: import java.util.Hashtable;
031: import java.util.Map;
032:
033: import javax.jms.Connection;
034: import javax.jms.ConnectionFactory;
035: import javax.jms.DeliveryMode;
036: import javax.jms.Destination;
037: import javax.jms.ExceptionListener;
038: import javax.jms.JMSException;
039: import javax.jms.Message;
040: import javax.jms.MessageConsumer;
041: import javax.jms.MessageListener;
042: import javax.jms.MessageProducer;
043: import javax.jms.Session;
044: import javax.naming.Context;
045: import javax.naming.InitialContext;
046: import javax.naming.NamingException;
047:
048: import org.cougaar.bootstrap.SystemProperties;
049: import org.cougaar.core.component.ServiceBroker;
050: import org.cougaar.core.mts.MessageAddress;
051: import org.cougaar.core.mts.MessageAttributes;
052: import org.cougaar.mts.base.CommFailureException;
053: import org.cougaar.mts.base.DestinationLink;
054: import org.cougaar.mts.base.MessageDeliverer;
055: import org.cougaar.mts.base.MisdeliveredMessageException;
056: import org.cougaar.mts.base.NameLookupException;
057: import org.cougaar.mts.base.RPCLinkProtocol;
058: import org.cougaar.mts.base.UnregisteredNameException;
059: import org.cougaar.mts.std.AttributedMessage;
060:
061: /**
062: * This class implements a Cougaar LinkProtocol that uses JMS as the transport.
063: */
064: public class JMSLinkProtocol extends RPCLinkProtocol implements
065: MessageListener {
066: // TODO What is the advantage of using -D over plugin parameters.
067: // Plugin parameters would allow multiple JMS protocol instances
068: private static final String JMS_URL = SystemProperties
069: .getProperty("org.cougaar.mts.jms.url");
070: private static final String JNDI_FACTORY = SystemProperties
071: .getProperty("org.cougaar.mts.jms.jndi.factory");
072: private static final String JMS_FACTORY = SystemProperties
073: .getProperty("org.cougaar.mts.jms.factory");
074: // TODO Weblogic specific code should be pulled out
075: private static final String WEBLOGIC_SERVERNAME = SystemProperties
076: .getProperty("org.cougaar.mts.jms.weblogic.server");
077:
078: // For now use the name server as a unique id of the society
079: private static final String SOCIETY_UID = SystemProperties
080: .getProperty("org.cougaar.name.server");
081:
082: // JNDI naming context to get JMS connection factory and destinations
083: private Context context;
084: // Connection factory for our JMS server
085: private ConnectionFactory factory;
086: // Connection to our JMS Server
087: private Connection connection;
088: // Session to our JMS Server
089: private Session session;
090: // Our JMS destination queue/topic for receiving messages.
091: private Destination servantDestination;
092: // manager for receiving messages
093: private MessageReceiver receiver;
094: // manager for sending messages and waiting for replys
095: private ReplySync sync;
096: // JMS Callback object to receive jms messages
097: private MessageConsumer consumer;
098: // JMS object for sending messages, not bound to a specific defination.
099: private MessageProducer genericProducer; // shared for all outgoing
100:
101: // messages
102:
103: public void load() {
104: super .load();
105: }
106:
107: protected int computeCost(AttributedMessage message) {
108: // TODO Better cost function for JMS transport
109: // TODO JAZ This might be the place to ensure the session and Servent
110: // Be careful not to test on each call. if failed only test once per
111: // retry period.
112: // for non-infinite cost, our Servant up and remote destination
113: // available
114: return 1500;
115: }
116:
117: protected DestinationLink createDestinationLink(
118: MessageAddress address) {
119: return new JMSLink(address);
120: }
121:
122: protected void fillContextProperties(Map<String, Object> properties) {
123: properties.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
124: properties.put(Context.PROVIDER_URL, JMS_URL);
125: }
126:
127: protected InitialContext makeInitialContext(
128: Hashtable<String, Object> properties)
129: throws NamingException {
130: return new InitialContext(properties);
131: }
132:
133: protected Destination lookupDestinationInContext(
134: String destinationName) throws NamingException {
135: Object raw = context.lookup(destinationName);
136: if (raw instanceof Destination)
137: return (Destination) raw;
138: else
139: return null;
140: }
141:
142: protected void rebindDestinationInContext(String name,
143: Destination destination) throws NamingException {
144: // Make a delegating Destination with extra fields
145: context.rebind(name, destination);
146: }
147:
148: protected ConnectionFactory makeConnectionFactory()
149: throws NamingException {
150: return (ConnectionFactory) context.lookup(JMS_FACTORY);
151: }
152:
153: protected Connection makeConnection() throws JMSException {
154: return factory.createConnection();
155: }
156:
157: protected void makeSessionExceptionListener() throws JMSException {
158: JMSExceptionListener exceptionListener = new JMSExceptionListener();
159: connection.setExceptionListener(exceptionListener);
160: }
161:
162: protected Session makeSession() throws JMSException {
163: return connection
164: .createSession(false, Session.AUTO_ACKNOWLEDGE);
165: }
166:
167: protected String getMyServantId(String node) {
168: if (WEBLOGIC_SERVERNAME != null) {
169: return /* WEBLOGIC_SERVERNAME + "/" + */node;
170: } else {
171: return node + "." + SOCIETY_UID;
172: }
173: }
174:
175: protected MessageSender makeMessageSender(ReplySync replySync) {
176: return new MessageSender(this , replySync);
177: }
178:
179: protected MessageReceiver makeMessageReceiver(ReplySync sync,
180: MessageDeliverer deliverer) {
181: return new MessageReceiver(sync, deliverer);
182: }
183:
184: protected final ReplySync findOrMakeReplySync() {
185: if (sync == null)
186: sync = makeReplySync();
187: return sync;
188: }
189:
190: protected ReplySync makeReplySync() {
191: return new ReplySync(this );
192: }
193:
194: protected Destination makeServantDestination(String myServantId)
195: throws JMSException, NamingException {
196: Destination destination = session.createQueue(myServantId);
197: rebindDestinationInContext(myServantId, destination);
198: if (loggingService.isInfoEnabled())
199: loggingService.info("Made queue " + myServantId);
200: return destination;
201: }
202:
203: protected Session ensureSession() {
204: if (session == null) {
205: try {
206: Hashtable<String, Object> properties = new Hashtable<String, Object>();
207: fillContextProperties(properties);
208: context = makeInitialContext(properties);
209: factory = makeConnectionFactory();
210: connection = makeConnection();
211: makeSessionExceptionListener();
212: session = makeSession();
213: genericProducer = makeProducer(null);
214: } catch (NamingException e) {
215: if (loggingService.isWarnEnabled())
216: loggingService
217: .warn("Couldn't get JMS session: Naming Cause="
218: + e.getMessage());
219: session = null;
220: } catch (JMSException e) {
221: if (loggingService.isWarnEnabled())
222: loggingService
223: .warn("Couldn't get JMS session: JMS Cause="
224: + e.getMessage());
225: session = null;
226: }
227: }
228: return session;
229: }
230:
231: protected Context getContext() {
232: return context;
233: }
234:
235: protected void closeContext() throws NamingException {
236: if (context != null) {
237: try {
238: context.close();
239: } catch (NullPointerException e) {
240: // Don't care if context got set to null by another thread.
241: }
242: context = null;
243: }
244: }
245:
246: protected ConnectionFactory getFactory() {
247: return factory;
248: }
249:
250: protected Connection getConnection() {
251: return connection;
252: }
253:
254: protected void closeConnection() throws JMSException {
255: // Closing a contection also closes sessions, producers and consumers
256: if (connection != null) {
257: try {
258: connection.close();
259: } catch (NullPointerException e) {
260: // Ignore these, it just means another thread
261: // already did the close
262: }
263: connection = null;
264: }
265: }
266:
267: protected Session getSession() {
268: return session;
269: }
270:
271: protected Destination getServant() {
272: return servantDestination;
273: }
274:
275: protected void findOrMakeNodeServant() {
276: if (servantDestination != null)
277: return;
278: setNodeURI(null);
279: ensureSession();
280: if (session != null) {
281: String node = getNameSupport().getNodeMessageAddress()
282: .getAddress();
283: String myServantId = getMyServantId(node);
284: if (myServantId == null) {
285: if (loggingService.isWarnEnabled())
286: loggingService.warn("Servant Id not set");
287: }
288:
289: // Check for leftover queue, flush it manually
290: try {
291: Destination old = lookupDestinationInContext(myServantId);
292: if (old != null) {
293: if (loggingService.isInfoEnabled())
294: loggingService.info("Found old Queue");
295: servantDestination = old;
296: flushObsoleteMessages();
297: }
298: } catch (NamingException e1) {
299: if (loggingService.isInfoEnabled())
300: loggingService.info("Queue " + myServantId
301: + " doesn't exist yet");
302: } catch (JMSException e) {
303: if (loggingService.isWarnEnabled())
304: loggingService
305: .warn("Error flushing old message: Cause="
306: + e.getMessage());
307: }
308:
309: try {
310: if (servantDestination == null) {
311: servantDestination = makeServantDestination(myServantId);
312: }
313: if (consumer != null) {
314: // Old listener from a previous session
315: try {
316: // unsubscribe out of date consumer.
317: closeConsumer(consumer);
318: } catch (Exception e) {
319: // JMS Errors here should logged but otherwise ignored
320: if (loggingService.isInfoEnabled())
321: loggingService
322: .info("Error closing old message listener: "
323: + e.getMessage());
324: }
325: }
326: consumer = makeMessageConsumer(session,
327: servantDestination, myServantId);
328: subscribeConsumer(consumer, this );
329: if (receiver == null) {
330: ServiceBroker sb = getServiceBroker();
331: MessageDeliverer deliverer = (MessageDeliverer) sb
332: .getService(this , MessageDeliverer.class,
333: null);
334: receiver = makeMessageReceiver(
335: findOrMakeReplySync(), deliverer);
336: }
337: connection.start();
338: URI uri = makeURI(myServantId);
339: setNodeURI(uri);
340: } catch (JMSException e) {
341: if (loggingService.isWarnEnabled())
342: loggingService.warn("Couldn't make JMS queue "
343: + e.getMessage());
344: releaseNodeServant();
345: } catch (URISyntaxException e) {
346: if (loggingService.isWarnEnabled())
347: loggingService.warn("Couldn't make JMS URI "
348: + e.getMessage());
349: releaseNodeServant();
350: } catch (NamingException e) {
351: if (loggingService.isWarnEnabled())
352: loggingService
353: .warn("Couldn't register JMS queue in jndi"
354: + e.getMessage());
355: releaseNodeServant();
356: }
357: }
358: }
359:
360: protected String getSelector(String myServantId) {
361: return null;
362: }
363:
364: protected URI makeURI(String myServantId) throws URISyntaxException {
365: return new URI("jms", myServantId, null, null, null);
366: }
367:
368: protected String extractDestinationName(URI ref) {
369: return ref.getAuthority();
370: }
371:
372: protected MessageConsumer getConsumer() {
373: return consumer;
374: }
375:
376: protected MessageConsumer makeMessageConsumer(Session session,
377: Destination destination, String ServantID)
378: throws JMSException {
379: MessageConsumer consumer = session.createConsumer(destination);
380: return consumer;
381: }
382:
383: // Utility close method
384: protected void closeConsumer(MessageConsumer consumer)
385: throws JMSException {
386: try {
387: consumer.setMessageListener(null);
388: consumer.close();
389: } catch (NullPointerException e) {
390: // Don't care if consumer is set to null
391: // during this operation.
392: }
393: }
394:
395: protected void subscribeConsumer(MessageConsumer consumer,
396: JMSLinkProtocol protocol) throws JMSException {
397: consumer.setMessageListener(this );
398: }
399:
400: protected MessageProducer makeProducer(Destination destination)
401: throws JMSException {
402: MessageProducer producer = session.createProducer(destination);
403: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
404: return producer;
405: }
406:
407: protected MessageProducer getGenericProducer() {
408: return genericProducer;
409: }
410:
411: protected void flushObsoleteMessages() throws JMSException {
412: int flushCount = 0;
413: MessageConsumer flush = makeMessageConsumer(session,
414: servantDestination, null);
415: Object flushedMessage = flush.receiveNoWait();
416: while (flushedMessage != null) {
417: flushCount += 1;
418: if (loggingService.isDebugEnabled())
419: loggingService.debug("Flushing old message "
420: + flushedMessage);
421: flushedMessage = flush.receiveNoWait();
422: }
423: if (loggingService.isInfoEnabled())
424: loggingService.info("Flushed " + flushCount
425: + " old messages ");
426: flush.close();
427: }
428:
429: protected String getProtocolType() {
430: return "-JMS";
431: }
432:
433: protected void releaseNodeServant() {
434: // Tear down context->factory->connection->session->producers and
435: // consumers
436: if (loggingService.isInfoEnabled()) {
437: loggingService.warn("Releasing Servant");
438: }
439: // Closing connection closes session, producers, consummers, and
440: // exception listener
441: try {
442: closeConnection();
443: } catch (JMSException e) {
444: if (loggingService.isWarnEnabled()) {
445: loggingService.warn("Problem Closing Connection: " + e);
446: }
447: }
448: try {
449: closeContext();
450: } catch (NamingException e) {
451: if (loggingService.isWarnEnabled()) {
452: loggingService.warn("Problem Closing Context: " + e);
453: }
454: }
455: servantDestination = null;
456: consumer = null;
457: receiver = null;
458: session = null;
459: connection = null;
460: context = null;
461: }
462:
463: private Object remakeLock = new Object();
464: private boolean remakeInProgress = false;
465:
466: // This method should only be runnable
467: // in one thread at a time. But the
468: // other calls can't block. Instead
469: // they return immediately.
470: // TODO add a min retry period
471: protected void remakeNodeServant() {
472: synchronized (remakeLock) {
473: if (remakeInProgress) {
474: return;
475: }
476: remakeInProgress = true;
477: }
478: session = null;
479: servantDestination = null;
480: findOrMakeNodeServant();
481: remakeInProgress = false;
482: }
483:
484: protected Boolean usesEncryptedSocket() {
485: return Boolean.FALSE;
486: }
487:
488: // MessageListener
489: public void onMessage(Message msg) {
490: receiver.handleIncomingMessage(msg);
491: }
492:
493: protected boolean isServantAlive() {
494: return super .isServantAlive() && session != null
495: && servantDestination != null && consumer != null
496: && receiver != null;
497: }
498:
499: protected class JMSExceptionListener implements ExceptionListener {
500: public void onException(JMSException ex) {
501: if (loggingService.isWarnEnabled())
502: loggingService.warn("JMS Connection error: Cause="
503: + ex.getMessage());
504: releaseNodeServant();
505: }
506: }
507:
508: // MTS Station to send a message to a specific remote Agent
509: // Even if multiple remote Agents are on the same Node, there will be one
510: // instance per Agent
511: public class JMSLink extends Link {
512: private final MessageSender sender;
513: protected URI uri;
514:
515: protected JMSLink(MessageAddress addr) {
516: super (addr);
517: this .sender = makeMessageSender(findOrMakeReplySync());
518: }
519:
520: public boolean isValid() {
521: // Remake our servant if necessary. If that fails, the link is
522: // considered invalid,
523: // since the remote reference must be unreachable.
524: if (!isServantAlive()) {
525: remakeNodeServant();
526: if (!isServantAlive()) {
527: return false;
528: } else {
529: reregisterClients();
530: }
531: }
532: return super .isValid();
533: }
534:
535: protected Object decodeRemoteRef(URI ref) throws Exception {
536: if (ref == null) {
537: if (loggingService.isWarnEnabled())
538: loggingService.warn("Got null remote ref for "
539: + getDestination());
540: return null;
541: }
542: if (session != null) {
543: String destinationName = extractDestinationName(ref);
544: if (loggingService.isInfoEnabled()) {
545: loggingService
546: .info("Looking for Destination queue "
547: + destinationName
548: + " from reference " + ref);
549: }
550: try {
551: // TODO if JNDI server is down this will not work
552: // is test for null good enough
553: Destination d = lookupDestinationInContext(destinationName);
554: if (loggingService.isInfoEnabled())
555: loggingService.info("Got " + d);
556: this .uri = ref;
557: return d;
558: } catch (Exception e) {
559: if (loggingService.isWarnEnabled())
560: loggingService.warn("JNDI error: "
561: + e.getMessage());
562: throw e;
563: }
564: }
565: return null;
566: }
567:
568: protected MessageAttributes forwardByProtocol(
569: Object destination, AttributedMessage message)
570: throws NameLookupException, UnregisteredNameException,
571: CommFailureException, MisdeliveredMessageException {
572: if (!(destination instanceof Destination)) {
573: if (loggingService.isErrorEnabled())
574: loggingService.error(destination
575: + " is not a javax.jmx.Destination");
576: return null;
577: }
578: try {
579: return sender.handleOutgoingMessage(uri,
580: (Destination) destination, message);
581: } catch (CommFailureException e1) {
582: decache();
583: throw e1;
584: } catch (MisdeliveredMessageException e2) {
585: decache();
586: throw e2;
587: } catch (Exception e3) {
588: decache();
589: throw new CommFailureException(e3);
590: }
591: }
592:
593: public Class getProtocolClass() {
594: return JMSLinkProtocol.this.getClass();
595: }
596:
597: }
598:
599: }
|