001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.connection;
031:
032: import javax.jms.*;
033:
034: import com.caucho.jms.queue.*;
035:
036: /**
037: * A sample queue session. Lets the client create queues, browsers, etc.
038: */
039: public class QueueSessionImpl extends JmsSession implements
040: XAQueueSession, QueueSession {
041: /**
042: * Creates the session
043: */
044: public QueueSessionImpl(ConnectionImpl connection,
045: boolean isTransacted, int ackMode, boolean isXA)
046: throws JMSException {
047: super (connection, isTransacted, ackMode, isXA);
048: }
049:
050: /**
051: * Creates a receiver to receive messages.
052: *
053: * @param queue the queue to receive messages from.
054: */
055: public QueueReceiver createReceiver(Queue queue)
056: throws JMSException {
057: checkOpen();
058:
059: return createReceiver(queue, null);
060: }
061:
062: /**
063: * Creates a receiver to receive messages.
064: *
065: * @param queue the queue to receive messages from.
066: * @param messageSelector query to restrict the messages.
067: */
068: public QueueReceiver createReceiver(Queue queue,
069: String messageSelector) throws JMSException {
070: checkOpen();
071:
072: if (queue == null)
073: throw new InvalidDestinationException(
074: L
075: .l("queue is null. Destination may not be null for Session.createReceiver"));
076:
077: if (!(queue instanceof AbstractQueue))
078: throw new InvalidDestinationException(
079: L
080: .l(
081: "'{0}' is an unknown destination. The destination must be a Resin JMS Destination.",
082: queue));
083:
084: AbstractQueue dest = (AbstractQueue) queue;
085:
086: if (dest instanceof TemporaryQueueImpl) {
087: TemporaryQueueImpl temp = (TemporaryQueueImpl) dest;
088:
089: if (temp.getSession() != this ) {
090: throw new javax.jms.IllegalStateException(
091: L
092: .l(
093: "temporary queue '{0}' does not belong to this session '{1}'",
094: queue, this ));
095: }
096: }
097:
098: QueueReceiverImpl receiver = new QueueReceiverImpl(this , dest,
099: messageSelector);
100:
101: addConsumer(receiver);
102:
103: return receiver;
104: }
105:
106: /**
107: * Creates a QueueSender to send messages to a queue.
108: *
109: * @param queue the queue to send messages to.
110: */
111: public QueueSender createSender(Queue queue) throws JMSException {
112: checkOpen();
113:
114: if (queue == null) {
115: return new QueueSenderImpl(this , null);
116: }
117:
118: if (!(queue instanceof AbstractQueue))
119: throw new InvalidDestinationException(
120: L
121: .l(
122: "'{0}' is an unknown destination. The destination must be a Resin JMS destination for Session.createProducer.",
123: queue));
124:
125: AbstractQueue dest = (AbstractQueue) queue;
126:
127: if (dest instanceof TemporaryQueueImpl) {
128: TemporaryQueueImpl temp = (TemporaryQueueImpl) dest;
129:
130: if (temp.getSession() != this ) {
131: throw new javax.jms.IllegalStateException(
132: L
133: .l(
134: "temporary queue '{0}' does not belong to this session '{1}'",
135: queue, this ));
136: }
137: }
138:
139: return new QueueSenderImpl(this , dest);
140: }
141:
142: /**
143: * Creates a new topic.
144: */
145: @Override
146: public Topic createTopic(String topicName) throws JMSException {
147: throw new javax.jms.IllegalStateException(L
148: .l("QueueSession: createTopic() is invalid."));
149: }
150:
151: /**
152: * Creates a temporary topic.
153: */
154: @Override
155: public TemporaryTopic createTemporaryTopic() throws JMSException {
156: throw new javax.jms.IllegalStateException(L
157: .l("QueueSession: createTemporaryTopic() is invalid."));
158: }
159:
160: /**
161: * Creates a durable subscriber to receive messages.
162: *
163: * @param topic the topic to receive messages from.
164: */
165: @Override
166: public TopicSubscriber createDurableSubscriber(Topic topic,
167: String name) throws JMSException {
168: throw new javax.jms.IllegalStateException(
169: L
170: .l("QueueSession: createDurableSubscriber() is invalid."));
171: }
172:
173: /**
174: * Creates a subscriber to receive messages.
175: *
176: * @param topic the topic to receive messages from.
177: * @param messageSelector topic to restrict the messages.
178: * @param noLocal if true, don't receive messages we've sent
179: */
180: @Override
181: public TopicSubscriber createDurableSubscriber(Topic topic,
182: String name, String messageSelector, boolean noLocal)
183: throws JMSException {
184: throw new javax.jms.IllegalStateException(
185: L
186: .l("QueueSession: createDurableSubscriber() is invalid."));
187: }
188:
189: /**
190: * Unsubscribe from a durable subscription.
191: */
192: @Override
193: public void unsubscribe(String name) throws JMSException {
194: throw new javax.jms.IllegalStateException(L
195: .l("QueueSession: unsubscribe() is invalid."));
196: }
197:
198: public QueueSession getQueueSession() {
199: return this;
200: }
201: }
|