Source Code Cross Referenced for JMSSessionFactory.java in  » Web-Services-apache-cxf-2.0.1 » transports » org » apache » cxf » transport » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Services apache cxf 2.0.1 » transports » org.apache.cxf.transport.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**
002:         * Licensed to the Apache Software Foundation (ASF) under one
003:         * or more contributor license agreements. See the NOTICE file
004:         * distributed with this work for additional information
005:         * regarding copyright ownership. The ASF licenses this file
006:         * to you under the Apache License, Version 2.0 (the
007:         * "License"); you may not use this file except in compliance
008:         * with the License. You may obtain a copy of the License at
009:         *
010:         * http://www.apache.org/licenses/LICENSE-2.0
011:         *
012:         * Unless required by applicable law or agreed to in writing,
013:         * software distributed under the License is distributed on an
014:         * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015:         * KIND, either express or implied. See the License for the
016:         * specific language governing permissions and limitations
017:         * under the License.
018:         */package org.apache.cxf.transport.jms;
019:
020:        import java.net.InetAddress;
021:        import java.net.UnknownHostException;
022:        import java.util.Calendar;
023:        import java.util.logging.Level;
024:        import java.util.logging.Logger;
025:
026:        import javax.jms.Connection;
027:        import javax.jms.Destination;
028:        import javax.jms.JMSException;
029:        import javax.jms.MessageConsumer;
030:        import javax.jms.Queue;
031:        import javax.jms.QueueConnection;
032:        import javax.jms.QueueSession;
033:        import javax.jms.Session;
034:        import javax.jms.Topic;
035:        import javax.jms.TopicConnection;
036:        import javax.jms.TopicSession;
037:        import javax.jms.TopicSubscriber;
038:        import javax.naming.Context;
039:        import javax.naming.NamingException;
040:
041:        import org.apache.cxf.common.logging.LogUtils;
042:        import org.apache.cxf.common.util.AbstractTwoStageCache;
043:
044:        /**
045:         * This class encapsulates the creation and pooling logic for JMS Sessions.
046:         * The usage patterns for sessions, producers & consumers are as follows ...
047:         * <p>
048:         * client-side: an invoking thread requires relatively short-term exclusive
049:         * use of a session, an unidentified producer to send the request message,
050:         * and in the point-to-point domain a consumer for the temporary ReplyTo
051:         * destination to synchronously receive the reply if the operation is twoway
052:         * (in the pub-sub domain only oneway operations are supported, so a there
053:         * is never a requirement for a reply destination)
054:         * <p>
055:         * server-side receive: each port based on <jms:address> requires relatively
056:         * long-term exclusive use of a session, a consumer with a MessageListener for
057:         * the JMS destination specified for the port, and an unidentified producer
058:         * to send the request message
059:         * <p>
060:         * server-side send: each dispatch of a twoway request requires relatively
061:         * short-term exclusive use of a session and an indentified producer (but
062:         * not a consumer) - note that the session used for the recieve side cannot
063:         * be re-used for the send, as MessageListener usage precludes any synchronous
064:         * sends or receives on that session
065:         * <p>
066:         * So on the client-side, pooling of sessions is bound up with pooling
067:         * of temporary reply destinations, whereas on the server receive side
068:         * the benefit of pooling is marginal as the session is required from
069:         * the point at which the port was activated until the Bus is shutdown
070:         * The server send side resembles the client side,
071:         * except that a consumer for the temporary destination is never required.
072:         * Hence different pooling strategies make sense ...
073:         * <p>
074:         * client-side: a SoftReference-based cache of send/receive sessions is
075:         * maintained containing an aggregate of a session, indentified producer,
076:         * temporary reply destination & consumer for same
077:         * <p>
078:         * server-side receive: as sessions cannot be usefully recycled, they are
079:         * simply created on demand and closed when no longer required
080:         * <p>
081:         * server-side send: a SoftReference-based cache of send-only sessions is
082:         * maintained containing an aggregate of a session and an indentified producer
083:         * <p>
084:         * In a pure client or pure server, only a single cache is ever
085:         * populated.  Where client and server logic is co-located, a client
086:         * session retrieval for a twoway invocation checks the reply-capable
087:         * cache first and then the send-only cache - if a session is
088:         * available in the later then its used after a tempory destination is
089:         * created before being recycled back into the reply-capable cache. A
090:         * server send side retrieval or client retrieval for a oneway
091:         * invocation checks the send-only cache first and then the
092:         * reply-capable cache - if a session is available in the later then
093:         * its used and the tempory destination is ignored. So in the
094:         * co-located case, sessions migrate from the send-only cache to the
095:         * reply-capable cache as necessary.
096:         * <p>
097:         *
098:         * @author Eoghan Glynn
099:         */
100:        public class JMSSessionFactory {
101:
102:            private static final Logger LOG = LogUtils
103:                    .getL7dLogger(JMSSessionFactory.class);
104:
105:            private int lowWaterMark;
106:            private int highWaterMark;
107:
108:            private final Context initialContext;
109:            private final Connection theConnection;
110:            private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
111:            private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
112:            private final Destination theReplyDestination;
113:            private final JMSTransport jmsTransport;
114:            private final ServerBehaviorPolicyType runtimePolicy;
115:
116:            /**
117:             * Constructor.
118:             *
119:             * @param connection the shared {Queue|Topic}Connection
120:             */
121:            public JMSSessionFactory(Connection connection,
122:                    Destination replyDestination, Context context,
123:                    JMSTransport tbb, ServerBehaviorPolicyType runtimePolicy) {
124:                theConnection = connection;
125:                theReplyDestination = replyDestination;
126:                initialContext = context;
127:                jmsTransport = tbb;
128:                this .runtimePolicy = runtimePolicy;
129:
130:                SessionPoolType sessionPoolConfig = jmsTransport
131:                        .getSessionPool();
132:
133:                lowWaterMark = sessionPoolConfig.getLowWaterMark();
134:                highWaterMark = sessionPoolConfig.getHighWaterMark();
135:
136:                // create session caches (REVISIT sizes should be configurable)
137:                //
138:
139:                if (isDestinationStyleQueue()) {
140:                    // the reply capable cache is only required in the point-to-point
141:                    // domain
142:                    //
143:                    replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(
144:                            lowWaterMark, highWaterMark, 0, this ) {
145:                        public final PooledSession create() throws JMSException {
146:                            return createPointToPointReplyCapableSession();
147:                        }
148:                    };
149:
150:                    try {
151:                        replyCapableSessionCache.populateCache();
152:                    } catch (Throwable t) {
153:                        LOG.log(Level.FINE,
154:                                "JMS Session cache populate failed: " + t);
155:                    }
156:
157:                    // send-only cache for point-to-point oneway requests and replies
158:                    //
159:                    sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
160:                            lowWaterMark, highWaterMark, 0, this ) {
161:                        public final PooledSession create() throws JMSException {
162:                            return createPointToPointSendOnlySession();
163:                        }
164:                    };
165:
166:                    try {
167:                        sendOnlySessionCache.populateCache();
168:                    } catch (Throwable t) {
169:                        LOG.log(Level.FINE,
170:                                "JMS Session cache populate failed: " + t);
171:                    }
172:                } else {
173:                    // send-only cache for pub-sub oneway requests
174:                    //
175:                    sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
176:                            lowWaterMark, highWaterMark, 0, this ) {
177:                        public final PooledSession create() throws JMSException {
178:                            return createPubSubSession(true, false, null);
179:                        }
180:                    };
181:
182:                    try {
183:                        sendOnlySessionCache.populateCache();
184:                    } catch (Throwable t) {
185:                        LOG.log(Level.FINE,
186:                                "JMS Session cache populate failed: " + t);
187:                    }
188:                }
189:            }
190:
191:            //--java.lang.Object Overrides----------------------------------------------
192:            public String toString() {
193:                return "JMSSessionFactory";
194:            }
195:
196:            //--Methods-----------------------------------------------------------------
197:            protected Connection getConnection() {
198:                return theConnection;
199:            }
200:
201:            public Queue getQueueFromInitialContext(String queueName)
202:                    throws NamingException {
203:                return (Queue) initialContext.lookup(queueName);
204:            }
205:
206:            public PooledSession get(boolean replyCapable) throws JMSException {
207:                return get(null, replyCapable);
208:            }
209:
210:            /**
211:             * Retrieve a new or cached Session.
212:             * @param replyDest Destination name if coming from wsa:Header
213:             * @param replyCapable true iff the session is to be used to receive replies
214:             * (implies client side twoway invocation )
215:             * @return a new or cached Session
216:             */
217:            public PooledSession get(Destination replyDest, boolean replyCapable)
218:                    throws JMSException {
219:                PooledSession ret = null;
220:
221:                synchronized (this ) {
222:                    if (replyCapable) {
223:                        // first try reply capable cache
224:                        //
225:                        ret = replyCapableSessionCache.poll();
226:
227:                        if (ret == null) {
228:                            // fall back to send only cache, creating temporary reply
229:                            // queue and consumer
230:                            //
231:                            ret = sendOnlySessionCache.poll();
232:
233:                            if (ret != null) {
234:                                QueueSession session = (QueueSession) ret
235:                                        .session();
236:                                Queue destination = null;
237:                                String selector = null;
238:
239:                                if (null != theReplyDestination
240:                                        || null != replyDest) {
241:                                    destination = null != replyDest ? (Queue) replyDest
242:                                            : (Queue) theReplyDestination;
243:
244:                                    selector = "JMSCorrelationID = '"
245:                                            + generateUniqueSelector(ret) + "'";
246:                                }
247:
248:                                ret.destination(destination);
249:                                MessageConsumer consumer = session
250:                                        .createReceiver(destination, selector);
251:                                ret.consumer(consumer);
252:                            } else {
253:                                // no pooled session available in either cache => create one in
254:                                // in the reply capable cache
255:                                //
256:                                try {
257:                                    ret = replyCapableSessionCache.get();
258:                                } catch (Throwable t) {
259:                                    // factory method may only throw JMSException
260:                                    //
261:                                    throw (JMSException) t;
262:                                }
263:                            }
264:                        }
265:                    } else {
266:                        // first try send only cache
267:                        //
268:                        ret = sendOnlySessionCache.poll();
269:
270:                        if (ret == null) {
271:                            // fall back to reply capable cache if one exists (only in the
272:                            // point-to-point domain), ignoring temporary reply destination
273:                            // and consumer
274:                            //
275:                            if (replyCapableSessionCache != null) {
276:                                ret = replyCapableSessionCache.poll();
277:                            }
278:
279:                            if (ret == null) {
280:                                // no pooled session available in either cache => create one in
281:                                // in the send only cache
282:                                //
283:                                try {
284:                                    ret = sendOnlySessionCache.get();
285:                                } catch (Throwable t) {
286:                                    // factory method may only throw JMSException
287:                                    //
288:                                    throw (JMSException) t;
289:                                }
290:                            }
291:                        }
292:                    }
293:                }
294:
295:                return ret;
296:            }
297:
298:            /**
299:             * Retrieve a new
300:             *
301:             * @param destination the target JMS queue or topic (non-null implies
302:             * server receive side)
303:             * @return a new or cached Session
304:             */
305:            public PooledSession get(Destination destination)
306:                    throws JMSException {
307:                PooledSession ret = null;
308:
309:                // the destination is only specified on the server receive side,
310:                // in which case a new session is always created
311:                //
312:                if (isDestinationStyleQueue()) {
313:                    ret = createPointToPointServerSession(destination);
314:                } else {
315:                    ret = createPubSubSession(false, true, destination);
316:                }
317:
318:                return ret;
319:            }
320:
321:            /**
322:             * Return a Session to the pool
323:             *
324:             * @param pooled_session the session to recycle
325:             */
326:            public void recycle(PooledSession pooledSession) {
327:                // sessions used long-term by the server receive side are not cached,
328:                // only non-null destinations are temp queues
329:                final boolean replyCapable = pooledSession.destination() != null;
330:                boolean discard = false;
331:
332:                synchronized (this ) {
333:                    // re-cache session, closing if it cannot be it can be accomodated
334:                    //
335:                    discard = replyCapable ? (!replyCapableSessionCache
336:                            .recycle(pooledSession)) : (!sendOnlySessionCache
337:                            .recycle(pooledSession));
338:                }
339:
340:                if (discard) {
341:                    try {
342:                        pooledSession.close();
343:                    } catch (JMSException e) {
344:                        LOG.log(Level.WARNING, "JMS Session discard failed: "
345:                                + e);
346:                    }
347:                }
348:            }
349:
350:            /**
351:             * Shutdown the session factory.
352:             */
353:            public void shutdown() {
354:                try {
355:                    PooledSession curr;
356:
357:                    if (replyCapableSessionCache != null) {
358:                        curr = replyCapableSessionCache.poll();
359:                        while (curr != null) {
360:                            curr.close();
361:                            curr = replyCapableSessionCache.poll();
362:                        }
363:                    }
364:
365:                    if (sendOnlySessionCache != null) {
366:                        curr = sendOnlySessionCache.poll();
367:                        while (curr != null) {
368:                            curr.close();
369:                            curr = sendOnlySessionCache.poll();
370:                        }
371:                    }
372:
373:                    theConnection.close();
374:                } catch (JMSException e) {
375:                    LOG.log(Level.WARNING, "queue connection close failed: "
376:                            + e);
377:                }
378:
379:                // help GC
380:                //
381:                replyCapableSessionCache = null;
382:                sendOnlySessionCache = null;
383:            }
384:
385:            /**
386:             * Helper method to create a point-to-point pooled session.
387:             *
388:             * @param producer true iff producing
389:             * @param consumer true iff consuming
390:             * @param destination the target destination
391:             * @return an appropriate pooled session
392:             */
393:            PooledSession createPointToPointReplyCapableSession()
394:                    throws JMSException {
395:                QueueSession session = ((QueueConnection) theConnection)
396:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
397:                Destination destination = null;
398:                String selector = null;
399:
400:                if (null != theReplyDestination) {
401:                    destination = theReplyDestination;
402:
403:                    selector = "JMSCorrelationID = '"
404:                            + generateUniqueSelector(session) + "'";
405:
406:                } else {
407:                    destination = session.createTemporaryQueue();
408:                }
409:
410:                MessageConsumer consumer = session.createReceiver(
411:                        (Queue) destination, selector);
412:                return new PooledSession(session, destination, session
413:                        .createSender(null), consumer);
414:            }
415:
416:            /**
417:             * Helper method to create a point-to-point pooled session.
418:             *
419:             * @return an appropriate pooled session
420:             */
421:            PooledSession createPointToPointSendOnlySession()
422:                    throws JMSException {
423:                QueueSession session = ((QueueConnection) theConnection)
424:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
425:
426:                return new PooledSession(session, null, session
427:                        .createSender(null), null);
428:            }
429:
430:            /**
431:             * Helper method to create a point-to-point pooled session for consumer only.
432:             *
433:             * @param destination the target destination
434:             * @return an appropriate pooled session
435:             */
436:            private PooledSession createPointToPointServerSession(
437:                    Destination destination) throws JMSException {
438:                QueueSession session = ((QueueConnection) theConnection)
439:                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
440:
441:                return new PooledSession(session, destination, session
442:                        .createSender(null), session
443:                        .createReceiver((Queue) destination, runtimePolicy
444:                                .getMessageSelector()));
445:            }
446:
447:            /**
448:             * Helper method to create a pub-sub pooled session.
449:             *
450:             * @param producer true iff producing
451:             * @param consumer true iff consuming
452:             * @param destination the target destination
453:             * @return an appropriate pooled session
454:             */
455:            PooledSession createPubSubSession(boolean producer,
456:                    boolean consumer, Destination destination)
457:                    throws JMSException {
458:                TopicSession session = ((TopicConnection) theConnection)
459:                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
460:                TopicSubscriber sub = null;
461:                if (consumer) {
462:                    String messageSelector = runtimePolicy.getMessageSelector();
463:                    String durableName = runtimePolicy
464:                            .getDurableSubscriberName();
465:                    if (durableName != null) {
466:                        sub = session.createDurableSubscriber(
467:                                (Topic) destination, durableName,
468:                                messageSelector, false);
469:                    } else {
470:                        sub = session.createSubscriber((Topic) destination,
471:                                messageSelector, false);
472:                    }
473:                }
474:
475:                return new PooledSession(session, null, producer ? session
476:                        .createPublisher(null) : null, sub);
477:            }
478:
479:            private String generateUniqueSelector(Object obj) {
480:                String host = "localhost";
481:
482:                try {
483:                    InetAddress addr = InetAddress.getLocalHost();
484:                    host = addr.getHostName();
485:                } catch (UnknownHostException ukex) {
486:                    //Default to localhost.
487:                }
488:
489:                long time = Calendar.getInstance().getTimeInMillis();
490:                return host + "_" + System.getProperty("user.name") + "_" + obj
491:                        + time;
492:            }
493:
494:            private boolean isDestinationStyleQueue() {
495:                return JMSConstants.JMS_QUEUE.equals(jmsTransport
496:                        .getJMSAddress().getDestinationStyle().value());
497:            }
498:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.