001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.asf;
023:
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import java.util.List;
027:
028: import javax.jms.Connection;
029: import javax.jms.Destination;
030: import javax.jms.JMSException;
031: import javax.jms.MessageListener;
032: import javax.jms.Queue;
033: import javax.jms.QueueConnection;
034: import javax.jms.ServerSession;
035: import javax.jms.ServerSessionPool;
036: import javax.jms.Session;
037: import javax.jms.Topic;
038: import javax.jms.TopicConnection;
039: import javax.jms.XAQueueConnection;
040: import javax.jms.XAQueueSession;
041: import javax.jms.XASession;
042: import javax.jms.XATopicConnection;
043: import javax.jms.XATopicSession;
044: import javax.transaction.TransactionManager;
045:
046: import org.jboss.logging.Logger;
047: import org.jboss.tm.XidFactoryMBean;
048:
049: import EDU.oswego.cs.dl.util.concurrent.Executor;
050: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
051: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
052:
053: /**
054: * Implementation of ServerSessionPool.
055: *
056: * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a> .
057: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
058: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
059: * @version $Revision: 57209 $
060: */
061: public class StdServerSessionPool implements ServerSessionPool {
062: /** The thread group which session workers will run. */
063: private static ThreadGroup threadGroup = new ThreadGroup(
064: "ASF Session Pool Threads");
065:
066: /** Instance logger. */
067: private final Logger log = Logger.getLogger(this .getClass());
068:
069: /** The minimum size of the pool */
070: private int minSize;
071:
072: /** The size of the pool. */
073: private int poolSize;
074:
075: /** The message acknowledgment mode. */
076: private int ack;
077:
078: /** Is the bean container managed? */
079: private boolean useLocalTX;
080:
081: /** True if this is a transacted session. */
082: private boolean transacted;
083:
084: /** The destination. */
085: private Destination destination;
086:
087: /** The session connection. */
088: private Connection con;
089:
090: /** The message listener for the session. */
091: private MessageListener listener;
092:
093: /** The list of ServerSessions. */
094: private List sessionPool;
095:
096: /** The executor for processing messages? */
097: private PooledExecutor executor;
098:
099: /** Used to signal when the Pool is being closed down */
100: private boolean closing = false;
101:
102: /** Used during close down to wait for all server sessions to be returned and closed. */
103: private int numServerSessions = 0;
104:
105: private XidFactoryMBean xidFactory;
106:
107: private TransactionManager tm;
108:
109: /**
110: * Construct a <tt>StdServerSessionPool</tt>. Note the maxSession parameter controls
111: * both the maximum number of sessions in the pool, as well as the number of listener
112: * threads assigned to service requests from the JMS Provider.
113: *
114: *
115: *
116: * @param destination the destination
117: * @param con connection to get sessions from
118: * @param transacted transaction mode when not XA (
119: * @param ack ackmode when not XA
120: * @param listener the listener the sessions will call
121: * @param minSession minumum number of sessions in the pool
122: * @param maxSession maximum number of sessions in the pool
123: * @param keepAlive the time to keep sessions alive
124: * @param xidFactory the xid factory
125: * @param tm the transaction manager
126: * @exception JMSException Description of Exception
127: */
128: public StdServerSessionPool(final Destination destination,
129: final Connection con, final boolean transacted,
130: final int ack, final boolean useLocalTX,
131: final MessageListener listener, final int minSession,
132: final int maxSession, final long keepAlive,
133: final XidFactoryMBean xidFactory,
134: final TransactionManager tm) throws JMSException {
135: this .destination = destination;
136: this .con = con;
137: this .ack = ack;
138: this .listener = listener;
139: this .transacted = transacted;
140: this .minSize = minSession;
141: this .poolSize = maxSession;
142: this .sessionPool = new ArrayList(maxSession);
143: this .useLocalTX = useLocalTX;
144: this .xidFactory = xidFactory;
145: this .tm = tm;
146: // setup the worker pool
147: executor = new MyPooledExecutor(poolSize);
148: executor.setMinimumPoolSize(minSize);
149: executor.setKeepAliveTime(keepAlive);
150: executor.waitWhenBlocked();
151: executor.setThreadFactory(new DefaultThreadFactory());
152:
153: // finish initializing the session
154: create();
155: log.debug("Server Session pool set up");
156: }
157:
158: /**
159: * Get a server session.
160: *
161: * @return A server session.
162: * @throws JMSException Failed to get a server session.
163: */
164: public ServerSession getServerSession() throws JMSException {
165: if (log.isTraceEnabled())
166: log.trace("getting a server session");
167: ServerSession session = null;
168:
169: try {
170: while (true) {
171: synchronized (sessionPool) {
172: if (closing) {
173: throw new JMSException(
174: "Cannot get session after pool has been closed down.");
175: } else if (sessionPool.size() > 0) {
176: session = (ServerSession) sessionPool.remove(0);
177: break;
178: } else {
179: try {
180: sessionPool.wait();
181: } catch (InterruptedException ignore) {
182: }
183: }
184: }
185: }
186: } catch (Exception e) {
187: throw new JMSException("Failed to get a server session: "
188: + e);
189: }
190:
191: if (log.isTraceEnabled())
192: log.trace("using server session: " + session);
193: return session;
194: }
195:
196: /**
197: * Clear the pool, clear out both threads and ServerSessions,
198: * connection.stop() should be run before this method.
199: */
200: public void clear() {
201: synchronized (sessionPool) {
202: // FIXME - is there a runaway condition here. What if a
203: // ServerSession are taken by a ConnecionConsumer? Should we set
204: // a flag somehow so that no ServerSessions are recycled and the
205: // ThreadPool won't leave any more threads out.
206: closing = true;
207:
208: log.debug("Clearing " + sessionPool.size()
209: + " from ServerSessionPool");
210:
211: Iterator iter = sessionPool.iterator();
212: while (iter.hasNext()) {
213: StdServerSession ses = (StdServerSession) iter.next();
214: // Should we do anything to the server session?
215: ses.close();
216: numServerSessions--;
217: }
218:
219: sessionPool.clear();
220: sessionPool.notifyAll();
221: }
222:
223: //Must be outside synchronized block because of recycle method.
224: executor.shutdownAfterProcessingCurrentlyQueuedTasks();
225:
226: //wait for all server sessions to be returned.
227: synchronized (sessionPool) {
228: while (numServerSessions > 0) {
229: try {
230: sessionPool.wait();
231: } catch (InterruptedException ignore) {
232: }
233: }
234: }
235: }
236:
237: /**
238: * Get the executor we are using.
239: *
240: * @return The Executor value
241: */
242: Executor getExecutor() {
243: return executor;
244: }
245:
246: // --- Protected messages for StdServerSession to use
247:
248: /**
249: * Returns true if this server session is transacted.
250: *
251: * @return The Transacted value
252: */
253: boolean isTransacted() {
254: return transacted;
255: }
256:
257: /**
258: * Recycle a server session.
259: *
260: * @param session Description of Parameter
261: */
262: void recycle(StdServerSession session) {
263: synchronized (sessionPool) {
264: if (closing) {
265: session.close();
266: numServerSessions--;
267: if (numServerSessions == 0) {
268: //notify clear thread.
269: sessionPool.notifyAll();
270: }
271: } else {
272: sessionPool.add(session);
273: sessionPool.notifyAll();
274: if (log.isTraceEnabled())
275: log.trace("recycled server session: " + session);
276: }
277: }
278: }
279:
280: private void create() throws JMSException {
281: for (int index = 0; index < poolSize; index++) {
282: // Here is the meat, that MUST follow the spec
283: Session ses = null;
284: XASession xaSes = null;
285:
286: log.debug("initializing with connection: " + con);
287:
288: if (destination instanceof Topic
289: && con instanceof XATopicConnection) {
290: xaSes = ((XATopicConnection) con)
291: .createXATopicSession();
292: ses = ((XATopicSession) xaSes).getTopicSession();
293: } else if (destination instanceof Queue
294: && con instanceof XAQueueConnection) {
295: xaSes = ((XAQueueConnection) con)
296: .createXAQueueSession();
297: ses = ((XAQueueSession) xaSes).getQueueSession();
298: } else if (destination instanceof Topic
299: && con instanceof TopicConnection) {
300: ses = ((TopicConnection) con).createTopicSession(
301: transacted, ack);
302: log
303: .warn("Using a non-XA TopicConnection. "
304: + "It will not be able to participate in a Global UOW");
305: } else if (destination instanceof Queue
306: && con instanceof QueueConnection) {
307: ses = ((QueueConnection) con).createQueueSession(
308: transacted, ack);
309: log
310: .warn("Using a non-XA QueueConnection. "
311: + "It will not be able to participate in a Global UOW");
312: } else {
313: throw new JMSException(
314: "Connection was not reconizable: " + con
315: + " for destination " + destination);
316: }
317:
318: // create the server session and add it to the pool - it is up to the
319: // server session to set the listener
320: StdServerSession serverSession = new StdServerSession(this ,
321: ses, xaSes, listener, useLocalTX, xidFactory, tm);
322:
323: sessionPool.add(serverSession);
324: numServerSessions++;
325:
326: log.debug("added server session to the pool: "
327: + serverSession);
328: }
329: }
330:
331: /**
332: * A pooled executor where the minimum pool size
333: * threads are kept alive
334: */
335: private static class MyPooledExecutor extends PooledExecutor {
336: public MyPooledExecutor(int poolSize) {
337: super (poolSize);
338: }
339:
340: protected Runnable getTask() throws InterruptedException {
341: Runnable task = null;
342: while ((task = super .getTask()) == null && keepRunning())
343: ;
344: return task;
345: }
346:
347: /**
348: * We keep running unless we are told to shutdown
349: * or there are more than minimumPoolSize_ threads in the pool
350: *
351: * @return whether to keep running
352: */
353: protected synchronized boolean keepRunning() {
354: if (shutdown_)
355: return false;
356:
357: return poolSize_ <= minimumPoolSize_;
358: }
359: }
360:
361: private static class DefaultThreadFactory implements ThreadFactory {
362: private static int count = 0;
363:
364: private static synchronized int nextCount() {
365: return count++;
366: }
367:
368: /**
369: * Create a new Thread for the given Runnable
370: *
371: * @param command The Runnable to pass to Thread
372: * @return The newly created Thread
373: */
374: public Thread newThread(final Runnable command) {
375: String name = "JMS SessionPool Worker-" + nextCount();
376: Thread thread = new Thread(threadGroup, command, name);
377: thread.setDaemon(true);
378: return thread;
379: }
380: }
381: }
|