001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jms;
019:
020: import java.net.InetAddress;
021: import java.net.UnknownHostException;
022: import java.util.Calendar;
023: import java.util.logging.Level;
024: import java.util.logging.Logger;
025:
026: import javax.jms.Connection;
027: import javax.jms.Destination;
028: import javax.jms.JMSException;
029: import javax.jms.MessageConsumer;
030: import javax.jms.Queue;
031: import javax.jms.QueueConnection;
032: import javax.jms.QueueSession;
033: import javax.jms.Session;
034: import javax.jms.Topic;
035: import javax.jms.TopicConnection;
036: import javax.jms.TopicSession;
037: import javax.jms.TopicSubscriber;
038: import javax.naming.Context;
039: import javax.naming.NamingException;
040:
041: import org.apache.cxf.common.logging.LogUtils;
042: import org.apache.cxf.common.util.AbstractTwoStageCache;
043:
044: /**
045: * This class encapsulates the creation and pooling logic for JMS Sessions.
046: * The usage patterns for sessions, producers & consumers are as follows ...
047: * <p>
048: * client-side: an invoking thread requires relatively short-term exclusive
049: * use of a session, an unidentified producer to send the request message,
050: * and in the point-to-point domain a consumer for the temporary ReplyTo
051: * destination to synchronously receive the reply if the operation is twoway
052: * (in the pub-sub domain only oneway operations are supported, so there
053: * is never a requirement for a reply destination)
054: * <p>
055: * server-side receive: each port based on {@code <jms:address>} requires relatively
056: * long-term exclusive use of a session, a consumer with a MessageListener for
057: * the JMS destination specified for the port, and an unidentified producer
058: * to send the request message
059: * <p>
060: * server-side send: each dispatch of a twoway request requires relatively
061: * short-term exclusive use of a session and an unidentified producer (but
062: * not a consumer) - note that the session used for the receive side cannot
063: * be re-used for the send, as MessageListener usage precludes any synchronous
064: * sends or receives on that session
065: * <p>
066: * So on the client-side, pooling of sessions is bound up with pooling
067: * of temporary reply destinations, whereas on the server receive side
068: * the benefit of pooling is marginal as the session is required from
069: * the point at which the port was activated until the Bus is shutdown
070: * The server send side resembles the client side,
071: * except that a consumer for the temporary destination is never required.
072: * Hence different pooling strategies make sense ...
073: * <p>
074: * client-side: a SoftReference-based cache of send/receive sessions is
075: * maintained containing an aggregate of a session, unidentified producer,
076: * temporary reply destination &amp; consumer for same
077: * <p>
078: * server-side receive: as sessions cannot be usefully recycled, they are
079: * simply created on demand and closed when no longer required
080: * <p>
081: * server-side send: a SoftReference-based cache of send-only sessions is
082: * maintained containing an aggregate of a session and an unidentified producer
083: * <p>
084: * In a pure client or pure server, only a single cache is ever
085: * populated. Where client and server logic is co-located, a client
086: * session retrieval for a twoway invocation checks the reply-capable
087: * cache first and then the send-only cache - if a session is
088: * available in the latter then it's used after a temporary destination is
089: * created before being recycled back into the reply-capable cache. A
090: * server send side retrieval or client retrieval for a oneway
091: * invocation checks the send-only cache first and then the
092: * reply-capable cache - if a session is available in the latter then
093: * it's used and the temporary destination is ignored. So in the
094: * co-located case, sessions migrate from the send-only cache to the
095: * reply-capable cache as necessary.
096: * <p>
097: *
098: * @author Eoghan Glynn
099: */
100: public class JMSSessionFactory {
101:
102: private static final Logger LOG = LogUtils
103: .getL7dLogger(JMSSessionFactory.class);
104:
105: private int lowWaterMark;
106: private int highWaterMark;
107:
108: private final Context initialContext;
109: private final Connection theConnection;
110: private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
111: private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
112: private final Destination theReplyDestination;
113: private final JMSTransport jmsTransport;
114: private final ServerBehaviorPolicyType runtimePolicy;
115:
116: /**
117: * Constructor.
118: *
119: * @param connection the shared {Queue|Topic}Connection
120: */
121: public JMSSessionFactory(Connection connection,
122: Destination replyDestination, Context context,
123: JMSTransport tbb, ServerBehaviorPolicyType runtimePolicy) {
124: theConnection = connection;
125: theReplyDestination = replyDestination;
126: initialContext = context;
127: jmsTransport = tbb;
128: this .runtimePolicy = runtimePolicy;
129:
130: SessionPoolType sessionPoolConfig = jmsTransport
131: .getSessionPool();
132:
133: lowWaterMark = sessionPoolConfig.getLowWaterMark();
134: highWaterMark = sessionPoolConfig.getHighWaterMark();
135:
136: // create session caches (REVISIT sizes should be configurable)
137: //
138:
139: if (isDestinationStyleQueue()) {
140: // the reply capable cache is only required in the point-to-point
141: // domain
142: //
143: replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(
144: lowWaterMark, highWaterMark, 0, this ) {
145: public final PooledSession create() throws JMSException {
146: return createPointToPointReplyCapableSession();
147: }
148: };
149:
150: try {
151: replyCapableSessionCache.populateCache();
152: } catch (Throwable t) {
153: LOG.log(Level.FINE,
154: "JMS Session cache populate failed: " + t);
155: }
156:
157: // send-only cache for point-to-point oneway requests and replies
158: //
159: sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
160: lowWaterMark, highWaterMark, 0, this ) {
161: public final PooledSession create() throws JMSException {
162: return createPointToPointSendOnlySession();
163: }
164: };
165:
166: try {
167: sendOnlySessionCache.populateCache();
168: } catch (Throwable t) {
169: LOG.log(Level.FINE,
170: "JMS Session cache populate failed: " + t);
171: }
172: } else {
173: // send-only cache for pub-sub oneway requests
174: //
175: sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
176: lowWaterMark, highWaterMark, 0, this ) {
177: public final PooledSession create() throws JMSException {
178: return createPubSubSession(true, false, null);
179: }
180: };
181:
182: try {
183: sendOnlySessionCache.populateCache();
184: } catch (Throwable t) {
185: LOG.log(Level.FINE,
186: "JMS Session cache populate failed: " + t);
187: }
188: }
189: }
190:
191: //--java.lang.Object Overrides----------------------------------------------
192: public String toString() {
193: return "JMSSessionFactory";
194: }
195:
196: //--Methods-----------------------------------------------------------------
197: protected Connection getConnection() {
198: return theConnection;
199: }
200:
201: public Queue getQueueFromInitialContext(String queueName)
202: throws NamingException {
203: return (Queue) initialContext.lookup(queueName);
204: }
205:
206: public PooledSession get(boolean replyCapable) throws JMSException {
207: return get(null, replyCapable);
208: }
209:
210: /**
211: * Retrieve a new or cached Session.
212: * @param replyDest Destination name if coming from wsa:Header
213: * @param replyCapable true iff the session is to be used to receive replies
214: * (implies client side twoway invocation )
215: * @return a new or cached Session
216: */
217: public PooledSession get(Destination replyDest, boolean replyCapable)
218: throws JMSException {
219: PooledSession ret = null;
220:
221: synchronized (this ) {
222: if (replyCapable) {
223: // first try reply capable cache
224: //
225: ret = replyCapableSessionCache.poll();
226:
227: if (ret == null) {
228: // fall back to send only cache, creating temporary reply
229: // queue and consumer
230: //
231: ret = sendOnlySessionCache.poll();
232:
233: if (ret != null) {
234: QueueSession session = (QueueSession) ret
235: .session();
236: Queue destination = null;
237: String selector = null;
238:
239: if (null != theReplyDestination
240: || null != replyDest) {
241: destination = null != replyDest ? (Queue) replyDest
242: : (Queue) theReplyDestination;
243:
244: selector = "JMSCorrelationID = '"
245: + generateUniqueSelector(ret) + "'";
246: }
247:
248: ret.destination(destination);
249: MessageConsumer consumer = session
250: .createReceiver(destination, selector);
251: ret.consumer(consumer);
252: } else {
253: // no pooled session available in either cache => create one in
254: // in the reply capable cache
255: //
256: try {
257: ret = replyCapableSessionCache.get();
258: } catch (Throwable t) {
259: // factory method may only throw JMSException
260: //
261: throw (JMSException) t;
262: }
263: }
264: }
265: } else {
266: // first try send only cache
267: //
268: ret = sendOnlySessionCache.poll();
269:
270: if (ret == null) {
271: // fall back to reply capable cache if one exists (only in the
272: // point-to-point domain), ignoring temporary reply destination
273: // and consumer
274: //
275: if (replyCapableSessionCache != null) {
276: ret = replyCapableSessionCache.poll();
277: }
278:
279: if (ret == null) {
280: // no pooled session available in either cache => create one in
281: // in the send only cache
282: //
283: try {
284: ret = sendOnlySessionCache.get();
285: } catch (Throwable t) {
286: // factory method may only throw JMSException
287: //
288: throw (JMSException) t;
289: }
290: }
291: }
292: }
293: }
294:
295: return ret;
296: }
297:
298: /**
299: * Retrieve a new
300: *
301: * @param destination the target JMS queue or topic (non-null implies
302: * server receive side)
303: * @return a new or cached Session
304: */
305: public PooledSession get(Destination destination)
306: throws JMSException {
307: PooledSession ret = null;
308:
309: // the destination is only specified on the server receive side,
310: // in which case a new session is always created
311: //
312: if (isDestinationStyleQueue()) {
313: ret = createPointToPointServerSession(destination);
314: } else {
315: ret = createPubSubSession(false, true, destination);
316: }
317:
318: return ret;
319: }
320:
321: /**
322: * Return a Session to the pool
323: *
324: * @param pooled_session the session to recycle
325: */
326: public void recycle(PooledSession pooledSession) {
327: // sessions used long-term by the server receive side are not cached,
328: // only non-null destinations are temp queues
329: final boolean replyCapable = pooledSession.destination() != null;
330: boolean discard = false;
331:
332: synchronized (this ) {
333: // re-cache session, closing if it cannot be it can be accomodated
334: //
335: discard = replyCapable ? (!replyCapableSessionCache
336: .recycle(pooledSession)) : (!sendOnlySessionCache
337: .recycle(pooledSession));
338: }
339:
340: if (discard) {
341: try {
342: pooledSession.close();
343: } catch (JMSException e) {
344: LOG.log(Level.WARNING, "JMS Session discard failed: "
345: + e);
346: }
347: }
348: }
349:
350: /**
351: * Shutdown the session factory.
352: */
353: public void shutdown() {
354: try {
355: PooledSession curr;
356:
357: if (replyCapableSessionCache != null) {
358: curr = replyCapableSessionCache.poll();
359: while (curr != null) {
360: curr.close();
361: curr = replyCapableSessionCache.poll();
362: }
363: }
364:
365: if (sendOnlySessionCache != null) {
366: curr = sendOnlySessionCache.poll();
367: while (curr != null) {
368: curr.close();
369: curr = sendOnlySessionCache.poll();
370: }
371: }
372:
373: theConnection.close();
374: } catch (JMSException e) {
375: LOG.log(Level.WARNING, "queue connection close failed: "
376: + e);
377: }
378:
379: // help GC
380: //
381: replyCapableSessionCache = null;
382: sendOnlySessionCache = null;
383: }
384:
385: /**
386: * Helper method to create a point-to-point pooled session.
387: *
388: * @param producer true iff producing
389: * @param consumer true iff consuming
390: * @param destination the target destination
391: * @return an appropriate pooled session
392: */
393: PooledSession createPointToPointReplyCapableSession()
394: throws JMSException {
395: QueueSession session = ((QueueConnection) theConnection)
396: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
397: Destination destination = null;
398: String selector = null;
399:
400: if (null != theReplyDestination) {
401: destination = theReplyDestination;
402:
403: selector = "JMSCorrelationID = '"
404: + generateUniqueSelector(session) + "'";
405:
406: } else {
407: destination = session.createTemporaryQueue();
408: }
409:
410: MessageConsumer consumer = session.createReceiver(
411: (Queue) destination, selector);
412: return new PooledSession(session, destination, session
413: .createSender(null), consumer);
414: }
415:
416: /**
417: * Helper method to create a point-to-point pooled session.
418: *
419: * @return an appropriate pooled session
420: */
421: PooledSession createPointToPointSendOnlySession()
422: throws JMSException {
423: QueueSession session = ((QueueConnection) theConnection)
424: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
425:
426: return new PooledSession(session, null, session
427: .createSender(null), null);
428: }
429:
430: /**
431: * Helper method to create a point-to-point pooled session for consumer only.
432: *
433: * @param destination the target destination
434: * @return an appropriate pooled session
435: */
436: private PooledSession createPointToPointServerSession(
437: Destination destination) throws JMSException {
438: QueueSession session = ((QueueConnection) theConnection)
439: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
440:
441: return new PooledSession(session, destination, session
442: .createSender(null), session
443: .createReceiver((Queue) destination, runtimePolicy
444: .getMessageSelector()));
445: }
446:
447: /**
448: * Helper method to create a pub-sub pooled session.
449: *
450: * @param producer true iff producing
451: * @param consumer true iff consuming
452: * @param destination the target destination
453: * @return an appropriate pooled session
454: */
455: PooledSession createPubSubSession(boolean producer,
456: boolean consumer, Destination destination)
457: throws JMSException {
458: TopicSession session = ((TopicConnection) theConnection)
459: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
460: TopicSubscriber sub = null;
461: if (consumer) {
462: String messageSelector = runtimePolicy.getMessageSelector();
463: String durableName = runtimePolicy
464: .getDurableSubscriberName();
465: if (durableName != null) {
466: sub = session.createDurableSubscriber(
467: (Topic) destination, durableName,
468: messageSelector, false);
469: } else {
470: sub = session.createSubscriber((Topic) destination,
471: messageSelector, false);
472: }
473: }
474:
475: return new PooledSession(session, null, producer ? session
476: .createPublisher(null) : null, sub);
477: }
478:
479: private String generateUniqueSelector(Object obj) {
480: String host = "localhost";
481:
482: try {
483: InetAddress addr = InetAddress.getLocalHost();
484: host = addr.getHostName();
485: } catch (UnknownHostException ukex) {
486: //Default to localhost.
487: }
488:
489: long time = Calendar.getInstance().getTimeInMillis();
490: return host + "_" + System.getProperty("user.name") + "_" + obj
491: + time;
492: }
493:
494: private boolean isDestinationStyleQueue() {
495: return JMSConstants.JMS_QUEUE.equals(jmsTransport
496: .getJMSAddress().getDestinationStyle().value());
497: }
498: }
|