001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.FileInputStream;
030: import java.util.Comparator;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.Map;
034: import java.util.Properties;
035:
036: import org.cougaar.bootstrap.SystemProperties;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.mts.MessageAddress;
039: import org.cougaar.core.service.ThreadControlService;
040: import org.cougaar.core.service.ThreadListenerService;
041: import org.cougaar.core.thread.Schedulable;
042: import org.cougaar.core.thread.ThreadListener;
043: import org.cougaar.mts.base.DestinationQueue;
044: import org.cougaar.mts.base.StandardAspect;
045:
046: /**
047: * Not an aspect, but the aspect mechanism provides a simple way to
048: * load classes on demand. This is actually a ThreadListener which
049: * alters the ThreadService's queuing behavior.
050: */
051: public class PrioritizedThreadsAspect extends StandardAspect implements
052: ThreadListener {
053:
054: private HashMap agent_priorities;
055: private HashMap thread_priorities;
056:
057: private Comparator priorityComparator = new Comparator() {
058: public boolean equals(Object x) {
059: return x == this ;
060: }
061:
062: public int compare(Object o1, Object o2) {
063: Object x = thread_priorities.get(o1);
064: Object y = thread_priorities.get(o2);
065:
066: // Entries placed on the queue before our listener
067: // starts won't be found by the map. Deal with
068: // that here.
069: if (x == null && y == null)
070: return 0;
071: if (x == null)
072: return -1;
073: if (y == null)
074: return 1;
075:
076: // Higher priority should precede lower, so reverse
077: // the arguments.
078: return ((Comparable) y).compareTo(x);
079: }
080: };
081:
082: public void load() {
083: super .load();
084:
085: thread_priorities = new HashMap();
086: agent_priorities = new HashMap();
087: Properties p = new Properties();
088: String priorities_file = SystemProperties
089: .getProperty("org.cougaar.lib.quo.priorities");
090: if (priorities_file != null) {
091: try {
092: FileInputStream fis = new FileInputStream(
093: priorities_file);
094: p.load(fis);
095: fis.close();
096: } catch (java.io.IOException ex) {
097: System.err.println(ex);
098: }
099: }
100:
101: Iterator itr = p.entrySet().iterator();
102: while (itr.hasNext()) {
103: Map.Entry entry = (Map.Entry) itr.next();
104: Object key = entry.getKey();
105: String priority = (String) entry.getValue();
106: if (priority != null) {
107: agent_priorities.put(key, new Integer(Integer
108: .parseInt(priority)));
109: }
110: }
111:
112: ServiceBroker sb = getServiceBroker();
113: ThreadListenerService listenerService = (ThreadListenerService) sb
114: .getService(this , ThreadListenerService.class, null);
115: ThreadControlService controlService = (ThreadControlService) sb
116: .getService(this , ThreadControlService.class, null);
117:
118: // Whack the queue
119: listenerService.addListener(this );
120: controlService.setQueueComparator(priorityComparator);
121:
122: // we should release the services now.
123:
124: }
125:
126: // MessageTransportAspect
127: public Object getDelegate(Object delegatee, Class type) {
128: // not a real aspect, so no delegates
129: return null;
130: }
131:
132: // ThreadListener
133: public void threadQueued(Schedulable thread, Object consumer) {
134: if (consumer instanceof DestinationQueue) {
135: // Note the thread's priority just before it goes on the
136: // queue. The comparator will use this later.
137: DestinationQueue q = (DestinationQueue) consumer;
138: MessageAddress address = q.getDestination();
139: Object priority = agent_priorities.get(address.toString());
140: thread_priorities.put(thread, priority);
141: }
142: }
143:
144: public void threadDequeued(Schedulable thread, Object consumer) {
145: }
146:
147: public void threadStarted(Schedulable thread, Object consumer) {
148: }
149:
150: public void threadStopped(Schedulable thread, Object consumer) {
151: }
152:
153: public void rightGiven(String consumer) {
154: }
155:
156: public void rightReturned(String consumer) {
157: }
158:
159: }
|