001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.plugin.freeze;
028:
029: import java.util.Collection;
030: import java.util.Collections;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.Set;
034:
035: import org.cougaar.core.blackboard.IncrementalSubscription;
036: import org.cougaar.core.component.ServiceBroker;
037: import org.cougaar.core.plugin.PluginBase;
038: import org.cougaar.core.service.ThreadControlService;
039: import org.cougaar.core.service.ThreadListenerService;
040: import org.cougaar.core.thread.Schedulable;
041: import org.cougaar.core.thread.ThreadListener;
042: import org.cougaar.util.UnaryPredicate;
043:
044: /**
045: * This component implements the actual freezing of an agent. Freezing is
046: * accomplished by preventing the ThreadService from running certain
047: * classes of components. The relevant object is the so-called
048: * "consumer" of the ThreadService. For plugins, this is the plugin
049: * itself. For other uses of the ThreadService, the "consumer" may be
050: * different.
051: * <p>
052: * Generally, all plugins except those involved in the freeze process
053: * are prevented from running, but this can be modified by rules
054: * specified as plugin parameters. The rules are applied in this
055: * order:
056: * <pre>
057: * "allow " + FreezePlugin.class.getName()
058: * first plugin parameter
059: * second plugin parameter
060: * etc.
061: * "deny " + PluginBase.class.getName()
062: * </pre>
063: * <p>
064: * The form of the rule is one of the words, "deny" or "allow",
065: * followed by a space followed by the name of the class or interface
066: * that should be affected by the rule. The rule matches if it is
067: * legal to assign the consumer to a variable of the type named in the
068: * rule. This includes the class of the consumer itself, all
069: * interfaces implemented by the consumer or their superinterfaces,
070: * all superclasses of the consumer, and all interfaces implemented by
071: * any superclass or their superinterfaces.
072: * <p>
073: * The first rule is built-in and cannot be overridden. It allows all
074: * the freeze plugins to run while frozen. This is obviously necessary
075: * to handle thawing a frozen society. The last rule is always added
076: * and prevents all plugins that extend PluginBase from running except
077: * those allowed by preceding rules. While it is possible to write a
078: * component that behaves as a plugin but does not extend PluginBase,
079: * this does not happen in practice.
080: * <p>
081: * The effect of this final rule can be nullified by including rules
082: * (as plugin parameters) that specifically allow individual plugins.
083: * Indeed, the whole class of plugins extending PluginBase could be
084: * allowed. It is possible to prevent anything from being frozen in an
085: * agent by making the first plugin parameter be "allow
086: * java.lang.Object". Since every class extends java.lang.Object, this
087: * will allow every class to run.
088: * <p>
089: * NOTE: This is part of the older mechanism for freezing the society. The
090: * current mechanism uses FreezeServlet located on every agent in the society,
091: * and depends on some external process to tell all agents to freeze. This older
092: * mechanism has not been removed so that people can continue to use a single servlet
093: * to freeze the entire society, but the FreezeServlet mechanism is preferred now.
094: */
095: public class FreezeTargetPlugin extends FreezePlugin implements
096: ThreadListener {
097: private static class BadGuy {
098: private Thread thread;
099: private Schedulable schedulable;
100: int hc;
101:
102: public BadGuy(Schedulable s, Thread t) {
103: thread = t;
104: schedulable = s;
105: hc = System.identityHashCode(t)
106: + System.identityHashCode(s);
107: }
108:
109: public int hashCode() {
110: return hc;
111: }
112:
113: public boolean equals(Object o) {
114: if (o == this )
115: return true;
116: if (o instanceof BadGuy) {
117: BadGuy that = (BadGuy) o;
118: return this .thread == that.thread
119: && this .schedulable == that.schedulable;
120: }
121: return false;
122: }
123:
124: public String toString() {
125: return schedulable.getState() + ": "
126: + schedulable.getConsumer().toString();
127: }
128: }
129:
130: private IncrementalSubscription relaySubscription;
131: // True if we have frozen this agent.
132: private boolean isFrozen = false;
133: private boolean isFreezing = false;
134: private ThreadListenerService threadListenerService;
135: private ThreadControlService threadControlService;
136: private Rules rules = new Rules();
137: private Set badGuys = new HashSet(); // Records the bad guys we have
138:
139: // seen enter the run state
140: // that have not left the run
141: // state
142:
143: public void unload() {
144: if (threadControlService != null) {
145: ServiceBroker sb = getServiceBroker();
146: sb.releaseService(this , ThreadListenerService.class,
147: threadListenerService);
148: sb.releaseService(this , ThreadControlService.class,
149: threadControlService);
150: }
151: super .unload();
152: }
153:
154: private UnaryPredicate myThreadQualifier = new UnaryPredicate() {
155: public boolean execute(Object o) {
156: Schedulable schedulable = (Schedulable) o;
157: Object consumer = schedulable.getConsumer();
158: return rules.allow(consumer);
159: }
160: };
161:
162: // Thread control logic. Threads are classified as good or bad. When
163: // frozen, we regulate the max running thread count to be no more
164: // than the number of goodguys that are on the runnable queue. The
165: // number of goodguys is the total of the known good guys (in the
166: // goodGuys set) and the anonymous ones. We have to assume that any
167: // thread we have never seen is a good guy. If an anonymous good guy
168: // steps off the stage we will recognize him and reduce the
169: // anonymousGoodGuys count.
170: public synchronized void threadQueued(Schedulable schedulable,
171: Object consumer) {
172: }
173:
174: public synchronized void threadDequeued(Schedulable schedulable,
175: Object consumer) {
176: }
177:
178: public synchronized void threadStarted(Schedulable schedulable,
179: Object consumer) {
180: if (logger.isDetailEnabled())
181: logger.detail("threadStarted: " + consumer);
182: if (!rules.allow(consumer)) {
183: badGuys
184: .add(new BadGuy(schedulable, Thread.currentThread()));
185: }
186: }
187:
188: public synchronized void threadStopped(Schedulable schedulable,
189: Object consumer) {
190: if (logger.isDetailEnabled())
191: logger.detail("threadStopped: " + consumer);
192: if (!rules.allow(consumer)) {
193: Thread currentThread = Thread.currentThread();
194: badGuys.remove(new BadGuy(schedulable, currentThread));
195: }
196: }
197:
198: public void rightGiven(String consumer) {
199: }
200:
201: public void rightReturned(String consumer) {
202: }
203:
204: private void setThreadLimit() {
205: threadControlService.setQualifier(myThreadQualifier);
206: }
207:
208: private void unsetThreadLimit() {
209: threadControlService.setQualifier(null);
210: }
211:
212: public void setupSubscriptions() {
213: super .setupSubscriptions();
214: rules.addAllowRule(FreezePlugin.class);
215: // Hope this is a List cause order is important.
216: Collection params = getParameters();
217: for (Iterator i = params.iterator(); i.hasNext();) {
218: String ruleSpec = (String) i.next();
219: try {
220: rules.addRule(ruleSpec);
221: } catch (Exception e) {
222: logger.error("Bad parameter: " + ruleSpec, e);
223: }
224: }
225: rules.addDenyRule(PluginBase.class);
226: if (logger.isInfoEnabled())
227: logger.info("rules=" + rules);
228: ServiceBroker sb = getServiceBroker();
229: threadControlService = (ThreadControlService) sb.getService(
230: this , ThreadControlService.class, null);
231: threadListenerService = (ThreadListenerService) sb.getService(
232: this , ThreadListenerService.class, null);
233: threadListenerService.addListener(this );
234: relaySubscription = (IncrementalSubscription) blackboard
235: .subscribe(targetRelayPredicate);
236: }
237:
238: public void execute() {
239: if (timerExpired()) {
240: cancelTimer();
241: if (isFreezing)
242: checkStopped();
243: }
244: if (relaySubscription.hasChanged()) {
245: if (relaySubscription.isEmpty()) {
246: if (logger.isDebugEnabled()) {
247: logger.debug(relaySubscription
248: .getRemovedCollection().size()
249: + " removes");
250: }
251: if (isFrozen) {
252: if (logger.isDebugEnabled())
253: logger.debug("thawed");
254: unsetThreadLimit(); // Unset thread limit
255: isFrozen = false;
256: }
257: } else {
258: if (!isFrozen) {
259: if (logger.isDebugEnabled())
260: logger.debug("freezing");
261: setThreadLimit();
262: isFrozen = true;
263: isFreezing = true;
264: checkStopped();
265: }
266: }
267: }
268: }
269:
270: private synchronized void checkStopped() {
271: int stillRunning = badGuys.size();
272: Set unfrozenAgents;
273: if (stillRunning <= 0) {
274: if (logger.isDebugEnabled()) {
275: logger.debug("Frozen");
276: isFreezing = false;
277: }
278: unfrozenAgents = Collections.EMPTY_SET;
279: } else {
280: if (logger.isDebugEnabled()) {
281: Set consumerSet = new HashSet();
282: for (Iterator i = badGuys.iterator(); i.hasNext();) {
283: BadGuy bg = (BadGuy) i.next();
284: consumerSet.add(bg.toString());
285: }
286: logger.debug("Still running: " + consumerSet);
287: }
288: unfrozenAgents = Collections
289: .singleton(getAgentIdentifier());
290: resetTimer(5000);
291: }
292: for (Iterator i = relaySubscription.iterator(); i.hasNext();) {
293: FreezeRelayTarget relay = (FreezeRelayTarget) i.next();
294: relay.setUnfrozenAgents(unfrozenAgents);
295: blackboard.publishChange(relay);
296: }
297: }
298: }
|