0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.blackboard;
0028:
0029: import java.util.ArrayList;
0030: import java.util.Collection;
0031: import java.util.Collections;
0032: import java.util.Enumeration;
0033: import java.util.HashMap;
0034: import java.util.HashSet;
0035: import java.util.Iterator;
0036: import java.util.List;
0037: import java.util.Map;
0038: import java.util.Set;
0039: import java.util.Vector;
0040:
0041: import org.cougaar.bootstrap.SystemProperties;
0042: import org.cougaar.core.agent.Agent;
0043: import org.cougaar.core.agent.service.MessageSwitchService;
0044: import org.cougaar.core.component.ServiceBroker;
0045: import org.cougaar.core.logging.LoggingServiceWithPrefix;
0046: import org.cougaar.core.mts.MessageAddress;
0047: import org.cougaar.core.mts.MessageAttributes;
0048: import org.cougaar.core.persist.BlackboardPersistence;
0049: import org.cougaar.core.persist.Persistence;
0050: import org.cougaar.core.persist.PersistenceException;
0051: import org.cougaar.core.persist.PersistenceNotEnabledException;
0052: import org.cougaar.core.persist.PersistenceObject;
0053: import org.cougaar.core.service.AlarmService;
0054: import org.cougaar.core.service.DomainForBlackboardService;
0055: import org.cougaar.core.service.LoggingService;
0056: import org.cougaar.core.service.ThreadService;
0057: import org.cougaar.core.service.community.CommunityChangeAdapter;
0058: import org.cougaar.core.service.community.CommunityChangeEvent;
0059: import org.cougaar.core.service.community.CommunityService;
0060: import org.cougaar.core.thread.Schedulable;
0061: import org.cougaar.multicast.AttributeBasedAddress;
0062: import org.cougaar.util.UnaryPredicate;
0063: import org.cougaar.util.log.Logger;
0064: import org.cougaar.util.log.Logging;
0065:
0066: /**
0067: * A {@link Subscriber} created by the {@link StandardBlackboard}
0068: * that maintains a view of all published objects, invokes
0069: * {@link org.cougaar.core.domain.Domain} {@link
0070: * org.cougaar.core.domain.LogicProvider}s, and monitors
0071: * community membership changes.
0072: *
0073: * @property org.cougaar.core.agent.savePriorPublisher
0074: * When set to <em>true</em>, will collect extra
0075: * information on each publish to detect problems with multiple adds,
0076: * deletes, etc by complaining
0077: * about unexpected state changes. This adds significant runtime overhead.
0078: * @property org.cougaar.core.agent.enablePublishException
0079: * When set to <em>true</em>, collects stack frames
0080: * for each published object in order to pinpoint both sides of
0081: * publish conflicts. This is <em>extremely</em>
0082: * expensive.
0083: * @property org.cougaar.core.persistence.enable
0084: * When set to <em>true</em> will enable blackboard persistence.
0085: * @property org.cougaar.core.blackboard.waitForNewCommChangeNotifications Time in
0086: * milliseconds to wait for more community changes before asking the community
0087: * service for them. Default is 1,000.
0088: */
0089: public class Blackboard extends Subscriber implements
0090: BlackboardServesDomain, BlackboardClient, PrivilegedClaimant {
// Subscription created in init() with a predicate that accepts every non-null object.
protected CollectionSubscription everything;
// Address taken from the MessageSwitchService in the constructor; returned by getCID().
protected MessageAddress self;
// Created by createDistributor() in the constructor; set to null by stop().
private Distributor myDistributor;
// Broker handed to the constructor; used for all service lookups in this class.
protected ServiceBroker myServiceBroker;
// Source of the time reported by currentTimeMillis().
protected AlarmService alarmService;
// Used to invoke the domain logic providers; the constructor throws if it cannot be obtained.
protected DomainForBlackboardService myDomainService;
// Used by the ABA cache clearer to obtain its Schedulable.
protected ThreadService threadS;
// Logging service wrapped so that every message is prefixed with "<self>: ".
protected LoggingService logger;

/** Insertion point name: {@link Agent#INSERTION_POINT} followed by ".Blackboard". */
public static final String INSERTION_POINT = Agent.INSERTION_POINT
    + ".Blackboard";
0102:
0103: public MessageAddress getCID() {
0104: return self;
0105: }
0106:
// Value of org.cougaar.core.agent.savePriorPublisher, read once at class
// initialization. When true, AllObjectsSet records a PublishStack per object.
public static final boolean isSavePriorPublisher = SystemProperties
    .getBoolean("org.cougaar.core.agent.savePriorPublisher");
// Value of org.cougaar.core.agent.enablePublishException, read once at class
// initialization. When true, init() backs "everything" with an AllObjectsSet.
public static final boolean enablePublishException = SystemProperties
    .getBoolean("org.cougaar.core.agent.enablePublishException");

/**
 * @property org.cougaar.core.blackboard.pedantic When true (the default) enables a variety
 * of extra checks for suspicious blackboard activity. None of these checks are especially
 * expensive, so it is generally recommended that pedantic be left enabled.
 */
public static final boolean PEDANTIC = SystemProperties.getBoolean(
    "org.cougaar.core.blackboard.pedantic", true);

/**
 * The queue of directives waiting to be sent. Filled by the sendDirective
 * methods and drained by appendMessagesToSend.
 */
private List sendQueue = new ArrayList();
0123:
0124: // mark the envelopes which we emit so that we can detect them later.
0125: protected Envelope createEnvelope() {
0126: if (isTimestamped()) {
0127: return new TimestampedPlanEnvelopeImpl();
0128: } else {
0129: return new PlanEnvelopeImpl();
0130: }
0131: }
0132:
/**
 * Marker <i>interface</i> that lets us detect envelopes which we've
 * emitted ourselves (see the instanceof check in receiveEnvelope).
 *
 * This isn't an Envelope, since Envelope is a class, but this interface
 * is only applied to the two Envelope subclasses listed below.
 */
interface PlanEnvelope {
}

/** Plain {@link Envelope} tagged with {@link PlanEnvelope}; returned by createEnvelope(). */
private static final class PlanEnvelopeImpl extends Envelope
    implements PlanEnvelope {
}

/**
 * {@link TimestampedEnvelope} tagged with {@link PlanEnvelope}; returned by
 * createEnvelope() when the blackboard is timestamped.
 */
private static final class TimestampedPlanEnvelopeImpl extends
    TimestampedEnvelope implements PlanEnvelope {
  // Always reports true.
  // NOTE(review): presumably overrides TimestampedEnvelope.isBlackboard() - confirm.
  public boolean isBlackboard() {
    return true;
  }
}
0153:
0154: /** override to immediately publish deltas rather than delay until transaction close */
0155: protected EnvelopeTuple clientAddedObject(Object o) {
0156: EnvelopeTuple tup = super .clientAddedObject(o);
0157: consumeTuple(tup);
0158: return tup;
0159: }
0160:
0161: /** override to immediately publish deltas rather than delay until transaction close */
0162: protected EnvelopeTuple clientRemovedObject(Object o) {
0163: EnvelopeTuple tup = super .clientRemovedObject(o);
0164: consumeTuple(tup);
0165: return tup;
0166: }
0167:
0168: /** override to immediately publish deltas rather than delay until transaction close */
0169: protected EnvelopeTuple clientChangedObject(Object o, List changes) {
0170: EnvelopeTuple tup = super .clientChangedObject(o, changes);
0171: consumeTuple(tup);
0172: return tup;
0173: }
0174:
0175: /** invoked via client*Object while executing an LP */
0176: private final boolean consumeTuple(EnvelopeTuple tup) {
0177: boolean somethingFired = false;
0178: synchronized (subscriptions) {
0179: for (int i = 0, n = subscriptions.size(); i < n; i++) {
0180: Subscription subscription = (Subscription) subscriptions
0181: .get(i);
0182: somethingFired |= tup.applyToSubscription(subscription,
0183: true);
0184: }
0185: }
0186: // recurses
0187: callLogicProviders(tup, false);
0188: return somethingFired;
0189: }
0190:
0191: /** is the object non-null? */
0192: private static final UnaryPredicate anythingP = new AnythingPredicate();
0193:
0194: private static final class AnythingPredicate implements
0195: UnaryPredicate {
0196: public boolean execute(Object o) {
0197: return (o != null);
0198: }
0199: }
0200:
/**
 * Creates the blackboard, starts its distributor and looks up the services
 * it needs.
 *
 * @param msgSwitch supplies this blackboard's address and is handed to the
 *        distributor
 * @param sb broker used for all service lookups
 * @param state passed through to the distributor's start call
 * @throws RuntimeException if no DomainForBlackboardService can be obtained
 */
public Blackboard(MessageSwitchService msgSwitch, ServiceBroker sb,
    Object state) {
  // Identity and distributor come first; the distributor is created before
  // the logger below has been assigned.
  myServiceBroker = sb;
  self = msgSwitch.getMessageAddress();
  myDistributor = createDistributor(msgSwitch, state);
  setClientDistributor((BlackboardClient) this, myDistributor);
  setName("<blackboard>");

  // Logging, prefixed with our own address.
  logger = (LoggingService) sb.getService(this, LoggingService.class, null);
  logger = LoggingServiceWithPrefix.add(logger, self + ": ");

  // The domain service is mandatory.
  myDomainService = (DomainForBlackboardService) sb.getService(this,
      DomainForBlackboardService.class, null);
  if (myDomainService == null) {
    RuntimeException failure = new RuntimeException(
        "Couldn't get DomainForBlackboardService!");
    failure.printStackTrace();
    throw failure;
  }

  // Remaining services are stored as returned (no null check here).
  alarmService = (AlarmService) sb.getService(this, AlarmService.class, null);
  threadS = (ThreadService) sb.getService(this, ThreadService.class, null);
}
0224:
0225: public void stop() {
0226: // FIXME: Stop the cacheClearer thread
0227: // This for bug 3704
0228: myDistributor = null;
0229: }
0230:
0231: private static class AllObjectsSet extends HashSet {
0232: Map stacks = createStackMap();
0233:
0234: protected Map createStackMap() {
0235: if (isSavePriorPublisher) {
0236: return new HashMap();
0237: } else {
0238: return null; // Don't keep prior publishing info
0239: }
0240: }
0241:
0242: public AllObjectsSet(int size) {
0243: super (size);
0244: }
0245:
0246: public boolean add(Object o) {
0247: boolean result = super .add(o);
0248: if (!result) {
0249: PublishStack priorStack = null;
0250: if (stacks != null) {
0251: priorStack = (PublishStack) stacks.get(o);
0252: }
0253: throw new PublishException(
0254: "Blackboard.everything.add object already published: "
0255: + o.toString(), priorStack,
0256: stacks != null);
0257: } else if (stacks != null) {
0258: stacks.put(o, new PublishStack("Prior publisher: "));
0259: }
0260: return result;
0261: }
0262:
0263: public boolean remove(Object o) {
0264: boolean result = super .remove(o);
0265: if (!result) {
0266: PublishStack priorStack = null;
0267: if (stacks != null) {
0268: priorStack = (PublishStack) stacks.get(o);
0269: }
0270: throw new PublishException(
0271: "Blackboard.everything.remove object not published: "
0272: + o.toString(), priorStack,
0273: stacks != null);
0274: } else if (stacks != null) {
0275: stacks.put(o, new PublishStack("Prior remover: "));
0276: }
0277: return result;
0278: }
0279: }
0280:
0281: public final void init() {
0282: everything = new CollectionSubscription(anythingP,
0283: enablePublishException ? new AllObjectsSet(111)
0284: : new HashSet(111));
0285: subscribe(everything);
0286: setReadyToPersist();
0287: }
0288:
0289: // Subscription Client interface
0290: public String getBlackboardClientName() {
0291: return getClass().getName();
0292: }
0293:
0294: /**
0295: * Provide a new subscription with its initial fill. Called under
0296: * the protection of the distributor lock so we are guaranteed that
0297: * the allPlanObjects won't change.
0298: */
0299: public void fillSubscription(Subscription subscription) {
0300: if (subscription == everything) {
0301: return; // Don't fill ourselves
0302: }
0303: Envelope envelope = createQueryEnvelope(subscription);
0304: envelope.bulkAddObject(everything.getCollection());
0305: subscription.fill(envelope);
0306: }
0307:
0308: public void fillQuery(Subscription subscription) {
0309: Envelope envelope = createQueryEnvelope(subscription);
0310: envelope.bulkAddObject(everything.getCollection());
0311: subscription.fill(envelope);
0312: }
0313:
0314: private Envelope createQueryEnvelope(Subscription subscription) {
0315: if (isTimestamped()) {
0316: TimestampedEnvelope te = new TimestampedEnvelope();
0317: Subscriber subscriber = subscription.getSubscriber();
0318: if (subscriber != null) {
0319: te.setName(subscriber.getName());
0320: }
0321: long nowTime = System.currentTimeMillis();
0322: te.setTransactionOpenTime(nowTime);
0323: // should we wait until after the query to set the close time?
0324: te.setTransactionCloseTime(nowTime);
0325: return te;
0326: } else {
0327: return new Envelope();
0328: }
0329: }
0330:
0331: /** Alias for sendDirective(aDirective, null); */
0332: public void sendDirective(Directive aDirective) {
0333: if (aDirective == null) {
0334: throw new IllegalArgumentException(
0335: "directive must not be null.");
0336: } else {
0337: sendQueue.add(aDirective);
0338: }
0339: }
0340:
0341: /**
0342: * Submit a directive with attached ChangeReports for transmission
0343: * from this agent. We fill in the ContentsId with the next available number.
0344: */
0345: public void sendDirective(Directive aDirective, Collection c) {
0346: if (aDirective == null) {
0347: throw new IllegalArgumentException(
0348: "directive must not be null.");
0349: } else {
0350: if (c != null && ((Collection) c).size() > 0) {
0351: DirectiveMessage.DirectiveWithChangeReports dd = new DirectiveMessage.DirectiveWithChangeReports(
0352: aDirective, c);
0353: aDirective = dd;
0354: }
0355: sendQueue.add(aDirective);
0356: }
0357: }
0358:
0359: public long currentTimeMillis() {
0360: return alarmService.currentTimeMillis();
0361: }
0362:
0363: /**
0364: * Add Object to the Blackboard Collection
0365: */
0366: public void add(Object o) {
0367: publishAdd(o);
0368: }
0369:
0370: /** Removed Object to the Blackboard Collection */
0371: public void remove(Object o) {
0372: publishRemove(o);
0373: }
0374:
0375: /** Change Object to the Blackboard Collection */
0376: public void change(Object o) {
0377: publishChange(o, null);
0378: }
0379:
0380: public void change(Object o, Collection changes) {
0381: publishChange(o, changes);
0382: }
0383:
0384: public Enumeration searchBlackboard(UnaryPredicate predicate) {
0385: Vector vec = new Vector();
0386:
0387: for (Iterator i = everything.getCollection().iterator(); i
0388: .hasNext();) {
0389: Object o = i.next();
0390: if (predicate.execute(o)) {
0391: vec.addElement(o);
0392: }
0393: }
0394: return vec.elements();
0395: }
0396:
0397: public int countBlackboard(Class cl) {
0398: // could optimize by maintaining an LRU table
0399: int c = 0;
0400: for (Iterator i = everything.getCollection().iterator(); i
0401: .hasNext();) {
0402: Object o = i.next();
0403: if (o != null && cl.isAssignableFrom(o.getClass())) {
0404: c++;
0405: }
0406: }
0407: return c;
0408: }
0409:
0410: public int countBlackboard(UnaryPredicate predicate) {
0411: int c = 0;
0412: for (Iterator i = everything.getCollection().iterator(); i
0413: .hasNext();) {
0414: Object o = i.next();
0415: if (predicate.execute(o)) {
0416: c++;
0417: }
0418: }
0419: return c;
0420: }
0421:
0422: public int getBlackboardSize() {
0423: return everything.size();
0424: }
0425:
0426: /**
0427: * Process incoming directive messages. All messages have been
0428: * blessed by the message manager. The messages are implicitly
0429: * acknowledged by this method. The envelope of published events
0430: * resulting from handling the messages is returned.
0431: */
0432: public final Envelope receiveMessages(List msgs) {
0433: //try {
0434: // startTransaction();
0435: for (Iterator iter = msgs.iterator(); iter.hasNext();) {
0436: DirectiveMessage msg = (DirectiveMessage) iter.next();
0437: applyMessageAgainstLogicProviders(msg);
0438: }
0439:
0440: checkUnpostedChangeReports();
0441: // There really should not be any change tracking subscriptions, at
0442: // least not in the base classes!!! MT
0443: resetSubscriptionChanges(); // clear change tracking subscriptions
0444:
0445: return privateGetPublishedChanges();
0446: //} finally {
0447: // stopTransaction();
0448: //}
0449: }
0450:
// Reusable one-element list: receiveEnvelope puts its single envelope in
// here to pass it to receiveEnvelopes(List, boolean), then empties it again.
private final List oneEnvelope = new ArrayList(1);

/**
 * called by distributor to prepare for "receiveEnvelope(..)" calls.
 * Does nothing but call setTransactionOpenTime().
 */
public final void prepareForEnvelopes() {
  setTransactionOpenTime();
}
0459:
0460: /**
0461: * Called by transaction close within the thread of Plugins.
0462: * Also called at the end of an LP pseudo-transaction, but
0463: * most of the logic here is disabled in that case.
0464: */
0465: public final Envelope receiveEnvelope(Envelope envelope) {
0466: oneEnvelope.add(envelope);
0467: super .receiveEnvelopes(oneEnvelope, false); // Move to our inbox
0468: oneEnvelope.clear();
0469:
0470: if (!(envelope instanceof PlanEnvelope)) {
0471: // although we aways consume envelopes, we only act on them
0472: // when we didn't generate 'em
0473: privateUpdateSubscriptions();
0474:
0475: try {
0476: boolean isPersistenceEnvelope = envelope instanceof PersistenceEnvelope;
0477: List tuples = envelope.getRawDeltas();
0478: int l = tuples.size();
0479: for (int i = 0; i < l; i++) {
0480: try {
0481: callLogicProviders((EnvelopeTuple) tuples
0482: .get(i), isPersistenceEnvelope);
0483: } catch (Exception e) {
0484: System.err.println("Caught " + e
0485: + " while running logic providers.");
0486: e.printStackTrace();
0487: }
0488: }
0489: } finally {
0490: // clear subscriptions deltas, just in case.
0491: resetSubscriptionChanges();
0492: }
0493: }
0494:
0495: return privateGetPublishedChanges();
0496: }
0497:
0498: private static class DestinationKey {
0499: public MessageAddress cid;
0500: public MessageAttributes attrs;
0501: private int hc;
0502:
0503: public DestinationKey(MessageAddress cid,
0504: MessageAttributes attrs) {
0505: this .cid = cid;
0506: this .attrs = attrs;
0507: hc = cid.hashCode() + attrs.hashCode();
0508: }
0509:
0510: public int hashCode() {
0511: return hc;
0512: }
0513:
0514: public boolean equals(Object o) {
0515: if (o instanceof DestinationKey) {
0516: DestinationKey that = (DestinationKey) o;
0517: return this .cid.equals(that.cid)
0518: && this .attrs.equals(that.attrs);
0519: }
0520: return false;
0521: }
0522: }
0523:
0524: private MessageAddress getDirectiveDestinationOfKey(Object key) {
0525: if (key instanceof MessageAddress) {
0526: return (MessageAddress) key;
0527: } else {
0528: DestinationKey dkey = (DestinationKey) key;
0529: return dkey.cid;
0530: }
0531: }
0532:
0533: private Object getDirectiveKeyOfDestination(MessageAddress dest) {
0534: MessageAttributes attrs = dest.getMessageAttributes();
0535: if (attrs == null)
0536: return dest;
0537: return new DestinationKey(dest, attrs);
0538: }
0539:
0540: /*
0541: * Builds up hashmap of arrays of directives for each agent, <code>MessageAddress</code>.
0542: * Modified to handle destinations of <code>AttributeBasedAddress</code>es, so that these are
0543: * sent properly as well.
0544: */
0545: public void appendMessagesToSend(List messages) {
0546: HashMap directivesByDestination = new HashMap(89);
0547:
0548: // FIXME - prefill cache of aba roles to addresses here, instead of building up a cache
0549: // fillCache();
0550:
0551: for (Iterator iter = sendQueue.iterator(); iter.hasNext();) {
0552: Directive dir = (Directive) iter.next();
0553: MessageAddress dest = dir.getDestination();
0554:
0555: // get all destinations
0556:
0557: /**
0558: * If dest is an ABA, get all agent_names from cache or
0559: * nameserver and fills in the hashmap of directives
0560: * Short and easy way to handle ABA destinations
0561: */
0562: ArrayList dirs;
0563:
0564: if (dest instanceof AttributeBasedAddress) {
0565: //System.out.println("-------BLACKBOARD ENCOUNTERED ABA-----");
0566: MessageAttributes qosAttributes = dest
0567: .getMessageAttributes();
0568: Collection agents = getABAAddresses((AttributeBasedAddress) dest); // List of CIs
0569: // for all destinations, add a new directive array and insert a new directive, or add to
0570: // an existing array in the destinations hashmap
0571: for (Iterator i = agents.iterator(); i.hasNext();) {
0572: MessageAddress agentAddress = (MessageAddress) i
0573: .next();
0574: if (qosAttributes != null) {
0575: agentAddress = MessageAddress
0576: .getMessageAddress(
0577: (MessageAddress) agentAddress,
0578: qosAttributes);
0579: }
0580: Object key = getDirectiveKeyOfDestination(agentAddress);
0581: dirs = (ArrayList) directivesByDestination.get(key);
0582: if (dirs == null) {
0583: dirs = new ArrayList(1);
0584: directivesByDestination.put(key, dirs);
0585: }
0586: dirs.add(dir);
0587: }
0588: } // done with aba handling
0589:
0590: /**
0591: * dest is regular address so proceed as before
0592: */
0593: else {
0594: Object key = getDirectiveKeyOfDestination(dest);
0595: dirs = (ArrayList) directivesByDestination.get(key);
0596: if (dirs == null) {
0597: dirs = new ArrayList(1);
0598: directivesByDestination.put(key, dirs);
0599: }
0600: dirs.add(dir);
0601: }
0602: }
0603: /**
0604: * By now directivesByDestination only has ArrayLists of MessageAddresss,
0605: * so we can set their directives as before.
0606: */
0607: for (Iterator iter = directivesByDestination.entrySet()
0608: .iterator(); iter.hasNext();) {
0609: Map.Entry entry = (Map.Entry) iter.next();
0610: MessageAddress tmpci = getDirectiveDestinationOfKey(entry
0611: .getKey());
0612: ArrayList dirs = (ArrayList) entry.getValue();
0613: int size = dirs.size();
0614: if (size > 0) {
0615: Directive[] directives = (Directive[]) dirs
0616: .toArray(new Directive[size]);
0617: DirectiveMessage ndm = new DirectiveMessage(directives);
0618: ndm.setDestination(tmpci);
0619: ndm.setSource(self);
0620: messages.add(ndm);
0621: dirs.clear();
0622: }
0623: }
0624: directivesByDestination.clear(); // maybe help gc a bit.
0625: sendQueue.clear();
0626: }
0627:
0628: public void restart(MessageAddress cid) {
0629: myDomainService.invokeRestartLogicProviders(cid);
0630: }
0631:
0632: private void applyMessageAgainstLogicProviders(DirectiveMessage m) {
0633: myDomainService.invokeMessageLogicProviders(m);
0634: }
0635:
0636: /**
0637: * called by receiveEnvelope (on behalf of a plugin) and
0638: * consumeTuple (on behalf of an LP).
0639: */
0640: private void callLogicProviders(EnvelopeTuple obj,
0641: boolean isPersistenceEnvelope) {
0642: if (!isPersistenceEnvelope) {
0643: handleActiveSubscriptionObjects(obj);
0644: }
0645: myDomainService.invokeEnvelopeLogicProviders(obj,
0646: isPersistenceEnvelope);
0647: }
0648:
0649: private void handleActiveSubscriptionObjects(EnvelopeTuple tup) {
0650: if (ActiveSubscriptionObject.deferCommit) {
0651: Object o = tup.getObject();
0652: if (o instanceof ActiveSubscriptionObject) {
0653: ActiveSubscriptionObject aso = (ActiveSubscriptionObject) o;
0654: try {
0655: if (tup.isAdd()) {
0656: aso.addingToBlackboard(this , true);
0657: } else if (tup.isChange()) {
0658: aso.changingInBlackboard(this , true);
0659: } else if (tup.isRemove()) {
0660: aso.removingFromBlackboard(this , true);
0661: } // else ignore: bulk and event are uneffected by ASOs
0662: } catch (BlackboardException be) {
0663: logger.error(
0664: "Deferred ActiveSubscriptionObject action could"
0665: + " not be vetoed", be);
0666: }
0667: }
0668: }
0669: }
0670:
0671: /**
0672: * If {@link ActiveSubscrptionObject#deferCommit} is enabled, this
0673: * class tracks objects that would be modified by an {@link
0674: * ActiveSubscriptionObject} at publish time and logs a warning
0675: * if they are accessed before the transaction close time.
0676: * <p>
0677: * For details, see bug 3663.
0678: */
0679: private final static ObjectTracker tracker = new ObjectTracker();
0680:
0681: public static final ObjectTracker getTracker() {
0682: return tracker;
0683: }
0684:
0685: public static class ObjectTracker {
0686: private final static Logger log = Logging
0687: .getLogger(ObjectTracker.class);
0688: private static final Set globalSet = new HashSet(11);
0689: private final ThreadLocal localSet = new ThreadLocal() {
0690: protected synchronized Object initialValue() {
0691: return new HashSet(11);
0692: }
0693: };
0694:
0695: public Set getLocalSet() {
0696: return (Set) (localSet.get());
0697: }
0698:
0699: public void checkpoint(boolean commit, Object ob, Object a) {
0700: if (ActiveSubscriptionObject.deferCommit) {
0701: // short circuit if we aren't actually tracking ASO gaps
0702: if (commit) {
0703: resolve(ob, a);
0704: } else {
0705: track(ob, a);
0706: }
0707: }
0708: }
0709:
0710: private void track(Object ob, Object a) {
0711: Object o = new Traversal(ob, a);
0712: if (log.isDebugEnabled())
0713: log.debug("Tracking " + o);
0714: synchronized (globalSet) {
0715: globalSet.add(o);
0716: }
0717: getLocalSet().add(o);
0718: }
0719:
0720: private void resolve(Object ob, Object a) {
0721: Object o = new Traversal(ob, a);
0722: if (log.isDebugEnabled())
0723: log.debug("Resolving " + o);
0724: synchronized (globalSet) {
0725: globalSet.remove(o);
0726: }
0727: getLocalSet().remove(o);
0728: }
0729:
0730: public void clearLocalSet() {
0731: if (ActiveSubscriptionObject.deferCommit) {
0732: // short circuit if we aren't actually tracking ASO gaps
0733: getLocalSet().clear();
0734: }
0735: }
0736:
0737: public void checkAccess(Object ob, Object a) {
0738: if (ActiveSubscriptionObject.deferCommit) {
0739: // short circuit if we aren't actually tracking ASO gaps
0740: Object o = new Traversal(ob, a);
0741: if (log.isDebugEnabled())
0742: log.debug("Checking " + o);
0743: boolean locP = getLocalSet().contains(o);
0744: if (locP) {
0745: log.warn(
0746: "Local access of uncommitted ActiveSubscriptionObject"
0747: + " data " + o, new Throwable());
0748: } else {
0749: if (log.isDebugEnabled()) {
0750: boolean gloP;
0751: synchronized (globalSet) {
0752: gloP = globalSet.contains(o);
0753: }
0754: if (gloP) {
0755: log.debug(
0756: "Global access of uncommitted ActiveSubscriptionObject"
0757: + " data " + o,
0758: new Throwable());
0759: }
0760: }
0761: }
0762: }
0763: }
0764: }
0765:
0766: private static class Traversal {
0767: private final Object o;
0768: private final Object a;
0769:
0770: public Traversal(Object o, Object a) {
0771: this .o = o;
0772: this .a = a;
0773: }
0774:
0775: public boolean equals(Object thing) {
0776: if (thing instanceof Traversal) {
0777: return o.equals(((Traversal) thing).o)
0778: && a.equals(((Traversal) thing).a);
0779: } else {
0780: return false;
0781: }
0782: }
0783:
0784: public int hashCode() {
0785: return o.hashCode();
0786: } /*don't bother to spread 'em out*/
0787:
0788: public String toString() {
0789: return "Traversal(" + a + ") " + o;
0790: }
0791: }
0792:
0793: public PublishHistory getHistory() {
0794: return myDistributor.history;
0795: }
0796:
0797: protected Envelope executeDelayedLPActions() {
0798: myDomainService.invokeDelayedLPActions();
0799: return privateGetPublishedChanges();
0800: }
0801:
0802: public PersistenceObject getPersistenceObject()
0803: throws PersistenceNotEnabledException {
0804: return myDistributor.getPersistenceObject();
0805: }
0806:
0807: /** Ensure that all the domains know that this is THE blackboard */
0808: protected void connectDomains() {
0809: myDomainService.setBlackboard(this );
0810: setReadyToPersist();
0811: }
0812:
0813: //
0814: // Distributor
0815: //
0816: private Distributor createDistributor(
0817: MessageSwitchService msgSwitch, Object state) {
0818: Distributor d = new Distributor(this , myServiceBroker, self
0819: .getAddress());
0820: Persistence persistence = createPersistence();
0821: boolean lazyPersistence = SystemProperties.getBoolean(
0822: "org.cougaar.core.persistence.lazy", true);
0823: d.setPersistence(persistence, lazyPersistence);
0824: d.start(msgSwitch, state); // msgSwitch, state
0825:
0826: return d;
0827: }
0828:
0829: public Distributor getDistributor() {
0830: return myDistributor;
0831: }
0832:
0833: protected Persistence createPersistence() {
0834: try {
0835: return BlackboardPersistence.find(myServiceBroker);
0836: } catch (PersistenceException e) {
0837: e.printStackTrace();
0838: }
0839: return null;
0840: }
0841:
// -------- Methods for ABA Handling Below -------- needs work //

// AttributeBasedAddress to ABATranslation cache.
// Accessed only inside synchronized (cache) blocks in getABAAddresses.
private Map cache = new HashMap(89);

// Collects the names of changed communities; clearCache() re-creates it if it is null.
private CacheClearer cacheClearer = new CacheClearer();
// Lock held by clearCache() while it checks for and re-creates cacheClearer.
private Object cacheClearerLock = new Object();
0849:
0850: private static class ABATranslationImpl implements ABATranslation {
0851: Collection old, current;
0852:
0853: ABATranslationImpl(Collection current) {
0854: this .current = current;
0855: }
0856:
0857: public Collection getOldTranslation() {
0858: return old;
0859: }
0860:
0861: public Collection getCurrentTranslation() {
0862: return current;
0863: }
0864:
0865: void setCurrentTranslation(Collection newCurrentTranslation) {
0866: current = newCurrentTranslation;
0867: }
0868:
0869: void setOldTranslation(Collection newOldTranslation) {
0870: old = newOldTranslation;
0871: }
0872:
0873: boolean isEmpty() {
0874: return old == null && current == null;
0875: }
0876: }
0877:
0878: private class MyCommunityChangeListener extends
0879: CommunityChangeAdapter {
0880: public void communityChanged(CommunityChangeEvent e) {
0881: if (logger.isDebugEnabled())
0882: logger.debug(e.toString());
0883: clearCache(e.getCommunityName());
0884: }
0885: }
0886:
0887: /*
0888: * Loops through the cache of ABAs and returns MessageAddresss,
0889: * else it querries the nameserver for all agents with the ABA's role attribute,
0890: * and builds the cache.
0891: * @return list (copy) of the addresses of the agents matching the ABA
0892: */
0893: public Collection getABAAddresses(AttributeBasedAddress aba) {
0894: // first look in cache
0895: Collection matches = null;
0896: synchronized (cache) {
0897: ABATranslation abaTranslation = (ABATranslation) cache
0898: .get(aba);
0899: if (abaTranslation != null) {
0900: matches = abaTranslation.getCurrentTranslation();
0901: }
0902: }
0903: if (matches == null) {
0904: // Not in cache. Get it the hard way from community service
0905: matches = lookupABA(aba);
0906: if (logger.isDebugEnabled()) {
0907: logger.debug("lookupABA: " + aba + "->" + matches);
0908: }
0909: matches = Collections.unmodifiableCollection(matches);
0910: synchronized (cache) {
0911: ABATranslationImpl abaTranslation = (ABATranslationImpl) cache
0912: .get(aba);
0913: if (abaTranslation == null) {
0914: abaTranslation = new ABATranslationImpl(matches);
0915: cache.put(aba, abaTranslation);
0916: } else {
0917: abaTranslation.setCurrentTranslation(matches);
0918: }
0919: }
0920: }
0921: return matches; // matches is unmodifiable - no need to copy it
0922: }
0923:
0924: // get the CommunityService when possible
0925: private CommunityService _myCommunityService = null;
0926:
0927: private CommunityService getCommunityService() {
0928: if (_myCommunityService != null) {
0929: return _myCommunityService;
0930: } else {
0931: _myCommunityService = (CommunityService) myServiceBroker
0932: .getService(this , CommunityService.class, null);
0933: if (_myCommunityService == null) {
0934: logger
0935: .warn(
0936: "Warning: Blackboard had no CommunityService -"
0937: + " will fall back to dynamic service lookup."
0938: + " Risk of Deadlock!",
0939: new Throwable());
0940: }
0941: _myCommunityService
0942: .addListener(new MyCommunityChangeListener());
0943: return _myCommunityService;
0944: }
0945: }
0946:
0947: /*
0948: * Queries NameServer and gets a collection MessageAddresss of
0949: * all agents having the attribute type and value specified by the
0950: * ABA and stores the collection in the ABA cache. Returns a List
0951: * (copy) of the addresses found.
0952: * @return list (copy) of the addresses of the agents matching the ABA
0953: */
0954: private Collection lookupABA(AttributeBasedAddress aba) {
0955: CommunityService cs = getCommunityService();
0956: String communitySpec = getCommunitySpec(aba);
0957: String roleValue = aba.getAttributeValue();
0958: String roleName = aba.getAttributeType();
0959: String filter = "(" + roleName + "=" + roleValue + ")";
0960:
0961: if (cs == null) {
0962: return Collections.EMPTY_SET;
0963: }
0964: Collection matches = cs.search(communitySpec, filter);
0965: // MIK - do we really need to copy this?. No, but we're also filtering for MAs
0966: List cis = new ArrayList(matches.size());
0967: for (Iterator i = matches.iterator(); i.hasNext();) {
0968: Object o = i.next();
0969: if (o instanceof MessageAddress) {
0970: cis.add(o);
0971: }
0972: }
0973: return cis;
0974: }
0975:
0976: private void clearCache(String communityName) {
0977: synchronized (cacheClearerLock) {
0978: if (cacheClearer == null) {
0979: cacheClearer = new CacheClearer();
0980: }
0981: }
0982: cacheClearer.add(communityName);
0983: }
0984:
0985: private class CacheClearer implements Runnable {
0986: private Set changedCommunities = new HashSet();
0987: private Schedulable thread;
0988:
0989: // The delay time spent waiting for additional community
0990: // change notifications to arrive before processing
0991: // all such notifications. A trade-off between
0992: // time for ABA translations to be updated
0993: // and time for the local Comm Service to update
0994: // its cache from the NameService
0995: // This is in milliseconds
0996: private long waitForNewCommChangeNotifications = SystemProperties
0997: .getLong(
0998: "org.cougaar.core.blackboard.waitForNewCommChangeNotifications",
0999: 1000L);
1000:
1001: private void reschedule() {
1002: if (thread != null) {
1003: thread.schedule(waitForNewCommChangeNotifications);
1004: }
1005: }
1006:
1007: public synchronized void add(String communityName) {
1008: changedCommunities.add(communityName);
1009: if (thread == null) {
1010: thread = threadS.getThread(this , this ,
1011: "ABA Cache Clearer");
1012: }
1013: reschedule();
1014: }
1015:
1016: public void run() {
1017: Set changes = new HashSet(11);
1018: synchronized (this ) {
1019: if (changedCommunities.size() == 0) {
1020: // exit without rescheduling
1021: return;
1022: } else {
1023: changes.addAll(changedCommunities);
1024: changedCommunities.clear();
1025: } // end of synch block
1026: }
1027:
1028: if (myDistributor == null) {
1029: // Blackboard was stopped?
1030: if (logger != null && logger.isInfoEnabled())
1031: logger
1032: .info("ABA Cache clearer dropping received changes cause"
1033: + " Distributor is null -- assuming Blackboard is stopping");
1034: thread = null;
1035: return;
1036: }
1037:
1038: // Process the community changes
1039: myDistributor.invokeABAChangeLPs(changes);
1040: changes.clear();
1041: reschedule(); // take another pass in a bit in case something else comes in
1042: }
1043: }
1044:
1045: /**
1046: * Tell all the ABA interested LPs about the new
1047: * community memberships, using the local cache of ABA translations.
1048: */
1049: public void invokeABAChangeLPs(Set communities) {
1050: synchronized (cache) {
1051: for (Iterator i = cache.values().iterator(); i.hasNext();) {
1052: ABATranslationImpl at = (ABATranslationImpl) i.next();
1053: at.setOldTranslation(at.getCurrentTranslation());
1054: at.setCurrentTranslation(null); // Filled in when needed
1055: }
1056: myDomainService.invokeABAChangeLogicProviders(communities);
1057: for (Iterator i = cache.values().iterator(); i.hasNext();) {
1058: ABATranslationImpl at = (ABATranslationImpl) i.next();
1059: at.setOldTranslation(null);
1060: if (at.isEmpty())
1061: i.remove();
1062: }
1063: }
1064: }
1065:
1066: public ABATranslation getABATranslation(AttributeBasedAddress aba) {
1067: synchronized (cache) {
1068: ABATranslationImpl ret = (ABATranslationImpl) cache
1069: .get(aba);
1070: if (ret == null)
1071: return null;
1072: if (ret.getOldTranslation() == null)
1073: return null;
1074: if (ret.getCurrentTranslation() == null) {
1075: ret.setCurrentTranslation(lookupABA(aba));
1076: }
1077: return ret;
1078: }
1079: }
1080:
1081: // Stub - should be replaced when we figure out semantics for
1082: // community name spec in the aba.d
1083: protected String getCommunitySpec(AttributeBasedAddress aba) {
1084: String abaComm = aba.getCommunityName();
1085:
1086: if ((abaComm == null) || (abaComm.equals(""))
1087: || (abaComm.equals("*"))) {
1088: return "";
1089: } else {
1090: return abaComm;
1091: }
1092: }
1093: }
|