001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.file;
031:
032: import java.util.ArrayList;
033: import java.util.HashMap;
034: import java.util.logging.*;
035:
036: import javax.jms.*;
037:
038: import com.caucho.jms.message.*;
039: import com.caucho.jms.queue.*;
040: import com.caucho.jms.memory.MemoryQueue;
041: import com.caucho.jms.connection.*;
042: import com.caucho.vfs.*;
043:
044: /**
045: * Implements a file topic.
046: */
047: public class FileTopic extends AbstractTopic {
048: private static final Logger log = Logger.getLogger(FileTopic.class
049: .getName());
050:
051: private final FileQueueStore _store;
052:
053: private HashMap<String, AbstractQueue> _durableSubscriptionMap = new HashMap<String, AbstractQueue>();
054:
055: private ArrayList<AbstractQueue> _subscriptionList = new ArrayList<AbstractQueue>();
056:
057: private int _id;
058:
059: public FileTopic() {
060: _store = new FileQueueStore(_messageFactory);
061: }
062:
063: //
064: // Configuration
065: //
066:
067: /**
068: * Sets the path to the backing database
069: */
070: public void setPath(Path path) {
071: _store.setPath(path);
072: }
073:
074: //
075: // JMX configuration attributes
076: //
077:
078: /**
079: * Returns the JMS configuration url.
080: */
081: public String getUrl() {
082: return "file:name=" + getName() + ";path="
083: + _store.getPath().getURL();
084: }
085:
086: public void init() {
087: }
088:
089: @Override
090: public AbstractQueue createSubscriber(JmsSession session,
091: String name, boolean noLocal) {
092: AbstractQueue queue;
093:
094: if (name != null) {
095: queue = _durableSubscriptionMap.get(name);
096:
097: if (queue == null) {
098: queue = new FileSubscriberQueue(this , session, noLocal);
099: queue.setName(getName() + ":sub-" + name);
100:
101: _subscriptionList.add(queue);
102: _durableSubscriptionMap.put(name, queue);
103: }
104:
105: return queue;
106: } else {
107: queue = new FileSubscriberQueue(this , session, noLocal);
108: queue.setName(getName() + ":sub-" + _id++);
109:
110: _subscriptionList.add(queue);
111: }
112:
113: return queue;
114: }
115:
116: @Override
117: public void closeSubscriber(AbstractQueue queue) {
118: if (!_durableSubscriptionMap.values().contains(queue))
119: _subscriptionList.remove(queue);
120: }
121:
122: @Override
123: public void send(JmsSession session, MessageImpl msg, long timeout)
124: throws JMSException {
125: for (int i = 0; i < _subscriptionList.size(); i++) {
126: _subscriptionList.get(i).send(session, msg, timeout);
127: }
128: }
129:
130: public String toString() {
131: return "FileTopic[" + getTopicName() + "]";
132: }
133: }
|