001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq;
023:
024: import java.io.Serializable;
025:
026: import javax.jms.ConnectionConsumer;
027: import javax.jms.Destination;
028: import javax.jms.IllegalStateException;
029: import javax.jms.InvalidDestinationException;
030: import javax.jms.JMSException;
031: import javax.jms.Queue;
032: import javax.jms.QueueConnection;
033: import javax.jms.QueueSession;
034: import javax.jms.ServerSessionPool;
035: import javax.jms.Session;
036: import javax.jms.TemporaryQueue;
037: import javax.jms.TemporaryTopic;
038: import javax.jms.Topic;
039: import javax.jms.TopicConnection;
040: import javax.jms.TopicSession;
041:
042: import org.jboss.util.UnreachableStatementException;
043:
044: /**
045: * This class implements javax.jms.QueueConnection and
046: * javax.jms.TopicConnection
047: *
048: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
049: * @author Hiram Chirino (Cojonudo14@hotmail.com)
050: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
051: * @version $Revision: 57198 $
052: */
053: public class SpyConnection extends Connection implements Serializable,
054: TopicConnection, QueueConnection {
055: private static final long serialVersionUID = -6227193901482445607L;
056:
057: /** Unified */
058: public static final int UNIFIED = 0;
059:
060: /** Queue */
061: public static final int QUEUE = 1;
062:
063: /** Topic */
064: public static final int TOPIC = 2;
065:
066: /** The type of connection */
067: private int type = UNIFIED;
068:
069: /**
070: * Create a new SpyConnection
071: *
072: * @param userId the user
073: * @param password the password
074: * @param gcf the constructing class
075: * @throws JMSException for any error
076: */
077: public SpyConnection(String userId, String password,
078: GenericConnectionFactory gcf) throws JMSException {
079: super (userId, password, gcf);
080: }
081:
082: /**
083: * Create a new SpyConnection
084: *
085: * @param gcf the constructing class
086: * @throws JMSException for any error
087: */
088: public SpyConnection(GenericConnectionFactory gcf)
089: throws JMSException {
090: super (gcf);
091: }
092:
093: /**
094: * Create a new SpyConnection
095: *
096: * @param type the type of connection
097: * @param userId the user
098: * @param password the password
099: * @param gcf the constructing class
100: * @throws JMSException for any error
101: */
102: public SpyConnection(int type, String userId, String password,
103: GenericConnectionFactory gcf) throws JMSException {
104: super (userId, password, gcf);
105: this .type = type;
106: }
107:
108: /**
109: * Create a new SpyConnection
110: * @param type the type of connection
111: * @param gcf the constructing class
112: * @throws JMSException for any error
113: */
114: public SpyConnection(int type, GenericConnectionFactory gcf)
115: throws JMSException {
116: super (gcf);
117: this .type = type;
118: }
119:
120: public ConnectionConsumer createConnectionConsumer(
121: Destination destination, String messageSelector,
122: ServerSessionPool sessionPool, int maxMessages)
123: throws JMSException {
124: checkClosed();
125: if (destination == null)
126: throw new InvalidDestinationException("Null destination");
127: checkTemporary(destination);
128:
129: return new SpyConnectionConsumer(this , destination,
130: messageSelector, sessionPool, maxMessages);
131: }
132:
133: public Session createSession(boolean transacted, int acknowledgeMode)
134: throws JMSException {
135: checkClosed();
136: checkClientID();
137:
138: if (transacted)
139: acknowledgeMode = 0;
140: Session session = new SpySession(this , transacted,
141: acknowledgeMode, false);
142:
143: //add the new session to the createdSessions list
144: synchronized (createdSessions) {
145: createdSessions.add(session);
146: }
147:
148: return session;
149: }
150:
151: public TopicSession createTopicSession(boolean transacted,
152: int acknowledgeMode) throws JMSException {
153: checkClosed();
154: checkClientID();
155:
156: if (transacted)
157: acknowledgeMode = 0;
158: TopicSession session = new SpyTopicSession(this , transacted,
159: acknowledgeMode);
160:
161: //add the new session to the createdSessions list
162: synchronized (createdSessions) {
163: createdSessions.add(session);
164: }
165:
166: return session;
167: }
168:
169: public ConnectionConsumer createConnectionConsumer(Topic topic,
170: String messageSelector, ServerSessionPool sessionPool,
171: int maxMessages) throws JMSException {
172: checkClosed();
173: if (type == QUEUE)
174: throw new IllegalStateException(
175: "Cannot create a topic consumer on a QueueConnection");
176: if (topic == null)
177: throw new InvalidDestinationException("Null topic");
178: checkClientID();
179: checkTemporary(topic);
180:
181: return new SpyConnectionConsumer(this , topic, messageSelector,
182: sessionPool, maxMessages);
183: }
184:
185: public ConnectionConsumer createDurableConnectionConsumer(
186: Topic topic, String subscriptionName,
187: String messageSelector, ServerSessionPool sessionPool,
188: int maxMessages) throws JMSException {
189: checkClosed();
190: if (type == QUEUE)
191: throw new IllegalStateException(
192: "Cannot create a topic consumer on a QueueConnection");
193: if (topic == null)
194: throw new InvalidDestinationException("Null topic");
195: if (topic instanceof TemporaryTopic)
196: throw new InvalidDestinationException(
197: "Attempt to create a durable subscription for a temporary topic");
198:
199: if (subscriptionName == null
200: || subscriptionName.trim().length() == 0)
201: throw new JMSException("Null or empty subscription");
202:
203: SpyTopic t = new SpyTopic((SpyTopic) topic, getClientID(),
204: subscriptionName, messageSelector);
205: return new SpyConnectionConsumer(this , t, messageSelector,
206: sessionPool, maxMessages);
207: }
208:
209: public ConnectionConsumer createConnectionConsumer(Queue queue,
210: String messageSelector, ServerSessionPool sessionPool,
211: int maxMessages) throws JMSException {
212: checkClosed();
213: if (type == TOPIC)
214: throw new IllegalStateException(
215: "Cannot create a queue consumer on a TopicConnection");
216: if (queue == null)
217: throw new InvalidDestinationException("Null queue");
218: checkTemporary(queue);
219:
220: return new SpyConnectionConsumer(this , queue, messageSelector,
221: sessionPool, maxMessages);
222: }
223:
224: public QueueSession createQueueSession(boolean transacted,
225: int acknowledgeMode) throws JMSException {
226: checkClosed();
227: checkClientID();
228: if (transacted)
229: acknowledgeMode = 0;
230: QueueSession session = new SpyQueueSession(this , transacted,
231: acknowledgeMode);
232:
233: //add the new session to the createdSessions list
234: synchronized (createdSessions) {
235: createdSessions.add(session);
236: }
237:
238: return session;
239: }
240:
241: TemporaryTopic getTemporaryTopic() throws JMSException {
242: checkClosed();
243: checkClientID();
244: try {
245: SpyTemporaryTopic temp = (SpyTemporaryTopic) serverIL
246: .getTemporaryTopic(connectionToken);
247: temp.setConnection(this );
248: synchronized (temps) {
249: temps.add(temp);
250: }
251: return temp;
252: } catch (Throwable t) {
253: SpyJMSException.rethrowAsJMSException(
254: "Cannot create a Temporary Topic", t);
255: throw new UnreachableStatementException();
256: }
257: }
258:
259: Topic createTopic(String name) throws JMSException {
260: checkClosed();
261: checkClientID();
262: try {
263: return serverIL.createTopic(connectionToken, name);
264: } catch (Throwable t) {
265: SpyJMSException.rethrowAsJMSException(
266: "Cannot get the Topic from the provider", t);
267: throw new UnreachableStatementException();
268: }
269: }
270:
271: TemporaryQueue getTemporaryQueue() throws JMSException {
272: checkClosed();
273: checkClientID();
274: try {
275: SpyTemporaryQueue temp = (SpyTemporaryQueue) serverIL
276: .getTemporaryQueue(connectionToken);
277: temp.setConnection(this );
278: synchronized (temps) {
279: temps.add(temp);
280: }
281: return temp;
282: } catch (Throwable t) {
283: SpyJMSException.rethrowAsJMSException(
284: "Cannot create a Temporary Queue", t);
285: throw new UnreachableStatementException();
286: }
287: }
288:
289: Queue createQueue(String name) throws JMSException {
290: checkClosed();
291: checkClientID();
292: try {
293:
294: return serverIL.createQueue(connectionToken, name);
295: } catch (Throwable t) {
296: SpyJMSException.rethrowAsJMSException(
297: "Cannot get the Queue from the provider", t);
298: throw new UnreachableStatementException();
299: }
300: }
301: }
|