001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.net.InetAddress;
004: import java.net.UnknownHostException;
005: import java.util.Calendar;
006: import java.util.logging.Level;
007: import java.util.logging.Logger;
008:
009: import javax.jms.Connection;
010: import javax.jms.Destination;
011: import javax.jms.JMSException;
012: import javax.jms.MessageConsumer;
013: import javax.jms.Queue;
014: import javax.jms.QueueConnection;
015: import javax.jms.QueueSession;
016: import javax.jms.Session;
017: import javax.jms.Topic;
018: import javax.jms.TopicConnection;
019: import javax.jms.TopicSession;
020: import javax.jms.TopicSubscriber;
021: import javax.naming.Context;
022: import javax.naming.NamingException;
023:
024: import org.objectweb.celtix.common.logging.LogUtils;
025: import org.objectweb.celtix.common.util.AbstractTwoStageCache;
026: import org.objectweb.celtix.transports.jms.JMSAddressPolicyType;
027: import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
028:
029: /**
030: * This class encapsulates the creation and pooling logic for JMS Sessions.
031: * The usage patterns for sessions, producers & consumers are as follows ...
032: * <p>
033: * client-side: an invoking thread requires relatively short-term exclusive
034: * use of a session, an unidentified producer to send the request message,
035: * and in the point-to-point domain a consumer for the temporary ReplyTo
036: * destination to synchronously receive the reply if the operation is twoway
037: * (in the pub-sub domain only oneway operations are supported, so a there
038: * is never a requirement for a reply destination)
039: * <p>
040: * server-side receive: each port based on <jms:address> requires relatively
041: * long-term exclusive use of a session, a consumer with a MessageListener for
042: * the JMS destination specified for the port, and an unidentified producer
043: * to send the request message
044: * <p>
045: * server-side send: each dispatch of a twoway request requires relatively
046: * short-term exclusive use of a session and an indentified producer (but
047: * not a consumer) - note that the session used for the recieve side cannot
048: * be re-used for the send, as MessageListener usage precludes any synchronous
049: * sends or receives on that session
050: * <p>
051: * So on the client-side, pooling of sessions is bound up with pooling
052: * of temporary reply destinations, whereas on the server receive side
053: * the benefit of pooling is marginal as the session is required from
054: * the point at which the port was activated until the Bus is shutdown
055: * The server send side resembles the client side,
056: * except that a consumer for the temporary destination is never required.
057: * Hence different pooling strategies make sense ...
058: * <p>
059: * client-side: a SoftReference-based cache of send/receive sessions is
060: * maintained containing an aggregate of a session, indentified producer,
061: * temporary reply destination & consumer for same
062: * <p>
063: * server-side receive: as sessions cannot be usefully recycled, they are
064: * simply created on demand and closed when no longer required
065: * <p>
066: * server-side send: a SoftReference-based cache of send-only sessions is
067: * maintained containing an aggregate of a session and an indentified producer
068: * <p>
069: * In a pure client or pure server, only a single cache is ever
070: * populated. Where client and server logic is co-located, a client
071: * session retrieval for a twoway invocation checks the reply-capable
072: * cache first and then the send-only cache - if a session is
073: * available in the later then its used after a tempory destination is
074: * created before being recycled back into the reply-capable cache. A
075: * server send side retrieval or client retrieval for a oneway
076: * invocation checks the send-only cache first and then the
077: * reply-capable cache - if a session is available in the later then
078: * its used and the tempory destination is ignored. So in the
079: * co-located case, sessions migrate from the send-only cache to the
080: * reply-capable cache as necessary.
081: * <p>
082: *
083: * @author Eoghan Glynn
084: */
085: public class JMSSessionFactory {
086:
087: private static final int CACHE_HIGH_WATER_MARK = 500;
088: private static final Logger LOG = LogUtils
089: .getL7dLogger(JMSSessionFactory.class);
090: private static final int PRIMARY_CACHE_MAX = 20;
091:
092: private final Context initialContext;
093: private final Connection theConnection;
094: private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
095: private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
096: private final Destination theReplyDestination;
097: private final boolean isQueueConnection;
098:
099: private final JMSAddressPolicyType addressExtensor;
100: private final JMSServerBehaviorPolicyType jmsServerPolicy;
101:
102: /**
103: * Constructor.
104: *
105: * @param connection the shared {Queue|Topic}Connection
106: */
107: public JMSSessionFactory(Connection connection,
108: Destination replyDestination, JMSAddressPolicyType addrExt,
109: JMSServerBehaviorPolicyType serverPolicy, Context context) {
110: theConnection = connection;
111: theReplyDestination = replyDestination;
112: addressExtensor = addrExt;
113: isQueueConnection = addressExtensor.getDestinationStyle()
114: .value().equals(JMSConstants.JMS_QUEUE);
115: jmsServerPolicy = serverPolicy;
116: initialContext = context;
117:
118: // create session caches (REVISIT sizes should be configurable)
119: //
120: if (isQueueConnection) {
121: // the reply capable cache is only required in the point-to-point
122: // domain
123: //
124: replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(
125: PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
126: public final PooledSession create() throws JMSException {
127: return createPointToPointReplyCapableSession();
128: }
129: };
130:
131: try {
132: replyCapableSessionCache.populateCache();
133: } catch (Throwable t) {
134: LOG.log(Level.FINE,
135: "JMS Session cache populate failed: " + t);
136: }
137:
138: // send-only cache for point-to-point oneway requests and replies
139: //
140: sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
141: PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
142: public final PooledSession create() throws JMSException {
143: return createPointToPointSendOnlySession();
144: }
145: };
146:
147: try {
148: sendOnlySessionCache.populateCache();
149: } catch (Throwable t) {
150: LOG.log(Level.FINE,
151: "JMS Session cache populate failed: " + t);
152: }
153: } else {
154: // send-only cache for pub-sub oneway requests
155: //
156: sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(
157: PRIMARY_CACHE_MAX, CACHE_HIGH_WATER_MARK, 0, this ) {
158: public final PooledSession create() throws JMSException {
159: return createPubSubSession(true, false, null);
160: }
161: };
162:
163: try {
164: sendOnlySessionCache.populateCache();
165: } catch (Throwable t) {
166: LOG.log(Level.FINE,
167: "JMS Session cache populate failed: " + t);
168: }
169: }
170: }
171:
172: //--java.lang.Object Overrides----------------------------------------------
173: public String toString() {
174: return "JMSSessionFactory";
175: }
176:
177: //--Methods-----------------------------------------------------------------
178: protected Connection getConnection() {
179: return theConnection;
180: }
181:
182: public Queue getQueueFromInitialContext(String queueName)
183: throws NamingException {
184: return (Queue) initialContext.lookup(queueName);
185: }
186:
187: public PooledSession get(boolean replyCapable) throws JMSException {
188: return get(null, replyCapable);
189: }
190:
191: /**
192: * Retrieve a new or cached Session.
193: * @param replyDest Destination name if coming from wsa:Header
194: * @param replyCapable true iff the session is to be used to receive replies
195: * (implies client side twoway invocation )
196: * @return a new or cached Session
197: */
198: public PooledSession get(Destination replyDest, boolean replyCapable)
199: throws JMSException {
200: PooledSession ret = null;
201:
202: synchronized (this ) {
203: if (replyCapable) {
204: // first try reply capable cache
205: //
206: ret = replyCapableSessionCache.poll();
207:
208: if (ret == null) {
209: // fall back to send only cache, creating temporary reply
210: // queue and consumer
211: //
212: ret = sendOnlySessionCache.poll();
213:
214: if (ret != null) {
215: QueueSession session = (QueueSession) ret
216: .session();
217: Queue destination = null;
218: String selector = null;
219:
220: if (null != theReplyDestination
221: || null != replyDest) {
222: destination = null != replyDest ? (Queue) replyDest
223: : (Queue) theReplyDestination;
224:
225: selector = "JMSCorrelationID = '"
226: + generateUniqueSelector(ret) + "'";
227: }
228:
229: ret.destination(destination);
230: MessageConsumer consumer = session
231: .createReceiver(destination, selector);
232: ret.consumer(consumer);
233: } else {
234: // no pooled session available in either cache => create one in
235: // in the reply capable cache
236: //
237: try {
238: ret = replyCapableSessionCache.get();
239: } catch (Throwable t) {
240: // factory method may only throw JMSException
241: //
242: throw (JMSException) t;
243: }
244: }
245: }
246: } else {
247: // first try send only cache
248: //
249: ret = sendOnlySessionCache.poll();
250:
251: if (ret == null) {
252: // fall back to reply capable cache if one exists (only in the
253: // point-to-point domain), ignoring temporary reply destination
254: // and consumer
255: //
256: if (replyCapableSessionCache != null) {
257: ret = replyCapableSessionCache.poll();
258: }
259:
260: if (ret == null) {
261: // no pooled session available in either cache => create one in
262: // in the send only cache
263: //
264: try {
265: ret = sendOnlySessionCache.get();
266: } catch (Throwable t) {
267: // factory method may only throw JMSException
268: //
269: throw (JMSException) t;
270: }
271: }
272: }
273: }
274: }
275:
276: return ret;
277: }
278:
279: /**
280: * Retrieve a new
281: *
282: * @param destination the target JMS queue or topic (non-null implies
283: * server receive side)
284: * @return a new or cached Session
285: */
286: public PooledSession get(Destination destination)
287: throws JMSException {
288: PooledSession ret = null;
289:
290: // the destination is only specified on the server receive side,
291: // in which case a new session is always created
292: //
293: if (isQueueConnection) {
294: ret = createPointToPointServerSession(destination);
295: } else {
296: ret = createPubSubSession(false, true, destination);
297: }
298:
299: return ret;
300: }
301:
302: /**
303: * Return a Session to the pool
304: *
305: * @param pooled_session the session to recycle
306: */
307: public void recycle(PooledSession pooledSession) {
308: // sessions used long-term by the server receive side are not cached,
309: // only non-null destinations are temp queues
310: final boolean replyCapable = pooledSession.destination() != null;
311: boolean discard = false;
312:
313: synchronized (this ) {
314: // re-cache session, closing if it cannot be it can be accomodated
315: //
316: discard = replyCapable ? (!replyCapableSessionCache
317: .recycle(pooledSession)) : (!sendOnlySessionCache
318: .recycle(pooledSession));
319: }
320:
321: if (discard) {
322: try {
323: pooledSession.close();
324: } catch (JMSException e) {
325: LOG.log(Level.WARNING, "JMS Session discard failed: "
326: + e);
327: }
328: }
329: }
330:
331: /**
332: * Shutdown the session factory.
333: */
334: public void shutdown() {
335: try {
336: PooledSession curr;
337:
338: if (replyCapableSessionCache != null) {
339: curr = replyCapableSessionCache.poll();
340: while (curr != null) {
341: curr.close();
342: curr = replyCapableSessionCache.poll();
343: }
344: }
345:
346: if (sendOnlySessionCache != null) {
347: curr = sendOnlySessionCache.poll();
348: while (curr != null) {
349: curr.close();
350: curr = sendOnlySessionCache.poll();
351: }
352: }
353:
354: theConnection.close();
355: } catch (JMSException e) {
356: LOG.log(Level.WARNING, "queue connection close failed: "
357: + e);
358: }
359:
360: // help GC
361: //
362: replyCapableSessionCache = null;
363: sendOnlySessionCache = null;
364: }
365:
366: /**
367: * Helper method to create a point-to-point pooled session.
368: *
369: * @param producer true iff producing
370: * @param consumer true iff consuming
371: * @param destination the target destination
372: * @return an appropriate pooled session
373: */
374: PooledSession createPointToPointReplyCapableSession()
375: throws JMSException {
376: QueueSession session = ((QueueConnection) theConnection)
377: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
378: Destination destination = null;
379: String selector = null;
380:
381: if (null != theReplyDestination) {
382: destination = theReplyDestination;
383:
384: selector = "JMSCorrelationID = '"
385: + generateUniqueSelector(session) + "'";
386:
387: } else {
388: destination = session.createTemporaryQueue();
389: }
390:
391: MessageConsumer consumer = session.createReceiver(
392: (Queue) destination, selector);
393: return new PooledSession(session, destination, session
394: .createSender(null), consumer);
395: }
396:
397: /**
398: * Helper method to create a point-to-point pooled session.
399: *
400: * @return an appropriate pooled session
401: */
402: PooledSession createPointToPointSendOnlySession()
403: throws JMSException {
404: QueueSession session = ((QueueConnection) theConnection)
405: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
406:
407: return new PooledSession(session, null, session
408: .createSender(null), null);
409: }
410:
411: /**
412: * Helper method to create a point-to-point pooled session for consumer only.
413: *
414: * @param destination the target destination
415: * @return an appropriate pooled session
416: */
417: private PooledSession createPointToPointServerSession(
418: Destination destination) throws JMSException {
419: QueueSession session = ((QueueConnection) theConnection)
420: .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
421:
422: return new PooledSession(session, destination, session
423: .createSender(null), session.createReceiver(
424: (Queue) destination, jmsServerPolicy
425: .getMessageSelector()));
426: }
427:
428: /**
429: * Helper method to create a pub-sub pooled session.
430: *
431: * @param producer true iff producing
432: * @param consumer true iff consuming
433: * @param destination the target destination
434: * @return an appropriate pooled session
435: */
436: PooledSession createPubSubSession(boolean producer,
437: boolean consumer, Destination destination)
438: throws JMSException {
439: TopicSession session = ((TopicConnection) theConnection)
440: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
441: TopicSubscriber sub = null;
442: if (consumer) {
443: String messageSelector = jmsServerPolicy
444: .getMessageSelector();
445: String durableName = jmsServerPolicy
446: .getDurableSubscriberName();
447: if (durableName != null) {
448: sub = session.createDurableSubscriber(
449: (Topic) destination, durableName,
450: messageSelector, false);
451: } else {
452: sub = session.createSubscriber((Topic) destination,
453: messageSelector, false);
454: }
455: }
456:
457: return new PooledSession(session, null, producer ? session
458: .createPublisher(null) : null, sub);
459: }
460:
461: private String generateUniqueSelector(Object obj) {
462: String host = "localhost";
463:
464: try {
465: InetAddress addr = InetAddress.getLocalHost();
466: host = addr.getHostName();
467: } catch (UnknownHostException ukex) {
468: //Default to localhost.
469: }
470:
471: long time = Calendar.getInstance().getTimeInMillis();
472: return host + "_" + System.getProperty("user.name") + "_" + obj
473: + time;
474: }
475: }
|