001: package com.mockrunner.mock.jms;
002:
003: import java.io.Serializable;
004: import java.util.ArrayList;
005: import java.util.Collection;
006: import java.util.Collections;
007: import java.util.HashSet;
008: import java.util.Iterator;
009: import java.util.List;
010: import java.util.Set;
011:
012: import javax.jms.BytesMessage;
013: import javax.jms.Destination;
014: import javax.jms.InvalidDestinationException;
015: import javax.jms.JMSException;
016: import javax.jms.MapMessage;
017: import javax.jms.Message;
018: import javax.jms.MessageConsumer;
019: import javax.jms.MessageListener;
020: import javax.jms.MessageProducer;
021: import javax.jms.ObjectMessage;
022: import javax.jms.Queue;
023: import javax.jms.QueueBrowser;
024: import javax.jms.Session;
025: import javax.jms.StreamMessage;
026: import javax.jms.TemporaryQueue;
027: import javax.jms.TemporaryTopic;
028: import javax.jms.TextMessage;
029: import javax.jms.Topic;
030: import javax.jms.TopicSubscriber;
031:
032: import com.mockrunner.jms.GenericTransmissionManager;
033: import com.mockrunner.jms.MessageManager;
034: import com.mockrunner.jms.QueueTransmissionManager;
035: import com.mockrunner.jms.TopicTransmissionManager;
036: import com.mockrunner.jms.TransmissionManagerWrapper;
037:
038: /**
039: * Mock implementation of JMS <code>Session</code>.
040: *
041: * Please note that this implementation does not
042: * implement transaction isolation at the moment.
043: * Messages are immediately sent. If acknowledge
044: * mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE,
045: * the message will be automatically acknowledged,
046: * otherwise, it will not be acknowledged. According
047: * to JMS specification, the acknowledged mode must
048: * be ignored for transacted sessions. This is currently
049: * not implemented, i.e. transacted sessions behave like
050: * sessions with acknowledge mode AUTO_ACKNOWLEDGE.
051: * Messages are acknowledged even if the transaction is
052: * rolled back. However, the framework keeps track if a
053: * transaction is committed or rolled back, so you can test
054: * this and rely on the container for the rest.
055: * You can set a <code>MessageListener</code> directly to
056: * the session. This is an application server internal feature
057: * and not meant for application use in JMS.
058: * This mock session dispatches any message of any
059: * known <code>Queue</code> and <code>Topic</code> to the
060: * distinguished <code>MessageListener</code>, if such
061: * a <code>MessageListener</code> is registered.
062: */
063: public class MockSession implements Session {
064: private MockConnection connection;
065: private QueueTransmissionManager queueTransManager;
066: private TopicTransmissionManager topicTransManager;
067: private GenericTransmissionManager genericTransManager;
068: private TransmissionManagerWrapper transManager;
069: private MessageManager messageManager;
070: private MessageListener messageListener;
071: private List tempQueues;
072: private List tempTopics;
073: private Set queues;
074: private Set topics;
075: private boolean transacted;
076: private int acknowledgeMode;
077: private int numberCommits;
078: private int numberRollbacks;
079: private boolean recovered;
080: private boolean closed;
081:
082: public MockSession(MockConnection connection, boolean transacted,
083: int acknowledgeMode) {
084: this .connection = connection;
085: this .transacted = transacted;
086: this .acknowledgeMode = acknowledgeMode;
087: queueTransManager = new QueueTransmissionManager(connection,
088: this );
089: topicTransManager = new TopicTransmissionManager(connection,
090: this );
091: genericTransManager = new GenericTransmissionManager(
092: connection, this );
093: transManager = new TransmissionManagerWrapper(
094: queueTransManager, topicTransManager,
095: genericTransManager);
096: messageManager = new MessageManager();
097: tempQueues = new ArrayList();
098: tempTopics = new ArrayList();
099: queues = new HashSet();
100: topics = new HashSet();
101: messageListener = null;
102: numberCommits = 0;
103: numberRollbacks = 0;
104: recovered = false;
105: closed = false;
106: }
107:
108: /**
109: * Returns the {@link com.mockrunner.jms.QueueTransmissionManager}.
110: * @return the {@link com.mockrunner.jms.QueueTransmissionManager}
111: */
112: public QueueTransmissionManager getQueueTransmissionManager() {
113: return queueTransManager;
114: }
115:
116: /**
117: * Returns the {@link com.mockrunner.jms.TopicTransmissionManager}.
118: * @return the {@link com.mockrunner.jms.TopicTransmissionManager}
119: */
120: public TopicTransmissionManager getTopicTransmissionManager() {
121: return topicTransManager;
122: }
123:
124: /**
125: * Returns the {@link com.mockrunner.jms.GenericTransmissionManager}.
126: * @return the {@link com.mockrunner.jms.GenericTransmissionManager}
127: */
128: public GenericTransmissionManager getGenericTransmissionManager() {
129: return genericTransManager;
130: }
131:
132: /**
133: * @deprecated use {@link #getTransmissionManagerWrapper}
134: */
135: public TransmissionManagerWrapper getTransmissionManager() {
136: return getTransmissionManagerWrapper();
137: }
138:
139: /**
140: * Returns the {@link com.mockrunner.jms.TransmissionManagerWrapper}.
141: * @return the {@link com.mockrunner.jms.TransmissionManagerWrapper}
142: */
143: public TransmissionManagerWrapper getTransmissionManagerWrapper() {
144: return transManager;
145: }
146:
147: /**
148: * Returns the {@link MessageManager} for this session.
149: * @return the {@link MessageManager}
150: */
151: public MessageManager getMessageManager() {
152: return messageManager;
153: }
154:
155: /**
156: * Returns the list of temporary queues.
157: * @return the <code>TemporaryQueue</code> list
158: */
159: public List getTemporaryQueueList() {
160: return Collections.unmodifiableList(tempQueues);
161: }
162:
163: /**
164: * Returns a <code>TemporaryQueue</code> by its index. The
165: * index represent the number of the queue. Returns <code>null</code>
166: * if no such <code>TemporaryQueue</code> is present.
167: * @param index the index
168: * @return the <code>TemporaryQueue</code>
169: */
170: public MockTemporaryQueue getTemporaryQueue(int index) {
171: if (tempQueues.size() <= index || index < 0)
172: return null;
173: return (MockTemporaryQueue) tempQueues.get(index);
174: }
175:
176: /**
177: * Returns the list of temporary topics.
178: * @return the <code>TemporaryTopic</code> list
179: */
180: public List getTemporaryTopicList() {
181: return Collections.unmodifiableList(tempTopics);
182: }
183:
184: /**
185: * Returns a <code>TemporaryTopic</code> by its index. The
186: * index represent the number of the topic. Returns <code>null</code>
187: * if no such <code>TemporaryTopic</code> is present.
188: * @param index the index
189: * @return the <code>TemporaryTopic</code>
190: */
191: public MockTemporaryTopic getTemporaryTopic(int index) {
192: if (tempTopics.size() <= index || index < 0)
193: return null;
194: return (MockTemporaryTopic) tempTopics.get(index);
195: }
196:
197: /**
198: * Returns if this session was closed.
199: * @return <code>true</code> if this session is closed
200: */
201: public boolean isClosed() {
202: return closed;
203: }
204:
205: /**
206: * Returns if this session was recovered.
207: * @return <code>true</code> if this session was recovered
208: */
209: public boolean isRecovered() {
210: return recovered;
211: }
212:
213: /**
214: * Returns if the current transaction was committed.
215: * @return <code>true</code> if the transaction was committed
216: */
217: public boolean isCommitted() {
218: return (numberCommits > 0);
219: }
220:
221: /**
222: * Returns the number of commits.
223: * @return the number of commits
224: */
225: public int getNumberCommits() {
226: return numberCommits;
227: }
228:
229: /**
230: * Returns if the current transaction was rolled back.
231: * @return <code>true</code> if the transaction was rolled back
232: */
233: public boolean isRolledBack() {
234: return (numberRollbacks > 0);
235: }
236:
237: /**
238: * Returns the number of rollbacks.
239: * @return the number of rollbacks
240: */
241: public int getNumberRollbacks() {
242: return numberRollbacks;
243: }
244:
245: /**
246: * Returns if messages should be automatically acknowledged,
247: * i.e. if the acknowledge mode is not <code>CLIENT_ACKNOWLEDGE</code>.
248: * @return <code>true</code> if messages are automatically acknowledged
249: */
250: public boolean isAutoAcknowledge() {
251: return acknowledgeMode != CLIENT_ACKNOWLEDGE;
252: }
253:
254: /**
255: * Note: Returns <code>0</code> if the session is transacted.
256: * This method does not exist in JMS 1.0.2. In JMS 1.1 it
257: * should return <code>Session.SESSION_TRANSACTED</code>
258: * which is specified as <code>0</code>. In order to avoid
259: * different versions for JMS 1.0.2 and 1.1
260: * (<code>Session.SESSION_TRANSACTED</code> does not
261: * exist in 1.0.2) this method returns hardcoded <code>0</code>,
262: * if the session is transacted.
263: * @return the acknowledge mode
264: */
265: public int getAcknowledgeMode() throws JMSException {
266: if (getTransacted())
267: return 0;
268: return acknowledgeMode;
269: }
270:
271: public boolean getTransacted() throws JMSException {
272: connection.throwJMSException();
273: return transacted;
274: }
275:
276: public BytesMessage createBytesMessage() throws JMSException {
277: connection.throwJMSException();
278: return getMessageManager().createBytesMessage();
279: }
280:
281: public MapMessage createMapMessage() throws JMSException {
282: connection.throwJMSException();
283: return getMessageManager().createMapMessage();
284: }
285:
286: public Message createMessage() throws JMSException {
287: connection.throwJMSException();
288: return getMessageManager().createMessage();
289: }
290:
291: public ObjectMessage createObjectMessage() throws JMSException {
292: connection.throwJMSException();
293: return createObjectMessage(null);
294: }
295:
296: public ObjectMessage createObjectMessage(Serializable object)
297: throws JMSException {
298: connection.throwJMSException();
299: return getMessageManager().createObjectMessage(object);
300: }
301:
302: public StreamMessage createStreamMessage() throws JMSException {
303: connection.throwJMSException();
304: return getMessageManager().createStreamMessage();
305: }
306:
307: public TextMessage createTextMessage() throws JMSException {
308: connection.throwJMSException();
309: return createTextMessage(null);
310: }
311:
312: public TextMessage createTextMessage(String text)
313: throws JMSException {
314: connection.throwJMSException();
315: return getMessageManager().createTextMessage(text);
316: }
317:
318: public MessageListener getMessageListener() throws JMSException {
319: connection.throwJMSException();
320: return messageListener;
321: }
322:
323: public void setMessageListener(MessageListener messageListener)
324: throws JMSException {
325: connection.throwJMSException();
326: this .messageListener = messageListener;
327: }
328:
329: public void run() {
330:
331: }
332:
333: public void commit() throws JMSException {
334: connection.throwJMSException();
335: numberCommits++;
336: }
337:
338: public void rollback() throws JMSException {
339: connection.throwJMSException();
340: recover();
341: numberRollbacks++;
342: }
343:
344: public void close() throws JMSException {
345: connection.throwJMSException();
346: if (getTransacted() && !isCommitted()) {
347: rollback();
348: }
349: getQueueTransmissionManager().closeAll();
350: getTopicTransmissionManager().closeAll();
351: getGenericTransmissionManager().closeAll();
352: removeSessionFromDestinations(tempQueues);
353: removeSessionFromDestinations(tempTopics);
354: removeSessionFromDestinations(queues);
355: removeSessionFromDestinations(topics);
356: queues.clear();
357: topics.clear();
358: closed = true;
359: }
360:
361: private void removeSessionFromDestinations(Collection destinations) {
362: Iterator iterator = destinations.iterator();
363: while (iterator.hasNext()) {
364: Object currentDestination = (Object) iterator.next();
365: if (currentDestination instanceof MockDestination)
366: ((MockDestination) currentDestination)
367: .removeSession(this );
368: }
369: }
370:
371: public void recover() throws JMSException {
372: connection.throwJMSException();
373: recovered = true;
374: }
375:
376: public void unsubscribe(String name) throws JMSException {
377: getConnection().throwJMSException();
378: topicTransManager.removeTopicDurableSubscriber(name);
379: }
380:
381: public Queue createQueue(String name) throws JMSException {
382: getConnection().throwJMSException();
383: MockQueue queue = ((MockConnection) getConnection())
384: .getDestinationManager().getQueue(name);
385: if (null == queue) {
386: throw new JMSException("Queue with name " + name
387: + " not found");
388: }
389: addSessionToQueue(queue);
390: return queue;
391: }
392:
393: public TemporaryQueue createTemporaryQueue() throws JMSException {
394: getConnection().throwJMSException();
395: MockTemporaryQueue queue = new MockTemporaryQueue();
396: tempQueues.add(queue);
397: addSessionToQueue(queue);
398: return queue;
399: }
400:
401: public Topic createTopic(String name) throws JMSException {
402: getConnection().throwJMSException();
403: MockTopic topic = ((MockConnection) getConnection())
404: .getDestinationManager().getTopic(name);
405: if (null == topic) {
406: throw new JMSException("Topic with name " + name
407: + " not found");
408: }
409: addSessionToTopic(topic);
410: return topic;
411: }
412:
413: public TemporaryTopic createTemporaryTopic() throws JMSException {
414: getConnection().throwJMSException();
415: MockTemporaryTopic topic = new MockTemporaryTopic();
416: tempTopics.add(topic);
417: addSessionToTopic(topic);
418: return topic;
419: }
420:
421: public MessageConsumer createConsumer(Destination destination)
422: throws JMSException {
423: getConnection().throwJMSException();
424: return createConsumer(destination, null);
425: }
426:
427: public MessageConsumer createConsumer(Destination destination,
428: String messageSelector) throws JMSException {
429: getConnection().throwJMSException();
430: return createConsumer(destination, messageSelector, false);
431: }
432:
433: public MessageConsumer createConsumer(Destination destination,
434: String messageSelector, boolean noLocal)
435: throws JMSException {
436: if (null == destination) {
437: throw new IllegalArgumentException(
438: "destination must not be null");
439: }
440: getConnection().throwJMSException();
441: if (destination instanceof MockQueue) {
442: addSessionToQueue((Queue) destination);
443: return getQueueTransmissionManager().createQueueReceiver(
444: (MockQueue) destination, messageSelector);
445: } else if (destination instanceof MockTopic) {
446: addSessionToTopic((Topic) destination);
447: return getTopicTransmissionManager().createTopicSubscriber(
448: (MockTopic) destination, messageSelector, noLocal);
449: } else {
450: throw new InvalidDestinationException(
451: "destination must be an instance of MockQueue or MockTopic");
452: }
453: }
454:
455: public MessageProducer createProducer(Destination destination)
456: throws JMSException {
457: getConnection().throwJMSException();
458: if (null == destination) {
459: return createProducerForNullDestination();
460: }
461: if (destination instanceof MockQueue) {
462: addSessionToQueue((Queue) destination);
463: return getQueueTransmissionManager().createQueueSender(
464: (MockQueue) destination);
465: } else if (destination instanceof MockTopic) {
466: addSessionToTopic((Topic) destination);
467: return getTopicTransmissionManager().createTopicPublisher(
468: (MockTopic) destination);
469: } else {
470: throw new InvalidDestinationException(
471: "destination must be an instance of MockQueue or MockTopic");
472: }
473: }
474:
475: public QueueBrowser createBrowser(Queue queue) throws JMSException {
476: getConnection().throwJMSException();
477: return createBrowser(queue, null);
478: }
479:
480: public QueueBrowser createBrowser(Queue queue,
481: String messageSelector) throws JMSException {
482: getConnection().throwJMSException();
483: if (!(queue instanceof MockQueue)) {
484: throw new InvalidDestinationException(
485: "queue must be an instance of MockQueue");
486: }
487: addSessionToQueue(queue);
488: return queueTransManager.createQueueBrowser((MockQueue) queue,
489: messageSelector);
490: }
491:
492: public TopicSubscriber createDurableSubscriber(Topic topic,
493: String name) throws JMSException {
494: getConnection().throwJMSException();
495: return createDurableSubscriber(topic, name, null, false);
496: }
497:
498: public TopicSubscriber createDurableSubscriber(Topic topic,
499: String name, String messageSelector, boolean noLocal)
500: throws JMSException {
501: getConnection().throwJMSException();
502: if (!(topic instanceof MockTopic)) {
503: throw new InvalidDestinationException(
504: "topic must be an instance of MockTopic");
505: }
506: addSessionToTopic(topic);
507: return topicTransManager.createDurableTopicSubscriber(
508: (MockTopic) topic, name, messageSelector, noLocal);
509: }
510:
511: protected MockConnection getConnection() {
512: return connection;
513: }
514:
515: public void addSessionToQueue(Queue queue) {
516: if (queue instanceof MockQueue) {
517: ((MockQueue) queue).addSession(this );
518: queues.add(queue);
519: }
520: }
521:
522: public void addSessionToTopic(Topic topic) {
523: if (topic instanceof MockTopic) {
524: ((MockTopic) topic).addSession(this );
525: topics.add(topic);
526: }
527: }
528:
529: protected MessageProducer createProducerForNullDestination() {
530: return getGenericTransmissionManager().createMessageProducer();
531: }
532: }
|