001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms.inflow;
023:
024: import java.security.AccessController;
025: import java.security.PrivilegedAction;
026: import java.util.ArrayList;
027:
028: import javax.jms.Connection;
029: import javax.jms.ConnectionConsumer;
030: import javax.jms.JMSException;
031: import javax.jms.Queue;
032: import javax.jms.ServerSession;
033: import javax.jms.ServerSessionPool;
034: import javax.jms.Topic;
035:
036: import org.jboss.logging.Logger;
037:
038: /**
039: * A generic jms session pool.
040: *
041: * @author <a href="adrian@jboss.com">Adrian Brock</a>
042: * @version $Revision: 60552 $
043: */
044: public class JmsServerSessionPool implements ServerSessionPool {
045: /** The logger */
046: private static final Logger log = Logger
047: .getLogger(JmsServerSessionPool.class);
048:
049: /** The activation */
050: JmsActivation activation;
051:
052: /** The consumer */
053: ConnectionConsumer consumer;
054:
055: /** The server sessions */
056: ArrayList serverSessions = new ArrayList();
057:
058: /** Whether the pool is stopped */
059: boolean stopped = false;
060:
061: /** The number of sessions */
062: int sessionCount = 0;
063:
064: /**
065: * Create a new session pool
066: *
067: * @param activation the jms activation
068: */
069: public JmsServerSessionPool(JmsActivation activation) {
070: this .activation = activation;
071: }
072:
073: /**
074: * @return the activation
075: */
076: public JmsActivation getActivation() {
077: return activation;
078: }
079:
080: /**
081: * Start the server session pool
082: *
083: * @throws Exeption for any error
084: */
085: public void start() throws Exception {
086: setupSessions();
087: setupConsumer();
088: }
089:
090: /**
091: * Stop the server session pool
092: */
093: public void stop() {
094: teardownConsumer();
095: teardownSessions();
096: }
097:
098: public ServerSession getServerSession() throws JMSException {
099: boolean trace = log.isTraceEnabled();
100: if (trace)
101: log.trace("getServerSession");
102:
103: ServerSession result = null;
104:
105: try {
106: synchronized (serverSessions) {
107: while (true) {
108: int sessionsSize = serverSessions.size();
109:
110: if (stopped)
111: throw new Exception(
112: "Cannot get a server session after the pool is stopped");
113:
114: else if (sessionsSize > 0) {
115: result = (ServerSession) serverSessions
116: .remove(sessionsSize - 1);
117: break;
118: }
119:
120: else {
121: try {
122: serverSessions.wait();
123: } catch (InterruptedException ignored) {
124: }
125: }
126: }
127: }
128: } catch (Throwable t) {
129: log.error("Unable to get a server session", t);
130: throw new JMSException("Unable to get a server session "
131: + t);
132: }
133:
134: if (trace)
135: log.trace("Returning server session " + result);
136:
137: return result;
138: }
139:
140: /**
141: * Return the server session
142: *
143: * @param session the session
144: */
145: protected void returnServerSession(JmsServerSession session) {
146: synchronized (serverSessions) {
147: if (stopped) {
148: session.teardown();
149: --sessionCount;
150: } else
151: serverSessions.add(session);
152: serverSessions.notifyAll();
153: }
154: }
155:
156: /**
157: * Setup the sessions
158: *
159: * @throws Exeption for any error
160: */
161: protected void setupSessions() throws Exception {
162: JmsActivationSpec spec = activation.getActivationSpec();
163: ArrayList clonedSessions = null;
164:
165: // Create the sessions
166: synchronized (serverSessions) {
167: for (int i = 0; i < spec.getMaxSessionInt(); ++i) {
168: JmsServerSession session = new JmsServerSession(this );
169: serverSessions.add(session);
170: }
171: sessionCount = serverSessions.size();
172: clonedSessions = (ArrayList) serverSessions.clone();
173:
174: }
175:
176: // Start the sessions
177: for (int i = 0; i < clonedSessions.size(); ++i) {
178: JmsServerSession session = (JmsServerSession) clonedSessions
179: .get(i);
180: session.setup();
181: }
182: }
183:
184: /**
185: * Stop the sessions
186: */
187: protected void teardownSessions() {
188: synchronized (serverSessions) {
189: // Disallow any new sessions
190: stopped = true;
191: serverSessions.notifyAll();
192:
193: // Stop inactive sessions
194: for (int i = 0; i < serverSessions.size(); ++i) {
195: JmsServerSession session = (JmsServerSession) serverSessions
196: .get(i);
197: session.teardown();
198: }
199:
200: sessionCount -= serverSessions.size();
201: serverSessions.clear();
202:
203: // Wait for inuse sessions
204: while (sessionCount > 0) {
205: try {
206: serverSessions.wait();
207: } catch (InterruptedException ignore) {
208: }
209: }
210: }
211: }
212:
213: /**
214: * Setup the connection consumer
215: *
216: * @throws Exeption for any error
217: */
218: protected void setupConsumer() throws Exception {
219: Connection connection = activation.getConnection();
220: JmsActivationSpec spec = activation.getActivationSpec();
221: String selector = spec.getMessageSelector();
222: int maxMessages = spec.getMaxMessagesInt();
223: if (spec.isTopic()) {
224: Topic topic = (Topic) activation.getDestination();
225: String subscriptionName = spec.getSubscriptionName();
226: if (spec.isDurable())
227: consumer = connection.createDurableConnectionConsumer(
228: topic, subscriptionName, selector, this ,
229: maxMessages);
230: else
231: consumer = connection.createConnectionConsumer(topic,
232: selector, this , maxMessages);
233: } else {
234: Queue queue = (Queue) activation.getDestination();
235: consumer = connection.createConnectionConsumer(queue,
236: selector, this , maxMessages);
237: }
238: log.debug("Created consumer " + consumer);
239: }
240:
241: /**
242: * Stop the connection consumer
243: */
244: protected void teardownConsumer() {
245: try {
246: if (consumer != null) {
247: log.debug("Closing the " + consumer);
248: consumer.close();
249: }
250: } catch (Throwable t) {
251: log.debug("Error closing the consumer " + consumer, t);
252: }
253: }
254:
255: }
|