001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031: import java.util.Iterator;
032:
033: import org.cougaar.core.component.Container;
034: import org.cougaar.core.component.ServiceBroker;
035: import org.cougaar.core.component.ServiceProvider;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.mts.std.AttributedMessage;
038: import org.cougaar.util.UnaryPredicate;
039:
040: /**
041: * This Component implements the {@link
042: * DestinationQueueProviderService}, which makes DestinationQueues on
043: * demand, and the {@link DestinationQueueMonitorService}, which
044: * allows clients to be notified of queue events. It also acts the
045: * {@link ServiceProvider} for those services.
046: *
047: * For instantiation of DestinationQueues, it uses the standard
048: * find-or-make approach, where a target address is used for finding.
049: * Since this Component is a subclass of @{link AspectFactory}, aspect
050: * delegates will be attached to {@link DestinationQueue}s when
051: * they're instantiated.
052: *
053: */
054: public class DestinationQueueFactory extends QueueFactory implements
055: DestinationQueueProviderService,
056: DestinationQueueMonitorService, ServiceProvider {
057: private HashMap queues;
058: private ArrayList impls;
059: private Container container;
060:
061: DestinationQueueFactory(Container container) {
062: this .container = container;
063: queues = new HashMap();
064: impls = new ArrayList();
065: }
066:
067: /**
068: * Find a DestinationQueue for the given address, or make a new
069: * one of type DestinationQueueImpl if there isn't one yet. In
070: * the latter case, attach all relevant aspects as part of the
071: * process of creating the queue. The final object returned is
072: * the outermost aspect delegate, or the DestinationQueueImpl itself if
073: * there are no aspects. */
074: public DestinationQueue getDestinationQueue(
075: MessageAddress destination) {
076: MessageAddress dest = destination.getPrimary();
077: DestinationQueue q = null;
078: DestinationQueueImpl qimpl = null;
079: synchronized (queues) {
080: q = (DestinationQueue) queues.get(dest);
081: if (q == null) {
082: qimpl = new DestinationQueueImpl(dest, container);
083: q = (DestinationQueue) attachAspects(qimpl,
084: DestinationQueue.class);
085: qimpl.setDelegate(q);
086: queues.put(dest, q);
087: synchronized (impls) {
088: impls.add(qimpl);
089: }
090: }
091: }
092: return q;
093: }
094:
095: // NB: This does _not_ prevent another thread from adding new
096: // messages while the remove operation is in progress.
097: public void removeMessages(UnaryPredicate pred, ArrayList removed) {
098: ArrayList copy;
099: synchronized (impls) {
100: copy = new ArrayList(impls);
101: }
102: Iterator itr = copy.iterator();
103: while (itr.hasNext()) {
104: MessageQueue queue = (MessageQueue) itr.next();
105: queue.removeMessages(pred, removed);
106: }
107: notifyListeners(removed);
108: }
109:
110: public MessageAddress[] getDestinations() {
111: synchronized (queues) {
112: MessageAddress[] ret = new MessageAddress[queues.size()];
113: queues.keySet().toArray(ret);
114: return ret;
115: }
116: }
117:
118: public AttributedMessage[] snapshotQueue(MessageAddress destination) {
119: DestinationQueue q = null;
120: MessageAddress dest = destination.getPrimary();
121: synchronized (queues) {
122: q = (DestinationQueue) queues.get(dest);
123: }
124: return (q == null ? null : q.snapshot());
125: }
126:
127: public Object getService(ServiceBroker sb, Object requestor,
128: Class serviceClass) {
129: // Restrict this service
130: if (serviceClass == DestinationQueueProviderService.class) {
131: return this ;
132: } else if (serviceClass == DestinationQueueMonitorService.class) {
133: return this ;
134: }
135: return null;
136: }
137:
138: public void releaseService(ServiceBroker sb, Object requestor,
139: Class serviceClass, Object service) {
140: }
141:
142: }
|