001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener.serversession;
018:
019: import javax.jms.JMSException;
020: import javax.jms.ServerSession;
021: import javax.jms.Session;
022:
023: import org.apache.commons.logging.Log;
024: import org.apache.commons.logging.LogFactory;
025:
026: import org.springframework.core.task.TaskExecutor;
027: import org.springframework.jms.support.JmsUtils;
028: import org.springframework.scheduling.timer.TimerTaskExecutor;
029:
030: /**
031: * Abstract base class for ServerSessionFactory implementations
032: * that pool ServerSessionFactory instances.
033: *
034: * <p>Provides a factory method that creates a poolable ServerSession
035: * (to be added as new instance to a pool), a callback method invoked
036: * when a ServerSession finished an execution of its listener (to return
037: * an instance to the pool), and a method to destroy a ServerSession instance
038: * (after removing an instance from the pool).
039: *
040: * @author Juergen Hoeller
041: * @since 2.0
042: * @see org.springframework.jms.listener.serversession.CommonsPoolServerSessionFactory
043: */
044: public abstract class AbstractPoolingServerSessionFactory implements
045: ServerSessionFactory {
046:
047: protected final Log logger = LogFactory.getLog(getClass());
048:
049: private TaskExecutor taskExecutor;
050:
051: private int maxSize;
052:
053: /**
054: * Specify the TaskExecutor to use for executing ServerSessions
055: * (and consequently, the underlying MessageListener).
056: * <p>Default is a {@link org.springframework.scheduling.timer.TimerTaskExecutor}
057: * for each pooled ServerSession, using one Thread per pooled JMS Session.
058: * Alternatives are a shared TimerTaskExecutor, sharing a single Thread
059: * for the execution of all ServerSessions, or a TaskExecutor
060: * implementation backed by a thread pool.
061: */
062: public void setTaskExecutor(TaskExecutor taskExecutor) {
063: this .taskExecutor = taskExecutor;
064: }
065:
066: /**
067: * Return the TaskExecutor to use for executing ServerSessions.
068: */
069: protected TaskExecutor getTaskExecutor() {
070: return this .taskExecutor;
071: }
072:
073: /**
074: * Set the maximum size of the pool.
075: */
076: public void setMaxSize(int maxSize) {
077: this .maxSize = maxSize;
078: }
079:
080: /**
081: * Return the maximum size of the pool.
082: */
083: public int getMaxSize() {
084: return this .maxSize;
085: }
086:
087: /**
088: * Create a new poolable ServerSession.
089: * To be called when a new instance should be added to the pool.
090: * @param sessionManager the listener session manager to create the
091: * poolable ServerSession for
092: * @return the new poolable ServerSession
093: * @throws JMSException if creation failed
094: */
095: protected final ServerSession createServerSession(
096: ListenerSessionManager sessionManager) throws JMSException {
097: return new PoolableServerSession(sessionManager);
098: }
099:
100: /**
101: * Destroy the given poolable ServerSession.
102: * To be called when an instance got removed from the pool.
103: * @param serverSession the poolable ServerSession to destroy
104: */
105: protected final void destroyServerSession(
106: ServerSession serverSession) {
107: if (serverSession != null) {
108: ((PoolableServerSession) serverSession).close();
109: }
110: }
111:
112: /**
113: * Template method called by a ServerSession if it finished
114: * execution of its listener and is ready to go back into the pool.
115: * <p>Subclasses should implement the actual returning of the instance
116: * to the pool.
117: * @param serverSession the ServerSession that finished its execution
118: * @param sessionManager the session manager that the ServerSession belongs to
119: */
120: protected abstract void serverSessionFinished(
121: ServerSession serverSession,
122: ListenerSessionManager sessionManager);
123:
124: /**
125: * ServerSession implementation designed to be pooled.
126: * Creates a new JMS Session on instantiation, reuses it
127: * for all executions, and closes it on <code>close</code>.
128: * <p>Creates a TimerTaskExecutor (using a single Thread) per
129: * ServerSession, unless given a specific TaskExecutor to use.
130: */
131: private class PoolableServerSession implements ServerSession {
132:
133: private final ListenerSessionManager sessionManager;
134:
135: private final Session session;
136:
137: private TaskExecutor taskExecutor;
138:
139: private TimerTaskExecutor internalExecutor;
140:
141: public PoolableServerSession(
142: final ListenerSessionManager sessionManager)
143: throws JMSException {
144: this .sessionManager = sessionManager;
145: this .session = sessionManager.createListenerSession();
146: this .taskExecutor = getTaskExecutor();
147: if (this .taskExecutor == null) {
148: this .internalExecutor = new TimerTaskExecutor();
149: this .internalExecutor.afterPropertiesSet();
150: this .taskExecutor = this .internalExecutor;
151: }
152: }
153:
154: public Session getSession() {
155: return this .session;
156: }
157:
158: public void start() {
159: this .taskExecutor.execute(new Runnable() {
160: public void run() {
161: try {
162: sessionManager.executeListenerSession(session);
163: } finally {
164: serverSessionFinished(
165: PoolableServerSession.this ,
166: sessionManager);
167: }
168: }
169: });
170: }
171:
172: public void close() {
173: if (this.internalExecutor != null) {
174: this.internalExecutor.destroy();
175: }
176: JmsUtils.closeSession(this.session);
177: }
178: }
179:
180: }
|