Source Code Cross Referenced for JMSSessionFactory.java in  » ESB » celtix-1.0 » org » objectweb » celtix » bus » transports » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » celtix 1.0 » org.objectweb.celtix.bus.transports.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.objectweb.celtix.bus.transports.jms;
002:
003:        import java.net.InetAddress;
004:        import java.net.UnknownHostException;
005:        import java.util.Calendar;
006:        import java.util.logging.Level;
007:        import java.util.logging.Logger;
008:
009:        import javax.jms.Connection;
010:        import javax.jms.Destination;
011:        import javax.jms.JMSException;
012:        import javax.jms.MessageConsumer;
013:        import javax.jms.Queue;
014:        import javax.jms.QueueConnection;
015:        import javax.jms.QueueSession;
016:        import javax.jms.Session;
017:        import javax.jms.Topic;
018:        import javax.jms.TopicConnection;
019:        import javax.jms.TopicSession;
020:        import javax.jms.TopicSubscriber;
021:        import javax.naming.Context;
022:        import javax.naming.NamingException;
023:
024:        import org.objectweb.celtix.common.logging.LogUtils;
025:        import org.objectweb.celtix.common.util.AbstractTwoStageCache;
026:        import org.objectweb.celtix.transports.jms.JMSAddressPolicyType;
027:        import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
028:
029:        /**
030:         * This class encapsulates the creation and pooling logic for JMS Sessions.
031:         * The usage patterns for sessions, producers & consumers are as follows ...
032:         * <p>
033:         * client-side: an invoking thread requires relatively short-term exclusive
034:         * use of a session, an unidentified producer to send the request message,
035:         * and in the point-to-point domain a consumer for the temporary ReplyTo
036:         * destination to synchronously receive the reply if the operation is twoway
037:         * (in the pub-sub domain only oneway operations are supported, so a there
038:         * is never a requirement for a reply destination)
039:         * <p>
040:         * server-side receive: each port based on <jms:address> requires relatively
041:         * long-term exclusive use of a session, a consumer with a MessageListener for
042:         * the JMS destination specified for the port, and an unidentified producer
043:         * to send the request message
044:         * <p>
045:         * server-side send: each dispatch of a twoway request requires relatively
046:         * short-term exclusive use of a session and an indentified producer (but
047:         * not a consumer) - note that the session used for the recieve side cannot
048:         * be re-used for the send, as MessageListener usage precludes any synchronous
049:         * sends or receives on that session
050:         * <p>
051:         * So on the client-side, pooling of sessions is bound up with pooling
052:         * of temporary reply destinations, whereas on the server receive side
053:         * the benefit of pooling is marginal as the session is required from
054:         * the point at which the port was activated until the Bus is shutdown
055:         * The server send side resembles the client side,
056:         * except that a consumer for the temporary destination is never required.
057:         * Hence different pooling strategies make sense ...
058:         * <p>
059:         * client-side: a SoftReference-based cache of send/receive sessions is
060:         * maintained containing an aggregate of a session, indentified producer,
061:         * temporary reply destination & consumer for same
062:         * <p>
063:         * server-side receive: as sessions cannot be usefully recycled, they are
064:         * simply created on demand and closed when no longer required
065:         * <p>
066:         * server-side send: a SoftReference-based cache of send-only sessions is
067:         * maintained containing an aggregate of a session and an indentified producer
068:         * <p>
069:         * In a pure client or pure server, only a single cache is ever
070:         * populated.  Where client and server logic is co-located, a client
071:         * session retrieval for a twoway invocation checks the reply-capable
072:         * cache first and then the send-only cache - if a session is
073:         * available in the later then its used after a tempory destination is
074:         * created before being recycled back into the reply-capable cache. A
075:         * server send side retrieval or client retrieval for a oneway
076:         * invocation checks the send-only cache first and then the
077:         * reply-capable cache - if a session is available in the later then
078:         * its used and the tempory destination is ignored. So in the
079:         * co-located case, sessions migrate from the send-only cache to the
080:         * reply-capable cache as necessary.
081:         * <p>
082:         *
083:         * @author Eoghan Glynn
084:         */
085:        public class JMSSessionFactory {
086:
087:            private static final int CACHE_HIGH_WATER_MARK = 500;
088:            private static final Logger LOG = LogUtils
089:                    .getL7dLogger(JMSSessionFactory.class);
090:            private static final int PRIMARY_CACHE_MAX = 20;
091:
092:            private final Context initialContext;
093:            private final Connection theConnection;
094:            private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
095:            private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
096:            private final Destination theReplyDestination;
097:            private final boolean isQueueConnection;
098:
099:            private final JMSAddressPolicyType addressExtensor;
100:            private final JMSServerBehaviorPolicyType jmsServerPolicy;
101:
102:            /**
103:             * Constructor.
104:             *
105:             * @param connection the shared {Queue|Topic}Connection
106:             */
107:            public JMSSessionFactory(Connection connection,
108:                    Destination replyDestination, JMSAddressPolicyType addrExt,
109:                    JMSServerBehaviorPolicyType serverPolicy, Context context) {
110:                theConnection = connection;
111:                theReplyDestination = replyDestination;
112:                addressExtensor = addrExt;
113:                isQueueConnection = addressExtensor.getDestinationStyle()
114:                        .value().equals(JMSConstants.JMS_QUEUE);
115:                jmsServerPolicy = serverPolicy;
116:                initialContext = context;
117:
118:                // create session caches (REVISIT sizes should be configurable)
119:                //
120:                if (isQueueConnection) {
121:                    // the reply capable cache is only required in the point-to-point
122:                    // domain
123:                    //
124:                    replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(
125:                            PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
126:                        public final PooledSession create() throws JMSException {
127:                            return createPointToPointReplyCapableSession();
128:                        }
129:                    };
130:
131:                    try {
132:                        replyCapableSessionCache.populateCache();
133:                    } catch (Throwable t) {
134:                        LOG.log(Level.FINE,
135:                                "JMS Session cache populate failed: " + t);
136:                    }
137:
138:                    // send-only cache for point-to-point oneway requests and replies
139:                    //
140:                    sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
141:                            PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
142:                        public final PooledSession create() throws JMSException {
143:                            return createPointToPointSendOnlySession();
144:                        }
145:                    };
146:
147:                    try {
148:                        sendOnlySessionCache.populateCache();
149:                    } catch (Throwable t) {
150:                        LOG.log(Level.FINE,
151:                                "JMS Session cache populate failed: " + t);
152:                    }
153:                } else {
154:                    // send-only cache for pub-sub oneway requests
155:                    //
156:                    sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
157:                            PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
158:                        public final PooledSession create() throws JMSException {
159:                            return createPubSubSession(true, false, null);
160:                        }
161:                    };
162:
163:                    try {
164:                        sendOnlySessionCache.populateCache();
165:                    } catch (Throwable t) {
166:                        LOG.log(Level.FINE,
167:                                "JMS Session cache populate failed: " + t);
168:                    }
169:                }
170:            }
171:
172:            //--java.lang.Object Overrides----------------------------------------------
173:            public String toString() {
174:                return "JMSSessionFactory";
175:            }
176:
177:            //--Methods-----------------------------------------------------------------
178:            protected Connection getConnection() {
179:                return theConnection;
180:            }
181:
182:            public Queue getQueueFromInitialContext(String queueName)
183:                    throws NamingException {
184:                return (Queue) initialContext.lookup(queueName);
185:            }
186:
187:            public PooledSession get(boolean replyCapable) throws JMSException {
188:                return get(null, replyCapable);
189:            }
190:
191:            /**
192:             * Retrieve a new or cached Session.
193:             * @param replyDest Destination name if coming from wsa:Header
194:             * @param replyCapable true iff the session is to be used to receive replies
195:             * (implies client side twoway invocation )
196:             * @return a new or cached Session
197:             */
198:            public PooledSession get(Destination replyDest, boolean replyCapable)
199:                    throws JMSException {
200:                PooledSession ret = null;
201:
202:                synchronized (this ) {
203:                    if (replyCapable) {
204:                        // first try reply capable cache
205:                        //
206:                        ret = replyCapableSessionCache.poll();
207:
208:                        if (ret == null) {
209:                            // fall back to send only cache, creating temporary reply
210:                            // queue and consumer
211:                            //
212:                            ret = sendOnlySessionCache.poll();
213:
214:                            if (ret != null) {
215:                                QueueSession session = (QueueSession) ret
216:                                        .session();
217:                                Queue destination = null;
218:                                String selector = null;
219:
220:                                if (null != theReplyDestination
221:                                        || null != replyDest) {
222:                                    destination = null != replyDest ? (Queue) replyDest
223:                                            : (Queue) theReplyDestination;
224:
225:                                    selector = "JMSCorrelationID = '"
226:                                            + generateUniqueSelector(ret) + "'";
227:                                }
228:
229:                                ret.destination(destination);
230:                                MessageConsumer consumer = session
231:                                        .createReceiver(destination, selector);
232:                                ret.consumer(consumer);
233:                            } else {
234:                                // no pooled session available in either cache => create one in
235:                                // in the reply capable cache
236:                                //
237:                                try {
238:                                    ret = replyCapableSessionCache.get();
239:                                } catch (Throwable t) {
240:                                    // factory method may only throw JMSException
241:                                    //
242:                                    throw (JMSException) t;
243:                                }
244:                            }
245:                        }
246:                    } else {
247:                        // first try send only cache
248:                        //
249:                        ret = sendOnlySessionCache.poll();
250:
251:                        if (ret == null) {
252:                            // fall back to reply capable cache if one exists (only in the
253:                            // point-to-point domain), ignoring temporary reply destination
254:                            // and consumer
255:                            //
256:                            if (replyCapableSessionCache != null) {
257:                                ret = replyCapableSessionCache.poll();
258:                            }
259:
260:                            if (ret == null) {
261:                                // no pooled session available in either cache => create one in
262:                                // in the send only cache
263:                                //
264:                                try {
265:                                    ret = sendOnlySessionCache.get();
266:                                } catch (Throwable t) {
267:                                    // factory method may only throw JMSException
268:                                    //
269:                                    throw (JMSException) t;
270:                                }
271:                            }
272:                        }
273:                    }
274:                }
275:
276:                return ret;
277:            }
278:
279:            /**
280:             * Retrieve a new
281:             *
282:             * @param destination the target JMS queue or topic (non-null implies
283:             * server receive side)
284:             * @return a new or cached Session
285:             */
286:            public PooledSession get(Destination destination)
287:                    throws JMSException {
288:                PooledSession ret = null;
289:
290:                // the destination is only specified on the server receive side,
291:                // in which case a new session is always created
292:                //
293:                if (isQueueConnection) {
294:                    ret = createPointToPointServerSession(destination);
295:                } else {
296:                    ret = createPubSubSession(false, true, destination);
297:                }
298:
299:                return ret;
300:            }
301:
302:            /**
303:             * Return a Session to the pool
304:             *
305:             * @param pooled_session the session to recycle
306:             */
307:            public void recycle(PooledSession pooledSession) {
308:                // sessions used long-term by the server receive side are not cached,
309:                // only non-null destinations are temp queues
310:                final boolean replyCapable = pooledSession.destination() != null;
311:                boolean discard = false;
312:
313:                synchronized (this ) {
314:                    // re-cache session, closing if it cannot be it can be accomodated
315:                    //
316:                    discard = replyCapable ? (!replyCapableSessionCache
317:                            .recycle(pooledSession)) : (!sendOnlySessionCache
318:                            .recycle(pooledSession));
319:                }
320:
321:                if (discard) {
322:                    try {
323:                        pooledSession.close();
324:                    } catch (JMSException e) {
325:                        LOG.log(Level.WARNING, "JMS Session discard failed: "
326:                                + e);
327:                    }
328:                }
329:            }
330:
331:            /**
332:             * Shutdown the session factory.
333:             */
334:            public void shutdown() {
335:                try {
336:                    PooledSession curr;
337:
338:                    if (replyCapableSessionCache != null) {
339:                        curr = replyCapableSessionCache.poll();
340:                        while (curr != null) {
341:                            curr.close();
342:                            curr = replyCapableSessionCache.poll();
343:                        }
344:                    }
345:
346:                    if (sendOnlySessionCache != null) {
347:                        curr = sendOnlySessionCache.poll();
348:                        while (curr != null) {
349:                            curr.close();
350:                            curr = sendOnlySessionCache.poll();
351:                        }
352:                    }
353:
354:                    theConnection.close();
355:                } catch (JMSException e) {
356:                    LOG.log(Level.WARNING, "queue connection close failed: "
357:                            + e);
358:                }
359:
360:                // help GC
361:                //
362:                replyCapableSessionCache = null;
363:                sendOnlySessionCache = null;
364:            }
365:
366:            /**
367:             * Helper method to create a point-to-point pooled session.
368:             *
369:             * @param producer true iff producing
370:             * @param consumer true iff consuming
371:             * @param destination the target destination
372:             * @return an appropriate pooled session
373:             */
374:            PooledSession createPointToPointReplyCapableSession()
375:                    throws JMSException {
376:                QueueSession session = ((QueueConnection) theConnection)
377:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
378:                Destination destination = null;
379:                String selector = null;
380:
381:                if (null != theReplyDestination) {
382:                    destination = theReplyDestination;
383:
384:                    selector = "JMSCorrelationID = '"
385:                            + generateUniqueSelector(session) + "'";
386:
387:                } else {
388:                    destination = session.createTemporaryQueue();
389:                }
390:
391:                MessageConsumer consumer = session.createReceiver(
392:                        (Queue) destination, selector);
393:                return new PooledSession(session, destination, session
394:                        .createSender(null), consumer);
395:            }
396:
397:            /**
398:             * Helper method to create a point-to-point pooled session.
399:             *
400:             * @return an appropriate pooled session
401:             */
402:            PooledSession createPointToPointSendOnlySession()
403:                    throws JMSException {
404:                QueueSession session = ((QueueConnection) theConnection)
405:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
406:
407:                return new PooledSession(session, null, session
408:                        .createSender(null), null);
409:            }
410:
411:            /**
412:             * Helper method to create a point-to-point pooled session for consumer only.
413:             *
414:             * @param destination the target destination
415:             * @return an appropriate pooled session
416:             */
417:            private PooledSession createPointToPointServerSession(
418:                    Destination destination) throws JMSException {
419:                QueueSession session = ((QueueConnection) theConnection)
420:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
421:
422:                return new PooledSession(session, destination, session
423:                        .createSender(null), session.createReceiver(
424:                        (Queue) destination, jmsServerPolicy
425:                                .getMessageSelector()));
426:            }
427:
428:            /**
429:             * Helper method to create a pub-sub pooled session.
430:             *
431:             * @param producer true iff producing
432:             * @param consumer true iff consuming
433:             * @param destination the target destination
434:             * @return an appropriate pooled session
435:             */
436:            PooledSession createPubSubSession(boolean producer,
437:                    boolean consumer, Destination destination)
438:                    throws JMSException {
439:                TopicSession session = ((TopicConnection) theConnection)
440:                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
441:                TopicSubscriber sub = null;
442:                if (consumer) {
443:                    String messageSelector = jmsServerPolicy
444:                            .getMessageSelector();
445:                    String durableName = jmsServerPolicy
446:                            .getDurableSubscriberName();
447:                    if (durableName != null) {
448:                        sub = session.createDurableSubscriber(
449:                                (Topic) destination, durableName,
450:                                messageSelector, false);
451:                    } else {
452:                        sub = session.createSubscriber((Topic) destination,
453:                                messageSelector, false);
454:                    }
455:                }
456:
457:                return new PooledSession(session, null, producer ? session
458:                        .createPublisher(null) : null, sub);
459:            }
460:
461:            private String generateUniqueSelector(Object obj) {
462:                String host = "localhost";
463:
464:                try {
465:                    InetAddress addr = InetAddress.getLocalHost();
466:                    host = addr.getHostName();
467:                } catch (UnknownHostException ukex) {
468:                    //Default to localhost.
469:                }
470:
471:                long time = Calendar.getInstance().getTimeInMillis();
472:                return host + "_" + System.getProperty("user.name") + "_" + obj
473:                        + time;
474:            }
475:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.