Source Code Cross Referenced for JMSLinkProtocol.java in  » Science » Cougaar12_4 » org » cougaar » mts » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.mts.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 1997-2006 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:        package org.cougaar.mts.jms;
027:
028:        import java.net.URI;
029:        import java.net.URISyntaxException;
030:        import java.util.Hashtable;
031:        import java.util.Map;
032:
033:        import javax.jms.Connection;
034:        import javax.jms.ConnectionFactory;
035:        import javax.jms.DeliveryMode;
036:        import javax.jms.Destination;
037:        import javax.jms.ExceptionListener;
038:        import javax.jms.JMSException;
039:        import javax.jms.Message;
040:        import javax.jms.MessageConsumer;
041:        import javax.jms.MessageListener;
042:        import javax.jms.MessageProducer;
043:        import javax.jms.Session;
044:        import javax.naming.Context;
045:        import javax.naming.InitialContext;
046:        import javax.naming.NamingException;
047:
048:        import org.cougaar.bootstrap.SystemProperties;
049:        import org.cougaar.core.component.ServiceBroker;
050:        import org.cougaar.core.mts.MessageAddress;
051:        import org.cougaar.core.mts.MessageAttributes;
052:        import org.cougaar.mts.base.CommFailureException;
053:        import org.cougaar.mts.base.DestinationLink;
054:        import org.cougaar.mts.base.MessageDeliverer;
055:        import org.cougaar.mts.base.MisdeliveredMessageException;
056:        import org.cougaar.mts.base.NameLookupException;
057:        import org.cougaar.mts.base.RPCLinkProtocol;
058:        import org.cougaar.mts.base.UnregisteredNameException;
059:        import org.cougaar.mts.std.AttributedMessage;
060:
061:        /**
062:         * This class implements a Cougaar LinkProtocol that uses JMS as the transport.
063:         */
064:        public class JMSLinkProtocol extends RPCLinkProtocol implements 
065:                MessageListener {
066:            // TODO What is the advantage of using -D over plugin parameters.
067:            // Plugin parameters would allow multiple JMS protocol instances
068:            private static final String JMS_URL = SystemProperties
069:                    .getProperty("org.cougaar.mts.jms.url");
070:            private static final String JNDI_FACTORY = SystemProperties
071:                    .getProperty("org.cougaar.mts.jms.jndi.factory");
072:            private static final String JMS_FACTORY = SystemProperties
073:                    .getProperty("org.cougaar.mts.jms.factory");
074:            // TODO Weblogic specific code should be pulled out
075:            private static final String WEBLOGIC_SERVERNAME = SystemProperties
076:                    .getProperty("org.cougaar.mts.jms.weblogic.server");
077:
078:            // For now use the name server as a unique id of the society
079:            private static final String SOCIETY_UID = SystemProperties
080:                    .getProperty("org.cougaar.name.server");
081:
082:            // JNDI naming context to get JMS connection factory and destinations
083:            private Context context;
084:            // Connection factory for our JMS server
085:            private ConnectionFactory factory;
086:            // Connection to our JMS Server
087:            private Connection connection;
088:            // Session to our JMS Server
089:            private Session session;
090:            // Our JMS destination queue/topic for receiving messages.
091:            private Destination servantDestination;
092:            // manager for receiving messages
093:            private MessageReceiver receiver;
094:            // manager for sending messages and waiting for replys
095:            private ReplySync sync;
096:            // JMS Callback object to receive jms messages
097:            private MessageConsumer consumer;
098:            // JMS object for sending messages, not bound to a specific defination.
099:            private MessageProducer genericProducer; // shared for all outgoing
100:
101:            // messages
102:
103:            public void load() {
104:                super .load();
105:            }
106:
107:            protected int computeCost(AttributedMessage message) {
108:                // TODO Better cost function for JMS transport
109:                // TODO JAZ This might be the place to ensure the session and Servent
110:                // Be careful not to test on each call. if failed only test once per
111:                // retry period.
112:                // for non-infinite cost, our Servant up and remote destination
113:                // available
114:                return 1500;
115:            }
116:
117:            protected DestinationLink createDestinationLink(
118:                    MessageAddress address) {
119:                return new JMSLink(address);
120:            }
121:
122:            protected void fillContextProperties(Map<String, Object> properties) {
123:                properties.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
124:                properties.put(Context.PROVIDER_URL, JMS_URL);
125:            }
126:
127:            protected InitialContext makeInitialContext(
128:                    Hashtable<String, Object> properties)
129:                    throws NamingException {
130:                return new InitialContext(properties);
131:            }
132:
133:            protected Destination lookupDestinationInContext(
134:                    String destinationName) throws NamingException {
135:                Object raw = context.lookup(destinationName);
136:                if (raw instanceof  Destination)
137:                    return (Destination) raw;
138:                else
139:                    return null;
140:            }
141:
142:            protected void rebindDestinationInContext(String name,
143:                    Destination destination) throws NamingException {
144:                // Make a delegating Destination with extra fields
145:                context.rebind(name, destination);
146:            }
147:
148:            protected ConnectionFactory makeConnectionFactory()
149:                    throws NamingException {
150:                return (ConnectionFactory) context.lookup(JMS_FACTORY);
151:            }
152:
153:            protected Connection makeConnection() throws JMSException {
154:                return factory.createConnection();
155:            }
156:
157:            protected void makeSessionExceptionListener() throws JMSException {
158:                JMSExceptionListener exceptionListener = new JMSExceptionListener();
159:                connection.setExceptionListener(exceptionListener);
160:            }
161:
162:            protected Session makeSession() throws JMSException {
163:                return connection
164:                        .createSession(false, Session.AUTO_ACKNOWLEDGE);
165:            }
166:
167:            protected String getMyServantId(String node) {
168:                if (WEBLOGIC_SERVERNAME != null) {
169:                    return /* WEBLOGIC_SERVERNAME + "/" + */node;
170:                } else {
171:                    return node + "." + SOCIETY_UID;
172:                }
173:            }
174:
175:            protected MessageSender makeMessageSender(ReplySync replySync) {
176:                return new MessageSender(this , replySync);
177:            }
178:
179:            protected MessageReceiver makeMessageReceiver(ReplySync sync,
180:                    MessageDeliverer deliverer) {
181:                return new MessageReceiver(sync, deliverer);
182:            }
183:
184:            protected final ReplySync findOrMakeReplySync() {
185:                if (sync == null)
186:                    sync = makeReplySync();
187:                return sync;
188:            }
189:
190:            protected ReplySync makeReplySync() {
191:                return new ReplySync(this );
192:            }
193:
194:            protected Destination makeServantDestination(String myServantId)
195:                    throws JMSException, NamingException {
196:                Destination destination = session.createQueue(myServantId);
197:                rebindDestinationInContext(myServantId, destination);
198:                if (loggingService.isInfoEnabled())
199:                    loggingService.info("Made queue " + myServantId);
200:                return destination;
201:            }
202:
203:            protected Session ensureSession() {
204:                if (session == null) {
205:                    try {
206:                        Hashtable<String, Object> properties = new Hashtable<String, Object>();
207:                        fillContextProperties(properties);
208:                        context = makeInitialContext(properties);
209:                        factory = makeConnectionFactory();
210:                        connection = makeConnection();
211:                        makeSessionExceptionListener();
212:                        session = makeSession();
213:                        genericProducer = makeProducer(null);
214:                    } catch (NamingException e) {
215:                        if (loggingService.isWarnEnabled())
216:                            loggingService
217:                                    .warn("Couldn't get JMS session: Naming Cause="
218:                                            + e.getMessage());
219:                        session = null;
220:                    } catch (JMSException e) {
221:                        if (loggingService.isWarnEnabled())
222:                            loggingService
223:                                    .warn("Couldn't get JMS session: JMS Cause="
224:                                            + e.getMessage());
225:                        session = null;
226:                    }
227:                }
228:                return session;
229:            }
230:
231:            protected Context getContext() {
232:                return context;
233:            }
234:
235:            protected void closeContext() throws NamingException {
236:                if (context != null) {
237:                    try {
238:                        context.close();
239:                    } catch (NullPointerException e) {
240:                        // Don't care if context got set to null by another thread.
241:                    }
242:                    context = null;
243:                }
244:            }
245:
246:            protected ConnectionFactory getFactory() {
247:                return factory;
248:            }
249:
250:            protected Connection getConnection() {
251:                return connection;
252:            }
253:
254:            protected void closeConnection() throws JMSException {
255:                // Closing a contection also closes sessions, producers and consumers
256:                if (connection != null) {
257:                    try {
258:                        connection.close();
259:                    } catch (NullPointerException e) {
260:                        // Ignore these, it just means another thread
261:                        // already did the close
262:                    }
263:                    connection = null;
264:                }
265:            }
266:
267:            protected Session getSession() {
268:                return session;
269:            }
270:
271:            protected Destination getServant() {
272:                return servantDestination;
273:            }
274:
275:            protected void findOrMakeNodeServant() {
276:                if (servantDestination != null)
277:                    return;
278:                setNodeURI(null);
279:                ensureSession();
280:                if (session != null) {
281:                    String node = getNameSupport().getNodeMessageAddress()
282:                            .getAddress();
283:                    String myServantId = getMyServantId(node);
284:                    if (myServantId == null) {
285:                        if (loggingService.isWarnEnabled())
286:                            loggingService.warn("Servant Id not set");
287:                    }
288:
289:                    // Check for leftover queue, flush it manually
290:                    try {
291:                        Destination old = lookupDestinationInContext(myServantId);
292:                        if (old != null) {
293:                            if (loggingService.isInfoEnabled())
294:                                loggingService.info("Found old Queue");
295:                            servantDestination = old;
296:                            flushObsoleteMessages();
297:                        }
298:                    } catch (NamingException e1) {
299:                        if (loggingService.isInfoEnabled())
300:                            loggingService.info("Queue " + myServantId
301:                                    + " doesn't exist yet");
302:                    } catch (JMSException e) {
303:                        if (loggingService.isWarnEnabled())
304:                            loggingService
305:                                    .warn("Error flushing old message: Cause="
306:                                            + e.getMessage());
307:                    }
308:
309:                    try {
310:                        if (servantDestination == null) {
311:                            servantDestination = makeServantDestination(myServantId);
312:                        }
313:                        if (consumer != null) {
314:                            // Old listener from a previous session
315:                            try {
316:                                // unsubscribe out of date consumer.
317:                                closeConsumer(consumer);
318:                            } catch (Exception e) {
319:                                // JMS Errors here should logged but otherwise ignored
320:                                if (loggingService.isInfoEnabled())
321:                                    loggingService
322:                                            .info("Error closing old message listener: "
323:                                                    + e.getMessage());
324:                            }
325:                        }
326:                        consumer = makeMessageConsumer(session,
327:                                servantDestination, myServantId);
328:                        subscribeConsumer(consumer, this );
329:                        if (receiver == null) {
330:                            ServiceBroker sb = getServiceBroker();
331:                            MessageDeliverer deliverer = (MessageDeliverer) sb
332:                                    .getService(this , MessageDeliverer.class,
333:                                            null);
334:                            receiver = makeMessageReceiver(
335:                                    findOrMakeReplySync(), deliverer);
336:                        }
337:                        connection.start();
338:                        URI uri = makeURI(myServantId);
339:                        setNodeURI(uri);
340:                    } catch (JMSException e) {
341:                        if (loggingService.isWarnEnabled())
342:                            loggingService.warn("Couldn't make JMS queue "
343:                                    + e.getMessage());
344:                        releaseNodeServant();
345:                    } catch (URISyntaxException e) {
346:                        if (loggingService.isWarnEnabled())
347:                            loggingService.warn("Couldn't make JMS URI "
348:                                    + e.getMessage());
349:                        releaseNodeServant();
350:                    } catch (NamingException e) {
351:                        if (loggingService.isWarnEnabled())
352:                            loggingService
353:                                    .warn("Couldn't register JMS queue in jndi"
354:                                            + e.getMessage());
355:                        releaseNodeServant();
356:                    }
357:                }
358:            }
359:
360:            protected String getSelector(String myServantId) {
361:                return null;
362:            }
363:
364:            protected URI makeURI(String myServantId) throws URISyntaxException {
365:                return new URI("jms", myServantId, null, null, null);
366:            }
367:
368:            protected String extractDestinationName(URI ref) {
369:                return ref.getAuthority();
370:            }
371:
372:            protected MessageConsumer getConsumer() {
373:                return consumer;
374:            }
375:
376:            protected MessageConsumer makeMessageConsumer(Session session,
377:                    Destination destination, String ServantID)
378:                    throws JMSException {
379:                MessageConsumer consumer = session.createConsumer(destination);
380:                return consumer;
381:            }
382:
383:            // Utility close method
384:            protected void closeConsumer(MessageConsumer consumer)
385:                    throws JMSException {
386:                try {
387:                    consumer.setMessageListener(null);
388:                    consumer.close();
389:                } catch (NullPointerException e) {
390:                    // Don't care if consumer is set to null
391:                    // during this operation.
392:                }
393:            }
394:
395:            protected void subscribeConsumer(MessageConsumer consumer,
396:                    JMSLinkProtocol protocol) throws JMSException {
397:                consumer.setMessageListener(this );
398:            }
399:
400:            protected MessageProducer makeProducer(Destination destination)
401:                    throws JMSException {
402:                MessageProducer producer = session.createProducer(destination);
403:                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
404:                return producer;
405:            }
406:
407:            protected MessageProducer getGenericProducer() {
408:                return genericProducer;
409:            }
410:
411:            protected void flushObsoleteMessages() throws JMSException {
412:                int flushCount = 0;
413:                MessageConsumer flush = makeMessageConsumer(session,
414:                        servantDestination, null);
415:                Object flushedMessage = flush.receiveNoWait();
416:                while (flushedMessage != null) {
417:                    flushCount += 1;
418:                    if (loggingService.isDebugEnabled())
419:                        loggingService.debug("Flushing old message "
420:                                + flushedMessage);
421:                    flushedMessage = flush.receiveNoWait();
422:                }
423:                if (loggingService.isInfoEnabled())
424:                    loggingService.info("Flushed " + flushCount
425:                            + " old messages ");
426:                flush.close();
427:            }
428:
429:            protected String getProtocolType() {
430:                return "-JMS";
431:            }
432:
433:            protected void releaseNodeServant() {
434:                // Tear down context->factory->connection->session->producers and
435:                // consumers
436:                if (loggingService.isInfoEnabled()) {
437:                    loggingService.warn("Releasing Servant");
438:                }
439:                // Closing connection closes session, producers, consummers, and
440:                // exception listener
441:                try {
442:                    closeConnection();
443:                } catch (JMSException e) {
444:                    if (loggingService.isWarnEnabled()) {
445:                        loggingService.warn("Problem Closing Connection: " + e);
446:                    }
447:                }
448:                try {
449:                    closeContext();
450:                } catch (NamingException e) {
451:                    if (loggingService.isWarnEnabled()) {
452:                        loggingService.warn("Problem Closing Context: " + e);
453:                    }
454:                }
455:                servantDestination = null;
456:                consumer = null;
457:                receiver = null;
458:                session = null;
459:                connection = null;
460:                context = null;
461:            }
462:
463:            private Object remakeLock = new Object();
464:            private boolean remakeInProgress = false;
465:
466:            // This method should only be runnable
467:            // in one thread at a time.  But the 
468:            // other calls can't block.  Instead
469:            // they return immediately.
470:            // TODO add a min retry period
471:            protected void remakeNodeServant() {
472:                synchronized (remakeLock) {
473:                    if (remakeInProgress) {
474:                        return;
475:                    }
476:                    remakeInProgress = true;
477:                }
478:                session = null;
479:                servantDestination = null;
480:                findOrMakeNodeServant();
481:                remakeInProgress = false;
482:            }
483:
484:            protected Boolean usesEncryptedSocket() {
485:                return Boolean.FALSE;
486:            }
487:
488:            // MessageListener
489:            public void onMessage(Message msg) {
490:                receiver.handleIncomingMessage(msg);
491:            }
492:
493:            protected boolean isServantAlive() {
494:                return super .isServantAlive() && session != null
495:                        && servantDestination != null && consumer != null
496:                        && receiver != null;
497:            }
498:
499:            protected class JMSExceptionListener implements  ExceptionListener {
500:                public void onException(JMSException ex) {
501:                    if (loggingService.isWarnEnabled())
502:                        loggingService.warn("JMS Connection error: Cause="
503:                                + ex.getMessage());
504:                    releaseNodeServant();
505:                }
506:            }
507:
508:            // MTS Station to send a message to a specific remote Agent
509:            // Even if multiple remote Agents are on the same Node, there will be one
510:            // instance per Agent
511:            public class JMSLink extends Link {
512:                private final MessageSender sender;
513:                protected URI uri;
514:
515:                protected JMSLink(MessageAddress addr) {
516:                    super (addr);
517:                    this .sender = makeMessageSender(findOrMakeReplySync());
518:                }
519:
520:                public boolean isValid() {
521:                    // Remake our servant if necessary. If that fails, the link is
522:                    // considered invalid,
523:                    // since the remote reference must be unreachable.
524:                    if (!isServantAlive()) {
525:                        remakeNodeServant();
526:                        if (!isServantAlive()) {
527:                            return false;
528:                        } else {
529:                            reregisterClients();
530:                        }
531:                    }
532:                    return super .isValid();
533:                }
534:
535:                protected Object decodeRemoteRef(URI ref) throws Exception {
536:                    if (ref == null) {
537:                        if (loggingService.isWarnEnabled())
538:                            loggingService.warn("Got null remote ref for "
539:                                    + getDestination());
540:                        return null;
541:                    }
542:                    if (session != null) {
543:                        String destinationName = extractDestinationName(ref);
544:                        if (loggingService.isInfoEnabled()) {
545:                            loggingService
546:                                    .info("Looking for Destination queue "
547:                                            + destinationName
548:                                            + " from reference " + ref);
549:                        }
550:                        try {
551:                            // TODO if JNDI server is down this will not work
552:                            // is test for null good enough
553:                            Destination d = lookupDestinationInContext(destinationName);
554:                            if (loggingService.isInfoEnabled())
555:                                loggingService.info("Got " + d);
556:                            this .uri = ref;
557:                            return d;
558:                        } catch (Exception e) {
559:                            if (loggingService.isWarnEnabled())
560:                                loggingService.warn("JNDI error: "
561:                                        + e.getMessage());
562:                            throw e;
563:                        }
564:                    }
565:                    return null;
566:                }
567:
568:                protected MessageAttributes forwardByProtocol(
569:                        Object destination, AttributedMessage message)
570:                        throws NameLookupException, UnregisteredNameException,
571:                        CommFailureException, MisdeliveredMessageException {
572:                    if (!(destination instanceof  Destination)) {
573:                        if (loggingService.isErrorEnabled())
574:                            loggingService.error(destination
575:                                    + " is not a javax.jmx.Destination");
576:                        return null;
577:                    }
578:                    try {
579:                        return sender.handleOutgoingMessage(uri,
580:                                (Destination) destination, message);
581:                    } catch (CommFailureException e1) {
582:                        decache();
583:                        throw e1;
584:                    } catch (MisdeliveredMessageException e2) {
585:                        decache();
586:                        throw e2;
587:                    } catch (Exception e3) {
588:                        decache();
589:                        throw new CommFailureException(e3);
590:                    }
591:                }
592:
593:                public Class getProtocolClass() {
594:                    return JMSLinkProtocol.this.getClass();
595:                }
596:
597:            }
598:
599:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.