001: /*
002: * JFox - The most lightweight Java EE Application Server!
003: * more details please visit http://www.huihoo.org/jfox or http://www.jfox.org.cn.
004: *
005: * JFox is licensed and re-distributable under GNU LGPL.
006: */
007:
008: /* JFox, the OpenSource J2EE Application Server
009: *
010: * Distributable under GNU LGPL license by gnu.org
011: * more details please visit http://www.huihoo.org/jfox
012: */
013:
014: package org.jfox.jms;
015:
016: import java.io.Serializable;
017: import java.util.HashMap;
018: import java.util.Map;
019: import java.util.UUID;
020: import javax.jms.*;
021: import javax.transaction.xa.XAResource;
022:
023: import org.jfox.jms.message.BytesMessageImpl;
024: import org.jfox.jms.message.JMSMessage;
025: import org.jfox.jms.message.MapMessageImpl;
026: import org.jfox.jms.message.ObjectMessageImpl;
027: import org.jfox.jms.message.StreamMessageImpl;
028: import org.jfox.jms.message.TextMessageImpl;
029:
030: /**
031: * @author <a href="mailto:young_yy@hotmail.com">Young Yang</a>
032: */
033:
034: public class JMSSession implements Session, QueueSession, TopicSession,
035: XASession, XAQueueSession, XATopicSession {
036:
037: private JMSConnection connection;
038:
039: private boolean transacted;
040:
041: private int acknowledgeMode;
042:
043: private boolean isXA;
044:
045: private boolean closed = false;
046:
047: private MessageListener listener;
048:
049: private Map<String, JMSConsumer> consumerMap = new HashMap<String, JMSConsumer>();
050: private Map<String, JMSProducer> producerMap = new HashMap<String, JMSProducer>();
051:
052: private String sessionId = UUID.randomUUID().toString();
053:
054: public JMSSession(JMSConnection conn, boolean transacted,
055: int acknowledgeMode, boolean isXA) {
056: this .connection = conn;
057: this .transacted = transacted;
058: this .acknowledgeMode = acknowledgeMode;
059: this .isXA = isXA;
060: }
061:
062: public BytesMessage createBytesMessage() throws JMSException {
063: checkClosed();
064: return new BytesMessageImpl();
065: }
066:
067: public MapMessage createMapMessage() throws JMSException {
068: checkClosed();
069: return new MapMessageImpl();
070: }
071:
072: public Message createMessage() throws JMSException {
073: checkClosed();
074: return new JMSMessage();
075: }
076:
077: public ObjectMessage createObjectMessage() throws JMSException {
078: checkClosed();
079: return new ObjectMessageImpl();
080: }
081:
082: public ObjectMessage createObjectMessage(Serializable object)
083: throws JMSException {
084: checkClosed();
085: ObjectMessageImpl om = new ObjectMessageImpl();
086: om.setObject(object);
087: return om;
088: }
089:
090: public StreamMessage createStreamMessage() throws JMSException {
091: checkClosed();
092: return new StreamMessageImpl();
093: }
094:
095: public TextMessage createTextMessage() throws JMSException {
096: checkClosed();
097: return new TextMessageImpl();
098: }
099:
100: public TextMessage createTextMessage(String text)
101: throws JMSException {
102: checkClosed();
103: TextMessageImpl message = new TextMessageImpl();
104: message.setText(text);
105: return message;
106: }
107:
108: public boolean getTransacted() throws JMSException {
109: return transacted;
110: }
111:
112: public int getAcknowledgeMode() throws JMSException {
113: return acknowledgeMode;
114: }
115:
116: public synchronized void commit() throws JMSException {
117: checkClosed();
118: throw new JMSException("not support now!");
119: }
120:
121: public synchronized void rollback() throws JMSException {
122: checkClosed();
123: throw new JMSException("not support now!");
124: }
125:
126: public synchronized void recover() throws JMSException {
127: throw new JMSException("not support now!");
128: }
129:
130: public MessageListener getMessageListener() throws JMSException {
131: return listener;
132: }
133:
134: public void setMessageListener(MessageListener listener)
135: throws JMSException {
136: checkClosed();
137: this .listener = listener;
138: }
139:
140: public MessageProducer createProducer(Destination destination)
141: throws JMSException {
142: if (destination == null) {
143: throw new InvalidDestinationException("destination is null");
144: }
145: JMSProducer producer = new JMSProducer(this , destination);
146: return producer;
147: }
148:
149: public MessageConsumer createConsumer(Destination destination)
150: throws JMSException {
151: return createConsumer(destination, null);
152: }
153:
154: public MessageConsumer createConsumer(Destination destination,
155: String messageSelector) throws JMSException {
156: return createConsumer(destination, messageSelector, false);
157: }
158:
159: public synchronized MessageConsumer createConsumer(
160: Destination destination, String messageSelector,
161: boolean NoLocal) throws JMSException {
162: if (destination == null) {
163: throw new InvalidDestinationException("destination is null");
164: }
165: JMSConsumer consumer = new JMSConsumer(this , destination,
166: messageSelector, NoLocal);
167: consumerMap.put(consumer.getConsumerId(), consumer);
168: return consumer;
169: }
170:
171: public Queue createQueue(String queueName) throws JMSException {
172: Queue queue = getJMSConnection().getConnectionFactory()
173: .createQueue(queueName);
174: return queue;
175: }
176:
177: public Topic createTopic(String topicName) throws JMSException {
178: Topic topic = getJMSConnection().getConnectionFactory()
179: .createTopic(topicName);
180: return topic;
181: }
182:
183: public TopicSubscriber createDurableSubscriber(Topic topic,
184: String name) throws JMSException {
185: throw new JMSException("not support now!");
186: }
187:
188: public TopicSubscriber createDurableSubscriber(Topic topic,
189: String name, String messageSelector, boolean noLocal)
190: throws JMSException {
191: throw new JMSException("not support now!");
192: }
193:
194: public QueueBrowser createBrowser(Queue queue) throws JMSException {
195: throw new JMSException("not support now!");
196: }
197:
198: public QueueBrowser createBrowser(Queue queue,
199: String messageSelector) throws JMSException {
200: throw new JMSException("not support now!");
201: }
202:
203: public TemporaryQueue createTemporaryQueue() throws JMSException {
204: throw new JMSException("not support now!");
205: }
206:
207: public TemporaryTopic createTemporaryTopic() throws JMSException {
208: throw new JMSException("not support now!");
209: }
210:
211: public void unsubscribe(String name) throws JMSException {
212: throw new JMSException("not support now!");
213: }
214:
215: public QueueReceiver createReceiver(Queue queue)
216: throws JMSException {
217: return createReceiver(queue, null);
218: }
219:
220: public QueueReceiver createReceiver(Queue queue,
221: String messageSelector) throws JMSException {
222: return (QueueReceiver) createConsumer(queue, messageSelector);
223: }
224:
225: public QueueSender createSender(Queue queue) throws JMSException {
226: return (QueueSender) createProducer(queue);
227: }
228:
229: public TopicSubscriber createSubscriber(Topic topic)
230: throws JMSException {
231: return createSubscriber(topic, null, false);
232: }
233:
234: public TopicSubscriber createSubscriber(Topic topic,
235: String messageSelector, boolean noLocal)
236: throws JMSException {
237: return (TopicSubscriber) createConsumer(topic, messageSelector,
238: noLocal);
239: }
240:
241: public TopicPublisher createPublisher(Topic topic)
242: throws JMSException {
243: return (TopicPublisher) createProducer(topic);
244: }
245:
246: public Session getSession() throws JMSException {
247: return this ;
248: }
249:
250: public XAResource getXAResource() {
251: if (!isXA) {
252: throw new java.lang.IllegalStateException(
253: "current session " + this + " is not an XASession");
254: }
255: throw new UnsupportedOperationException(
256: "Not support getXAResource.");
257: }
258:
259: public QueueSession getQueueSession() throws JMSException {
260: return (QueueSession) getSession();
261: }
262:
263: public TopicSession getTopicSession() throws JMSException {
264: return (TopicSession) getSession();
265: }
266:
267: public void run() {
268: // do nothing
269: }
270:
271: private void checkClosed() throws javax.jms.IllegalStateException {
272: if (closed) {
273: throw new javax.jms.IllegalStateException(
274: "connection closed");
275: }
276: }
277:
278: /**
279: * get session id
280: *
281: * @return String
282: */
283: protected String getSessionId() {
284: return sessionId;
285: }
286:
287: JMSConnection getJMSConnection() {
288: return connection;
289: }
290:
291: public synchronized void close() throws JMSException {
292: if (closed)
293: return;
294: this .closed = true;
295: connection.removeSession(sessionId);
296: stop();
297: consumerMap.clear();
298: producerMap.clear();
299: }
300:
301: // start all consumers
302: void start() {
303: for (JMSConsumer consumer : consumerMap.values()) {
304: consumer.start();
305: }
306: }
307:
308: // stop all consumers
309: void stop() {
310:
311: for (JMSConsumer consumer : consumerMap.values()) {
312: consumer.stop();
313: }
314: }
315:
316: void removeConsumer(String consumerId) {
317: consumerMap.remove(consumerId);
318: }
319:
320: void removeProducer(String producerId) {
321: producerMap.remove(producerId);
322: }
323:
324: public static void main(String[] args) {
325:
326: }
327: }
|