001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.io.BufferedReader;
030: import java.io.FileNotFoundException;
031: import java.io.InputStreamReader;
032: import java.io.Serializable;
033: import java.util.ArrayList;
034: import java.util.HashMap;
035: import java.util.List;
036: import java.util.Map;
037: import java.util.regex.Pattern;
038:
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.mts.MessageAttributes;
041: import org.cougaar.core.service.QuiescenceReportService;
042: import org.cougaar.util.ConfigFinder;
043: import org.cougaar.util.log.Logger;
044:
045: /**
046: * The QuiescenceMonitor is used by the {@link Distributor} to
047: * determine if an agent is quiescent.
048: * <p>
049: * The QM tracks which blackboard clients we care about quiescence
050: * for, and numbers incoming and outgoing messsages for those we care
051: * about. The QM uses a ConfigFinder accessed text file to identify
052: * which BlackboardClients to count in the quiescence calculation.
053: * Typically this excludes all infrastructure components, and includes
054: * only application-internal components.
055: */
056: class QuiescenceMonitor {
057: private static final String CONFIG_FILE = "quiescencemonitor.dat";
058: private static final String[] defaultExcludedClients = { "exclude .*" };
059:
060: private static final String finalExclusion = "exclude .*";
061:
062: private QuiescenceReportService quiescenceReportService;
063: private Logger logger;
064: private boolean isQuiescent = false;
065: private String messageNumbersChangedFor = null;
066:
067: private static class State implements Serializable {
068: State(Map imn, Map omn, boolean isQ) {
069: outgoingMessageNumbers = omn;
070: incomingMessageNumbers = imn;
071: isQuiescent = isQ;
072: }
073:
074: protected Map outgoingMessageNumbers;
075: protected Map incomingMessageNumbers;
076: protected boolean isQuiescent;
077: }
078:
079: private Map outgoingMessageNumbers = new HashMap();
080: private Map incomingMessageNumbers = new HashMap();
081: private int messageNumberCounter;
082: private Map checkedClients = new HashMap();
083: private List exclusions = new ArrayList();
084:
085: QuiescenceMonitor(QuiescenceReportService qrs, Logger logger) {
086: this .logger = logger;
087: quiescenceReportService = qrs;
088: initMessageNumberCounter();
089: try {
090: BufferedReader is = new BufferedReader(
091: new InputStreamReader(ConfigFinder.getInstance()
092: .open(CONFIG_FILE)));
093: try {
094: String line;
095: while ((line = is.readLine()) != null) {
096: int hash = line.indexOf('#');
097: if (hash >= 0) {
098: line = line.substring(0, hash);
099: }
100: line = line.trim();
101: if (line.length() > 0) {
102: addExclusion(line);
103: }
104: }
105: addExclusion(finalExclusion);
106: } finally {
107: is.close();
108: }
109: } catch (FileNotFoundException e) {
110: if (logger.isInfoEnabled()) {
111: logger.info("File not found: " + e.getMessage()
112: + ". Using defaults");
113: }
114: installDefaultExclusions();
115: } catch (Exception e) {
116: logger.error("Error parsing " + CONFIG_FILE
117: + ". Using defaults", e);
118: installDefaultExclusions();
119: }
120: }
121:
122: void setState(Object newState) {
123: State state = (State) newState;
124: incomingMessageNumbers = state.incomingMessageNumbers;
125: outgoingMessageNumbers = state.outgoingMessageNumbers;
126: isQuiescent = state.isQuiescent;
127: messageNumbersChangedFor = "setState";
128: setSubscribersAreQuiescent(isQuiescent);
129: }
130:
131: Object getState() {
132: return new State(incomingMessageNumbers,
133: outgoingMessageNumbers, isQuiescent);
134: }
135:
136: private void initMessageNumberCounter() {
137: messageNumberCounter = (int) System.currentTimeMillis();
138: nextMessageNumber();
139: }
140:
141: private int nextMessageNumber() {
142: if (++messageNumberCounter == 0)
143: messageNumberCounter++;
144: return messageNumberCounter;
145: }
146:
147: private void installDefaultExclusions() {
148: exclusions.clear();
149: for (int i = 0; i < defaultExcludedClients.length; i++) {
150: addExclusion(defaultExcludedClients[i]);
151: }
152: }
153:
154: private void addExclusion(String line) {
155: exclusions.add(new Exclusion(line));
156: }
157:
158: // Is quiescence required for this blackboard client (by name)
159: // Note that exclusions typically end in .*, so if this is really
160: // a PersistenceSubscriberState.getKey which has extra stuff at the end,
161: // this will still match
162: boolean isQuiescenceRequired(String clientName) {
163: Boolean required = (Boolean) checkedClients.get(clientName);
164: if (required == null) {
165: required = Boolean.TRUE;
166: loop: for (int i = 0, n = exclusions.size(); i < n; i++) {
167: Exclusion p = (Exclusion) exclusions.get(i);
168: switch (p.match(clientName)) {
169: case Exclusion.EXCLUDE:
170: required = Boolean.FALSE;
171: break loop;
172: case Exclusion.INCLUDE:
173: required = Boolean.TRUE;
174: break loop;
175: default:
176: continue loop;
177: }
178: }
179: if (logger.isInfoEnabled()) {
180: logger.info("isQuiescenceRequired(" + clientName + ")="
181: + required);
182: }
183: checkedClients.put(clientName, required);
184: }
185: return required.booleanValue();
186: }
187:
188: // Is quiescence required for this blackboard client. Will use the client name
189: boolean isQuiescenceRequired(BlackboardClient client) {
190: String clientName = client.getBlackboardClientName();
191: return isQuiescenceRequired(clientName);
192: }
193:
194: synchronized void setSubscribersAreQuiescent(
195: boolean subscribersAreQuiescent) {
196: if (subscribersAreQuiescent) {
197: if (messageNumbersChangedFor != null) {
198: if (logger.isDebugEnabled()) {
199: logger.debug("updateMessageNumbers because of "
200: + messageNumbersChangedFor);
201: }
202: quiescenceReportService.setMessageNumbers(
203: outgoingMessageNumbers, incomingMessageNumbers);
204: messageNumbersChangedFor = null;
205: }
206: if (!isQuiescent) {
207: isQuiescent = true;
208: if (logger.isDebugEnabled()) {
209: logger.debug("setQuiescentState");
210: }
211: quiescenceReportService.setQuiescentState();
212: } else {
213: if (logger.isDebugEnabled()) {
214: logger.debug("Still quiescent");
215: }
216: }
217: } else {
218: if (isQuiescent) {
219: isQuiescent = false;
220: if (messageNumbersChangedFor != null) {
221: if (logger.isDebugEnabled()) {
222: logger
223: .debug("clearQuiescentState: messageNumbersChangedFor "
224: + messageNumbersChangedFor);
225: }
226: } else {
227: if (logger.isDebugEnabled()) {
228: logger
229: .debug("clearQuiescentState: subscribers active");
230: }
231: }
232: quiescenceReportService.clearQuiescentState();
233: }
234: }
235: }
236:
237: synchronized boolean numberIncomingMessage(DirectiveMessage msg) {
238: MessageAddress src = msg.getSource();
239: src = src.getPrimary(); // Strip any attributes
240: int messageNumber = msg.getContentsId();
241: if (messageNumber == 0)
242: return false; // Message from plugin not required for quiescence
243: Integer last = (Integer) incomingMessageNumbers.put(src,
244: new Integer(messageNumber));
245: if (logger.isDebugEnabled()) {
246: MessageAttributes ma = msg.getSource()
247: .getMessageAttributes();
248: logger.debug("Numbered incoming message from "
249: + src
250: + ": "
251: + msg
252: + " with number "
253: + messageNumber
254: + ", previous messageNumber was "
255: + last
256: + (ma != null ? (", attributes: " + ma
257: .getAttributesAsString()) : ""));
258: }
259: messageNumbersChangedFor = src.toString();
260: return true;
261: }
262:
263: synchronized void numberOutgoingMessage(DirectiveMessage msg) {
264: MessageAddress dst = msg.getDestination();
265: dst = dst.getPrimary(); // Strip any attributes
266: int messageNumber = nextMessageNumber();
267: msg.setContentsId(messageNumber);
268: Integer last = (Integer) outgoingMessageNumbers.put(dst,
269: new Integer(messageNumber));
270: if (logger.isDebugEnabled()) {
271: MessageAttributes ma = msg.getDestination()
272: .getMessageAttributes();
273: logger.debug("Numbered outgoing message to "
274: + dst
275: + ": "
276: + msg
277: + " with number "
278: + messageNumber
279: + ", previous messageNumber was "
280: + last
281: + (ma != null ? (", attributes: " + ma
282: .getAttributesAsString()) : ""));
283: }
284: messageNumbersChangedFor = dst.toString();
285: }
286:
287: private static class Exclusion {
288: private static final int EXCLUDE = 0;
289: private static final int INCLUDE = 1;
290: private static final int DONT_KNOW = -1;
291: private static final String EXCLUDE_PREFIX = "exclude ";
292: private static final String INCLUDE_PREFIX = "include ";
293: private int matchCode;
294: private Pattern p;
295:
296: public Exclusion(String line) {
297: if (line.startsWith(EXCLUDE_PREFIX)) {
298: matchCode = EXCLUDE;
299: p = Pattern.compile(line.substring(EXCLUDE_PREFIX
300: .length()));
301: } else if (line.startsWith(INCLUDE_PREFIX)) {
302: matchCode = INCLUDE;
303: p = Pattern.compile(line.substring(INCLUDE_PREFIX
304: .length()));
305: } else {
306: throw new IllegalArgumentException("Parse error: "
307: + line);
308: }
309: }
310:
311: public int match(String clientName) {
312: if (p.matcher(clientName).matches()) {
313: return matchCode;
314: }
315: return DONT_KNOW;
316: }
317: }
318: }
|