0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2007 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.blackboard;
0028:
0029: import java.util.ArrayList;
0030: import java.util.Collection;
0031: import java.util.Enumeration;
0032: import java.util.HashSet;
0033: import java.util.Iterator;
0034: import java.util.List;
0035: import java.util.Map;
0036:
0037: import org.cougaar.bootstrap.SystemProperties;
0038: import org.cougaar.core.persist.Persistence;
0039: import org.cougaar.core.persist.PersistenceNotEnabledException;
0040: import org.cougaar.util.LockFlag;
0041: import org.cougaar.util.UnaryPredicate;
0042: import org.cougaar.util.log.Logger;
0043: import org.cougaar.util.log.Logging;
0044: import org.cougaar.util.CallerTracker;
0045:
0046: /**
0047: * The standard implementation of the {@link
0048: * org.cougaar.core.service.BlackboardService}.
0049: *
0050: * @property org.cougaar.core.blackboard.enforceTransactions
0051: * Set to <em>false</em> to disable checking for clients
0052: * of BlackboardService publishing changes to the blackboard outside
0053: * of a transaction.
0054: *
0055: * @property org.cougaar.core.blackboard.debug
 * Set to true to enable additional checking on blackboard transactions.
0057: * For instance, it will attempt to look for changes to blackboard
0058: * objects which have not been published at transaction close time.
0059: *
0060: * @note Although Subscriber directly implements all the methods of
0061: * BlackboardService, it declines to implement the interface to avoid
0062: * the Subscriber class itself <em>and all extending classes</em> from
0063: * being Services.
0064: *
0065: * @property org.cougaar.core.blackboard.timestamp
0066: * Set to true to enable EnvelopeMetrics and TimestampSubscriptions
0067: * (defaults to false).
0068: *
0069: * @property org.cougaar.core.blackboard.trackPublishers
0070: * Set to true to enable PublisherSubscriptions
0071: * (defaults to false).
0072: */
0073: public class Subscriber {
0074: private static final Logger logger = Logging
0075: .getLogger(Subscriber.class);
0076:
0077: private static final boolean isEnforcing = SystemProperties
0078: .getBoolean(
0079: "org.cougaar.core.blackboard.enforceTransactions",
0080: true);
0081:
0082: private static final boolean warnUnpublishChanges = SystemProperties
0083: .getBoolean("org.cougaar.core.blackboard.debug");
0084:
0085: private static final boolean enableTimestamps = SystemProperties
0086: .getBoolean("org.cougaar.core.blackboard.timestamp")
0087: || SystemProperties
0088: .getBoolean("org.cougaar.core.blackboard.trackPublishers");
0089:
0090: private BlackboardClient theClient = null;
0091: private Distributor theDistributor = null;
0092: private String subscriberName = "";
0093: private boolean shouldBePersisted = true;
0094: private boolean firstTransactionComplete = false;
0095:
/**
 * No-argument constructor for extending classes. The client and
 * distributor fields keep their initial {@code null} values.
 */
protected Subscriber() {
    super();
}
0098:
0099: /**
0100: * Create a subscriber that provides subscription services
0101: * to a client and send outgoing messages to a Distributor.
0102: * Plugin clients will use this API.
0103: */
public Subscriber(BlackboardClient client, Distributor distributor) {
    // The subscriber name defaults to the client's name, or null without a client.
    this(client, distributor,
        (client == null) ? null : client.getBlackboardClientName());
}
0108:
/**
 * Create a subscriber for a client and distributor with an explicit name.
 *
 * @param client the client passed to {@link #setClientDistributor}
 * @param distributor the distributor passed to {@link #setClientDistributor}
 * @param subscriberName the name passed to {@link #setName}
 */
public Subscriber(BlackboardClient client, Distributor distributor,
        String subscriberName) {
    this.setClientDistributor(client, distributor);
    this.setName(subscriberName);
}
0114:
0115: public void setClientDistributor(BlackboardClient client,
0116: Distributor newDistributor) {
0117: theClient = client;
0118: if (theDistributor != newDistributor) {
0119: if (theDistributor != null) {
0120: theDistributor.unregisterSubscriber(this );
0121: }
0122: theDistributor = newDistributor;
0123: if (theDistributor != null) {
0124: theDistributor.registerSubscriber(this );
0125: }
0126: }
0127: }
0128:
0129: public void setName(String newName) {
0130: subscriberName = newName;
0131: }
0132:
0133: public String getName() {
0134: return subscriberName;
0135: }
0136:
0137: public boolean shouldBePersisted() {
0138: return shouldBePersisted;
0139: }
0140:
0141: public void setShouldBePersisted(boolean shouldBePersisted) {
0142: this .shouldBePersisted = shouldBePersisted;
0143: }
0144:
0145: boolean isReadyToPersist() {
0146: return firstTransactionComplete;
0147: }
0148:
0149: public void setReadyToPersist() {
0150: theDistributor.discardRehydrationInfo(this );
0151: firstTransactionComplete = true;
0152: }
0153:
0154: public boolean didRehydrate() {
0155: boolean result = theDistributor.didRehydrate(this );
0156: return result;
0157: }
0158:
0159: public void persistNow() throws PersistenceNotEnabledException {
0160: boolean inTransaction = (transactionLock.getBusyFlagOwner() == Thread
0161: .currentThread());
0162: if (inTransaction)
0163: closeTransaction();
0164: theDistributor.persistNow();
0165: if (inTransaction)
0166: openTransaction();
0167: }
0168:
0169: public Persistence getPersistence() {
0170: return theDistributor.getPersistence();
0171: }
0172:
0173: /**
0174: * Move inboxes into subscriptions.
0175: */
0176: protected boolean privateUpdateSubscriptions() {
0177: boolean changedp = false;
0178: synchronized (subscriptions) {
0179: transactionAllowsQuiescence = inboxAllowsQuiescence;
0180: transactionEnvelopes = flushInbox();
0181: try {
0182: for (int i = 0, n = subscriptions.size(); i < n; i++) {
0183: Subscription subscription = (Subscription) subscriptions
0184: .get(i);
0185: for (int j = 0, l = transactionEnvelopes.size(); j < l; j++) {
0186: Envelope envelope = (Envelope) transactionEnvelopes
0187: .get(j);
0188: try {
0189: changedp |= subscription.apply(envelope);
0190: } catch (PublishException pe) {
0191: Logger logger = Logging
0192: .getLogger(Subscriber.class);
0193: String message = pe.getMessage();
0194: logger.error(message);
0195:
0196: BlackboardClient currentClient = null;
0197: // if (envelope instanceof OutboxEnvelope) {
0198: // OutboxEnvelope e = (OutboxEnvelope) envelope;
0199: // currentClient = e.theClient;
0200: // }
0201: if (currentClient == null) {
0202: currentClient = BlackboardClient.current
0203: .getClient();
0204: }
0205: String this Publisher = null;
0206: if (currentClient != null) {
0207: this Publisher = currentClient
0208: .getBlackboardClientName();
0209: }
0210: if (envelope instanceof Blackboard.PlanEnvelope) {
0211: if (this Publisher == null) {
0212: this Publisher = "Blackboard";
0213: } else {
0214: this Publisher = "Blackboard after "
0215: + this Publisher;
0216: }
0217: } else if (this Publisher == null) {
0218: this Publisher = "Unknown";
0219: }
0220: pe.printStackTrace(" This publisher: "
0221: + this Publisher);
0222: if (!pe.priorStackUnavailable) {
0223: if (pe.priorStack == null) {
0224: System.err
0225: .println("Prior publisher: Unknown");
0226: }
0227: } else {
0228: if (pe.priorStack == null) {
0229: System.err
0230: .println("Prior publisher: Not set");
0231: } else {
0232: pe.priorStack.printStackTrace();
0233: }
0234: }
0235: } catch (RuntimeException ire) {
0236: BlackboardClient currentClient = null;
0237: if (currentClient == null) {
0238: currentClient = BlackboardClient.current
0239: .getClient();
0240: }
0241: String this Publisher = null;
0242: if (currentClient != null) {
0243: this Publisher = currentClient
0244: .getBlackboardClientName();
0245: }
0246: logger.error(
0247: "Exception while applying envelopes in "
0248: + currentClient + "/"
0249: + this Publisher, ire);
0250: }
0251: }
0252: }
0253: } catch (RuntimeException re) {
0254: re.printStackTrace();
0255: }
0256: }
0257: return changedp;
0258: }
0259:
0260: /**
0261: * Report changes that the plugin published.
0262: * These changes are represented by the outbox.
0263: */
0264: protected Envelope privateGetPublishedChanges() {
0265: Envelope box = flushOutbox();
0266: if (transactionEnvelopes != null) {
0267: recycleInbox(transactionEnvelopes);
0268: transactionEnvelopes = null;
0269: transactionAllowsQuiescence = true;
0270: } else {
0271: recycleInbox(flushInbox());
0272: }
0273: if (enableTimestamps && (box instanceof TimestampedEnvelope)) {
0274: TimestampedEnvelope te = (TimestampedEnvelope) box;
0275: te.setName(getName());
0276: te.setTransactionOpenTime(openTime);
0277: te.setTransactionCloseTime(System.currentTimeMillis());
0278: }
0279: return box;
0280: }
0281:
0282: /**
0283: * Accessors to persist our inbox state
0284: */
0285: public List getTransactionEnvelopes() {
0286: return transactionEnvelopes;
0287: }
0288:
0289: public List getPendingEnvelopes() {
0290: return pendingEnvelopes;
0291: }
0292:
0293: //////////////////////////////////////////////////////
0294: // Subscriptions //
0295: //////////////////////////////////////////////////////
0296: private int publishAddedCount;
0297: private int publishChangedCount;
0298: private int publishRemovedCount;
0299:
0300: // now unused
0301: public int getSubscriptionCount() {
0302: // synchronized (subscriptions) {}
0303: return subscriptions.size();
0304: }
0305:
0306: // unused?
0307: public int getSubscriptionSize() {
0308: int size = 0;
0309: // synchronized (subscriptions) {}
0310: int l = subscriptions.size();
0311: for (int i = 0; i < l; i++) {
0312: Object s = subscriptions.get(i);
0313: if (s instanceof CollectionSubscription) {
0314: size += ((CollectionSubscription) s).size();
0315: }
0316: }
0317: return size;
0318: }
0319:
0320: public int getPublishAddedCount() {
0321: return publishAddedCount;
0322: }
0323:
0324: public int getPublishChangedCount() {
0325: return publishChangedCount;
0326: }
0327:
0328: public int getPublishRemovedCount() {
0329: return publishRemovedCount;
0330: }
0331:
0332: /** our set of active subscriptions. Access must be synchronized on self. */
0333: protected final List subscriptions = new ArrayList(5);
0334:
0335: protected void resetSubscriptionChanges() {
0336: synchronized (subscriptions) {
0337: int l = subscriptions.size();
0338: for (int i = 0; i < l; i++) {
0339: Subscription s = (Subscription) subscriptions.get(i);
0340: s.resetChanges();
0341: }
0342: resetHaveCollectionsChanged();
0343: }
0344: }
0345:
0346: /**
0347: * Subscribe to a collection service with isMember, default inner
0348: * collection and supporting incremental change queries.
0349: * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0350: * and tends to create as many problems as it solves. When in pedantic mode,
0351: * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0352: * such warnings if you are sure you want to do this.
0353: * @see Blackboard#PEDANTIC
0354: */
0355: public Subscription subscribe(UnaryPredicate isMember) {
0356: return subscribe(isMember, null, true);
0357: }
0358:
0359: /**
0360: * Subscribe to a collection service with isMember, default inner
0361: * collection and specifying if you want incremental change query support.
0362: * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0363: * and tends to create as many problems as it solves. When in pedantic mode,
0364: * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0365: * such warnings if you are sure you want to do this.
0366: * @see Blackboard#PEDANTIC
0367: */
0368: public Subscription subscribe(UnaryPredicate isMember,
0369: boolean isIncremental) {
0370: return subscribe(isMember, null, isIncremental);
0371: }
0372:
0373: /**
0374: * Subscribe to a collection service with isMember, specifying inner
0375: * collection and supporting incremental change queries.
0376: * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0377: * and tends to create as many problems as it solves. When in pedantic mode,
0378: * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0379: * such warnings if you are sure you want to do this.
0380: * @see Blackboard#PEDANTIC
0381: */
0382: public Subscription subscribe(UnaryPredicate isMember,
0383: Collection realCollection) {
0384: return subscribe(isMember, realCollection, true);
0385: }
0386:
0387: /**
0388: * Subscribe to a collection service.
0389: * Tells the Distributor about its interest, but should not block,
0390: * even if there are lots of "back issues" to transmit.
0391: * This is the full form.
0392: * @param isMember The defining predicate for the slice of the blackboard.
0393: * @param realCollection The real container wrapped by the returned value.
0394: * @param isIncremental IFF true, returns a container that supports delta
0395: * lists.
0396: * @return The resulting Subscription
0397: * @see IncrementalSubscription
0398: * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0399: * and tends to create as many problems as it solves. When in pedantic mode,
0400: * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0401: * such warnings if you are sure you want to do this.
0402: * @see Blackboard#PEDANTIC
0403: */
0404: public Subscription subscribe(UnaryPredicate isMember,
0405: Collection realCollection, boolean isIncremental) {
0406: Subscription sn;
0407:
0408: if (realCollection == null)
0409: realCollection = new HashSet();
0410:
0411: if (isIncremental) {
0412: sn = new IncrementalSubscription(isMember, realCollection);
0413: } else {
0414: sn = new CollectionSubscription(isMember, realCollection);
0415: }
0416: return subscribe(sn);
0417: }
0418:
0419: /** Primary subscribe method. Register a new subscription. */
0420: public final Subscription subscribe(Subscription subscription) {
0421: // Strictly speaking, subscribe can be done outside a transaction, but the
0422: // state of filled subscription w/rt the rest of the subscriptions
0423: // is suspect if it isn't.
0424: checkTransactionOK("subscribe()");
0425:
0426: synchronized (subscriptions) {
0427: subscription.setSubscriber(this );
0428: subscriptions.add(subscription);
0429: theDistributor.fillSubscription(subscription);
0430: }
0431: setHaveNewSubscriptions(); // make sure we get counted.
0432: return subscription;
0433: }
0434:
0435: /** lightweight query of Blackboard */
0436: public final Collection query(UnaryPredicate isMember) {
0437: checkTransactionOK("query(UnaryPredicate)");
0438: QuerySubscription s = new QuerySubscription(isMember);
0439: s.setSubscriber(this ); // shouldn't really be needed
0440: theDistributor.fillQuery(s);
0441: return s.getCollection();
0442: }
0443:
0444: private static final CallerTracker pTracker = CallerTracker
0445: .getShallowTracker(2);
0446:
0447: final void checkTransactionOK(String methodname, Object arg) {
0448: if (this instanceof Blackboard)
0449: return; // No check for Blackboard
0450:
0451: if (Blackboard.PEDANTIC && arg instanceof Collection
0452: && pTracker.isNew()) {
0453: if (logger.isWarnEnabled())
0454: logger.warn("PEDANTIC: A Collection published by "
0455: + theClient, new Throwable());
0456: }
0457:
0458: if (!isMyTransaction()) {
0459: if (arg != null) {
0460: methodname = methodname + "(" + arg + ")";
0461: }
0462: logger
0463: .error(toString() + "." + methodname
0464: + " called outside of transaction",
0465: new Throwable());
0466: //throw new RuntimeException(methodname+" called outside of transaction boundaries");
0467: }
0468: }
0469:
0470: final void checkTransactionOK(String methodname) {
0471: checkTransactionOK(methodname, null);
0472: }
0473:
0474: /**
0475: * Stop subscribing to a previously obtained Subscription. The
0476: * Subscription must have been returned from a previous call to
0477: * subscribe.
0478: * @param subscription the Subscription that is to be cancelled.
0479: */
0480: public void unsubscribe(Subscription subscription) {
0481: // strictly speaking, this doesn't have to be done inside a transaction, but
0482: // we'll check anyway to be symmetric with subscribe.
0483: checkTransactionOK("unsubscribe()");
0484: synchronized (subscriptions) {
0485: subscriptions.remove(subscription);
0486: }
0487: }
0488:
0489: /*
0490: * Inbox invariants:
0491: * pendingEnvelopes accumulates new envelopes for the next transaction (always).
0492: * transactionEnvelopes has the previous pendingEnvelopes during a
0493: * transaction, null otherwise.
0494: * idleEnvelopes has an empty list when no transaction is active.
0495: *
 * The lists cycle around from idle to pending to transaction and back to
 * idle. idle and transaction are never null at the same time; one
 * of them always holds the list that pendingEnvelopes does not hold.
0499: *
0500: */
0501: private List pendingEnvelopes = new ArrayList(); // Envelopes to be added at next transaction
0502: private List transactionEnvelopes = null; // Envelopes of current transaction
0503: private List idleEnvelopes = new ArrayList(); // Alternate list
0504: private final Object inboxLock = new Object(); // For synchronized access to inboxes
0505: private boolean inboxAllowsQuiescence = true; // True if inbox allows quiescence
0506: private boolean transactionAllowsQuiescence = true; // True if inbox being processed allowed quiescence.
0507:
0508: /**
0509: * Called by non-client methods to add an envelope to our inboxes.
0510: * This is complicated because we wish to avoid holding envelopes
0511: * when there is no possibility of their ever being used (no
0512: * subscriptions). A simple test of the number of subscriptions is
0513: * insufficient because, if a transaction is open, new subscriptions
0514: * may be created that, in later transactions, need to receive the
0515: * envelopes. So the test includes a test of transactions being
0516: * open. We use transactionLock.tryGetBusyFlag() because we can't
0517: * block and the fact that the lock is busy, is a sufficient
0518: * indication that we must put the new envelopes into the inbox. It
0519: * may turn out that the inbox did not need to be stuffed (because
0520: * there will not be any subscriptions), but this is handled when
0521: * the transaction is closed where the inbox is emptied if there are
0522: * no subscriptions.
0523: */
0524: public void receiveEnvelopes(List envelopes,
0525: boolean envelopeQuiescenceRequired) {
0526: boolean signalActivity = false;
0527: synchronized (inboxLock) {
0528: boolean notBusy = transactionLock.tryGetBusyFlag();
0529: // if notBusy, then the client isn't running (and wont) until we're done.
0530: // if !notBusy, then the client IS running so we need to dump the envelopes
0531: // in regardless (because it might add a watcher or subscription
0532: boolean hasWatchers;
0533: synchronized (watchers) {
0534: hasWatchers = !watchers.isEmpty();
0535: }
0536: boolean hasSubscriptions = !subscriptions.isEmpty();
0537: if (hasSubscriptions || (hasWatchers && !notBusy)) {
0538: pendingEnvelopes.addAll(envelopes);
0539: if (envelopeQuiescenceRequired) {
0540: inboxAllowsQuiescence = false;
0541: }
0542: signalActivity = true;
0543: } else {
0544: if (logger.isInfoEnabled() && !hasSubscriptions
0545: && !notBusy && !hasWatchers) {
0546: logger
0547: .info(this
0548: + ".receiveEnvs: Fix for bug 3328 means"
0549: + " we're not distributing the outbox here cause no watchers.");
0550: }
0551: }
0552: if (notBusy)
0553: transactionLock.freeBusyFlag();
0554: }
0555: if (signalActivity)
0556: signalExternalActivity();
0557: }
0558:
0559: public boolean isBusy() {
0560: synchronized (inboxLock) {
0561: return (pendingEnvelopes.size() > 0);
0562: }
0563: }
0564:
0565: public boolean isQuiescent() {
0566: synchronized (inboxLock) {
0567: return (inboxAllowsQuiescence && transactionAllowsQuiescence);
0568: }
0569: }
0570:
0571: private List flushInbox() {
0572: synchronized (inboxLock) {
0573: List result = pendingEnvelopes;
0574: pendingEnvelopes = idleEnvelopes;
0575: idleEnvelopes = null;
0576: inboxAllowsQuiescence = true;
0577: return result;
0578: }
0579: }
0580:
0581: private void recycleInbox(List old) {
0582: old.clear();
0583: idleEnvelopes = old;
0584: }
0585:
0586: /**
0587: * outbox data structure - an Envelope used to encapsulate
0588: * outgoing changes to collections.
0589: */
0590: private Envelope outbox = createEnvelope();
0591:
0592: protected Envelope flushOutbox() {
0593: if (outbox.size() == 0)
0594: return null;
0595: Envelope result = outbox;
0596: outbox = createEnvelope();
0597: return result;
0598: }
0599:
0600: // This won't work with persistence turned on. Don't _ever_ use operationally (ray)
0601: // public static class OutboxEnvelope extends Envelope {
0602: // public OutboxEnvelope(BlackboardClient client) {
0603: // theClient = client;
0604: // }
0605: // public BlackboardClient theClient;
0606: // }
0607:
0608: /** factory method for creating Envelopes of the correct type */
0609: protected Envelope createEnvelope() {
0610: if (enableTimestamps) {
0611: return new TimestampedEnvelope();
0612: } else {
0613: return new Envelope();
0614: }
0615: // return new OutboxEnvelope(getClient()); // for debugging
0616: }
0617:
0618: // might want to make the syncs finer-grained
0619: /**
0620: * called whenever the client adds an object to a collection
0621: * to notify the rest of the world of the change.
0622: * Actual Changes to the collection only happen via this api.
0623: */
0624: protected EnvelopeTuple clientAddedObject(Object o) {
0625: // attempt to claim the object
0626: claimObject(o);
0627:
0628: return outbox.addObject(o);
0629: }
0630:
0631: /**
0632: * called whenever the client removes an object from a collection
0633: * to notify the rest of the world of the change.
0634: * Actual Changes to the collection only happen via this api.
0635: */
0636: protected EnvelopeTuple clientRemovedObject(Object o) {
0637: // attempt to unclaim the object
0638: unclaimObject(o);
0639:
0640: return outbox.removeObject(o);
0641: }
0642:
0643: /**
0644: * called whenever the client changes an object in a collection
0645: * to notify the rest of the world of the change.
0646: * Actual Changes to the collection only happen via this api.
0647: */
0648: protected EnvelopeTuple clientChangedObject(Object o, List changes) {
0649: return outbox.changeObject(o, changes);
0650: }
0651:
0652: /**
0653: * Add an object to the blackboard.
0654: * <p>
0655: * Behavior is not defined if the object was already on the blackboard.
0656: * @note Although strictly allowed, it takes special care to properly publish a
0657: * raw Collection object to the Blackboard. Disable Blackboard.PEDANTIC to quiet
0658: * such warnings if you are sure you want to do this.
0659: * @see Blackboard#PEDANTIC
0660: */
0661: public final void publishAdd(Object o) {
0662: checkTransactionOK("add", o);
0663:
0664: if (theDistributor.history != null)
0665: theDistributor.history.publishAdd(o);
0666: if (o instanceof ActiveSubscriptionObject) {
0667: ((ActiveSubscriptionObject) o).addingToBlackboard(this ,
0668: false);
0669: if (!ActiveSubscriptionObject.deferCommit) {
0670: ((ActiveSubscriptionObject) o).addingToBlackboard(this ,
0671: true);
0672: }
0673: }
0674:
0675: if (o instanceof Publishable) {
0676: //List crs = // var unused
0677: Transaction.getCurrentTransaction().getChangeReports(o); // side effects
0678: }
0679:
0680: // if we made it this far publish the object and return true.
0681: clientAddedObject(o);
0682: publishAddedCount++;
0683: }
0684:
0685: /**
0686: * Remove an object from the blackboard.
0687: * <p>
0688: * Behavior is not defined if the object was not already on the blackboard.
0689: *
0690: * @note Although strictly allowed, it takes special care to properly publish a
0691: * raw Collection object to the Blackboard. Disable Blackboard.PEDANTIC to quiet
0692: * such warnings if you are sure you want to do this.
0693: * @see Blackboard#PEDANTIC
0694: */
0695: public final void publishRemove(Object o) {
0696: checkTransactionOK("remove", o);
0697:
0698: if (theDistributor.history != null)
0699: theDistributor.history.publishRemove(o);
0700: if (o instanceof ActiveSubscriptionObject) {
0701: ((ActiveSubscriptionObject) o).removingFromBlackboard(this ,
0702: false);
0703: if (!ActiveSubscriptionObject.deferCommit) {
0704: ((ActiveSubscriptionObject) o).removingFromBlackboard(
0705: this , true);
0706: }
0707: }
0708:
0709: if (o instanceof Publishable) {
0710: List crs = Transaction.getCurrentTransaction()
0711: .getChangeReports(o);
0712: if (warnUnpublishChanges) {
0713: if (crs != null && crs.size() > 0) {
0714: if (logger.isWarnEnabled())
0715: logger
0716: .warn("Warning: publishRemove("
0717: + o
0718: + ") is dropping outstanding changes:\n\t"
0719: + crs);
0720: }
0721: }
0722: }
0723:
0724: clientRemovedObject(o);
0725: publishRemovedCount++;
0726: }
0727:
0728: /**
0729: * Convenience function for publishChange(o, null).
0730: * @note Although strictly allowed, it takes special care to properly publish a
0731: * raw Collection object to the Blackboard. Disable Blackboard.PEDANTIC to quiet
0732: * such warnings if you are sure you want to do this.
0733: * @see Blackboard#PEDANTIC
0734: */
0735: public final void publishChange(Object o) {
0736: publishChange(o, null);
0737: }
0738:
0739: /**
0740: * Mark an object on the blackboard as changed.
0741: * <p>
0742: * Behavior is not defined if the object is not on the blackboard.
0743: * <p>
0744: * There is no need to call this if the object was added or removed,
0745: * only if the contents of the object itself has been changed.
0746: * The changes parameter describes a set of changes made to the
0747: * object beyond those tracked automatically by the object class
0748: * (see the object class documentation for a description of which
0749: * types of changes are tracked). Any additional changes are
0750: * merged in <em>after</em> automatically collected reports.
0751: * @param changes a set of ChangeReport instances or null.
0752: * @note Although strictly allowed, it takes special care to properly publish a
0753: * raw Collection object to the Blackboard. Disable Blackboard.PEDANTIC to quiet
0754: * such warnings if you are sure you want to do this.
0755: * @see Blackboard#PEDANTIC
0756: */
0757: public final void publishChange(Object o, Collection changes) {
0758: checkTransactionOK("change", o);
0759:
0760: if (theDistributor.history != null)
0761: theDistributor.history.publishChange(o);
0762: if (o instanceof ActiveSubscriptionObject) {
0763: ((ActiveSubscriptionObject) o).changingInBlackboard(this ,
0764: false);
0765: if (!ActiveSubscriptionObject.deferCommit) {
0766: ((ActiveSubscriptionObject) o).changingInBlackboard(
0767: this , true);
0768: }
0769: }
0770:
0771: List crs = null;
0772: if (o instanceof Publishable) {
0773: crs = Transaction.getCurrentTransaction().getChangeReports(
0774: o);
0775: }
0776:
0777: // convert null or empty changes to the "anonymous" list
0778: if (isZeroChanges(changes)) {
0779: if (isZeroChanges(crs)) {
0780: crs = AnonymousChangeReport.LIST;
0781: } else {
0782: // use crs as-is
0783: }
0784: } else {
0785: if (isZeroChanges(crs)) {
0786: crs = new ArrayList(changes);
0787: } else {
0788: crs.addAll(changes);
0789: }
0790: }
0791:
0792: // if we made it this far publish the change and return true.
0793: clientChangedObject(o, crs);
0794: publishChangedCount++;
0795: }
0796:
0797: private final boolean isZeroChanges(final Collection c) {
0798: return ((c == null) || (c == AnonymousChangeReport.LIST)
0799: || (c == AnonymousChangeReport.SET) || (c.isEmpty()));
0800: }
0801:
0802: /**
 * An extension subscriber may call this method to execute bulkAdd transactions.
 * This is protected because it is of very limited use to anything other than persistence plugins.
0805: * Note that Blackboard does something like
0806: * this by hand constructing an entire special-purpose envelope. This, however, is
0807: * for use in-band, in-transaction.
0808: * The Collection passed MUST be immutable, since there may be many consumers,
0809: * each running at different times.
0810: */
0811: protected EnvelopeTuple bulkAddObject(Collection c) {
0812: checkTransactionOK("bulkAdd", c);
0813:
0814: EnvelopeTuple t;
0815: t = outbox.bulkAddObject(c);
0816:
0817: return t;
0818: }
0819:
0820: /**
0821: * Safer version of bulkAddObject(Collection).
0822: * Creates a Collection from the Enumeration and passes it into
0823: * the envelope.
0824: */
0825: protected EnvelopeTuple bulkAddObject(Enumeration en) {
0826: checkTransactionOK("bulkAdd", en);
0827:
0828: EnvelopeTuple t;
0829: t = outbox.bulkAddObject(en);
0830:
0831: return t;
0832: }
0833:
0834: protected EnvelopeTuple bulkAddObject(Iterator en) {
0835: checkTransactionOK("bulkAdd", en);
0836:
0837: EnvelopeTuple t;
0838: t = outbox.bulkAddObject(en);
0839:
0840: return t;
0841: }
0842:
0843: //
0844: // Transaction handling.
0845: //
0846: /*
0847: * It would be nice if we could merge the Transaction object with the
0848: * older open/close transaction code somehow - there is some redundancy
0849: * and the current parallel implementations are somewhat confusing.
0850: */
0851:
0852: /**
0853: * The transaction lock. At most one watcher per subscriber gets to
0854: * have an open transaction at one time. We could support multiple
0855: * simultaneously open transactions with multiple subscribers, but
0856: * this is a feature for another day.
0857: */
0858: private LockFlag transactionLock = new LockFlag();
0859:
0860: /**
0861: * The current in-force transaction instance.
 * This is only kept around as a check to pass to Transaction.close()
0863: * in order to make sure we're closing the right one.
0864: * In particular, we cannot use this in the publishWhatever methods
0865: * because the Blackboard methods are executing in the wrong thread.
0866: */
0867: private Transaction theTransaction = null;
0868:
0869: /**
0870: * Overridable by extending classes to specify more featureful
0871: * Transaction semantics.
0872: */
0873: protected Transaction newTransaction() {
0874: return new Transaction(this );
0875: }
0876:
0877: /**
0878: * Open a transaction by grabbing the transaction lock and updating
0879: * the subscriptions. This method blocks waiting for the
0880: * transaction lock.
0881: */
0882: public final void openTransaction() {
0883: transactionLock.getBusyFlag();
0884: finishOpenTransaction();
0885: }
0886:
0887: private long openTime = setTransactionOpenTime();
0888:
0889: protected final boolean isTimestamped() {
0890: return enableTimestamps;
0891: }
0892:
0893: protected final long setTransactionOpenTime() {
0894: if (enableTimestamps) {
0895: return (openTime = System.currentTimeMillis());
0896: } else {
0897: return -1;
0898: }
0899: }
0900:
0901: /**
0902: * Common routine for both openTransaction and tryOpenTransaction
0903: * does everything except getting the transactionLock busy flag.
0904: */
0905: private void finishOpenTransaction() {
0906: int count = transactionLock.getBusyCount();
0907: if (count > 1) {
0908: if (isEnforcing) {
0909: logger.error("Opened nested transaction (level="
0910: + count + ")", new Throwable());
0911: }
0912: return;
0913: }
0914:
0915: startTransaction();
0916:
0917: theDistributor.startTransaction();
0918:
0919: setTransactionOpenTime();
0920: if (privateUpdateSubscriptions()) {
0921: setHaveCollectionsChanged();
0922: }
0923: if (haveNewSubscriptions()) {
0924: setHaveCollectionsChanged();
0925: resetHaveNewSubscriptions();
0926: }
0927: noteOpenTransaction(this );
0928: }
0929:
0930: protected final void startTransaction() {
0931: theTransaction = newTransaction();
0932: Transaction.open(theTransaction);
0933: }
0934:
0935: private boolean _haveNewSubscriptions = false;
0936:
0937: private boolean haveNewSubscriptions() {
0938: return _haveNewSubscriptions;
0939: }
0940:
0941: private void setHaveNewSubscriptions() {
0942: _haveNewSubscriptions = true;
0943: }
0944:
0945: private void resetHaveNewSubscriptions() {
0946: _haveNewSubscriptions = false;
0947: }
0948:
0949: /**
0950: * Keep track of whether or not the collections have changed
0951: * since the previous openTransaction.
0952: */
0953: private boolean _haveCollectionsChangedSinceLastTransaction = false;
0954:
0955: /** set haveCollectionsChanged() */
0956: private void setHaveCollectionsChanged() {
0957: _haveCollectionsChangedSinceLastTransaction = true;
0958: }
0959:
0960: /** set haveCollectionsChanged() */
0961: private void resetHaveCollectionsChanged() {
0962: _haveCollectionsChangedSinceLastTransaction = false;
0963: }
0964:
0965: /** can be called by anyone who can open a transaction to decide what to do.
0966: * returned value is only valid/useful inside an open transaction.
0967: */
0968: public boolean haveCollectionsChanged() {
0969: return _haveCollectionsChangedSinceLastTransaction;
0970: }
0971:
0972: /** Attempt to open a transaction by attempting to grab the
0973: * transaction lock and updating the collections (iff we got the
0974: * lock).
0975: *
0976: * This is equivalent to the old (misnamed) tryLockSubscriber method
0977: * in PluginWrapper.
0978: *
0979: * @return true IFF a transaction was opened.
0980: */
0981: public final boolean tryOpenTransaction() {
0982: if (transactionLock.tryGetBusyFlag()) {
0983: finishOpenTransaction();
0984: return true;
0985: }
0986: return false;
0987: }
0988:
0989: /**
0990: * Close a transaction opened by openTransaction() or a successful
0991: * tryOpenTransaction(), but don't reset subscription changes or
0992: * clear delta lists.
0993: * @exception SubscriberException IFF we did not own the transaction
0994: * lock.
0995: */
0996: public final void closeTransactionDontReset() {
0997: closeTransaction(false);
0998: }
0999:
1000: /** check to see if we've already got an open transaction
1001: */
1002: public final boolean isTransactionOpen() {
1003: return (transactionLock.getBusyFlagOwner() == Thread
1004: .currentThread());
1005: }
1006:
1007: /** Close a transaction opened by openTransaction() or a
1008: * successful tryOpenTransaction().
1009: * @param resetSubscriptions IFF true, all subscriptions will have
1010: * their resetChanges() method called to clear any delta lists, etc.
1011: * @exception SubscriberException IFF we did not own the transaction
1012: * lock.
1013: * @deprecated Use {@link #closeTransactionDontReset closeTransactionDontReset}
1014: * This method becomes private after deprecation period expires.
1015: */
1016: public final void closeTransaction(boolean resetSubscriptions)
1017: throws SubscriberException {
1018: if (transactionLock.getBusyFlagOwner() == Thread
1019: .currentThread()) {
1020: // only do our closeTransaction work when exiting the nest.
1021: if (transactionLock.getBusyCount() == 1) {
1022: checkUnpostedChangeReports();
1023:
1024: if (!isReadyToPersist()) {
1025: setReadyToPersist();
1026: }
1027: if (resetSubscriptions)
1028: resetSubscriptionChanges();
1029: Envelope box = privateGetPublishedChanges();
1030: try {
1031: theDistributor.finishTransaction(box, getClient());
1032: } finally {
1033: stopTransaction();
1034: }
1035: } else {
1036: // Nested transaction (more than 1 busy)?
1037: //System.err.println("Closed nested transaction.");
1038: if (logger.isDebugEnabled())
1039: logger.debug("Closed nested transaction.");
1040: }
1041: // If no subscriptions we will never process the inbox. Empty
1042: // it to conserve memory instead of waiting for
1043: // openTransaction
1044: synchronized (inboxLock) {
1045: if (getSubscriptionCount() == 0) {
1046: pendingEnvelopes.clear();
1047: }
1048: if (!transactionLock.freeBusyFlag()) {
1049: throw new SubscriberException(
1050: "Failed to close an owned transaction");
1051: }
1052: }
1053: } else {
1054: throw new SubscriberException(
1055: "Attempt to close a non-open transaction");
1056: }
1057: noteCloseTransaction(this );
1058: }
1059:
1060: protected final void stopTransaction() {
1061: Transaction.close(theTransaction);
1062: theTransaction = null;
1063: }
1064:
1065: protected final void checkUnpostedChangeReports() {
1066: //Map map = theTransaction.getChangeMap();
1067: Map map = Transaction.getCurrentTransaction().getChangeMap();
1068: if (warnUnpublishChanges) {
1069: if (map == null || map.size() == 0)
1070: return;
1071: if (logger.isWarnEnabled())
1072: logger
1073: .warn("Ignoring outstanding unpublished changes:");
1074: for (Iterator ki = map.keySet().iterator(); ki.hasNext();) {
1075: Object o = ki.next();
1076: List l = (List) map.get(o);
1077: if (logger.isWarnEnabled())
1078: logger.warn("\t" + o + " (" + l.size() + ")");
1079: // we could just publish them with something like:
1080: //handleActiveSubscriptionObjects()
1081: //clientChangedObject(o, l);
1082: }
1083: }
1084: }
1085:
1086: /** Close a transaction opened by openTransaction() or a
1087: * successful tryOpenTransaction().
1088: * Will reset all subscription change tracking facilities.
1089: * To avoid this, use closeTransactionDontReset() instead.
1090: * @exception SubscriberException IFF we did not own the transaction
1091: * lock.
1092: */
1093: public final void closeTransaction() {
1094: closeTransaction(true);
1095: }
1096:
1097: /** Does someone have an open transaction? */
1098: public final boolean isInTransaction() {
1099: return (transactionLock.getBusyFlagOwner() != null);
1100: }
1101:
1102: /** Do I have an open transaction?
1103: * This really translates to "Is is safe to make changes to my
1104: * collections?"
1105: */
1106: public final boolean isMyTransaction() {
1107: return (transactionLock.getBusyFlagOwner() == Thread
1108: .currentThread());
1109: }
1110:
1111: //
1112: // Interest Handling - extension of earlier wakeRequest and
1113: // interestSemaphore code.
1114: //
1115:
1116: /** list of SubscriptionWatchers to be notified when something
1117: * interesting happens. Access must be synchronized on watchers.
1118: */
1119: private final List watchers = new ArrayList(1);
1120:
1121: public final SubscriptionWatcher registerInterest(
1122: SubscriptionWatcher w) {
1123: if (w == null) {
1124: throw new IllegalArgumentException(
1125: "Null SubscriptionWatcher");
1126: }
1127:
1128: synchronized (watchers) {
1129: watchers.add(w);
1130: }
1131:
1132: return w;
1133: }
1134:
1135: /** Allow a thread of a subscriber to register an interest in the
1136: * subscriber's collections. Mainly used to allow threads to monitor
1137: * changes in collections - that is, the fact of change, not the details.
1138: * The level of support here is like the old wake and interestSemaphore
1139: * code. The client of a subscriber need not register explicitly, as
1140: * it is done at initialization time.
1141: */
1142: public final SubscriptionWatcher registerInterest() {
1143: return registerInterest(new SubscriptionWatcher());
1144: }
1145:
1146: /** Allow a thread to unregister an interest registered by
1147: * registerInterest. Should be done if a subordinate (watching)
1148: * thread exits, or a plugin unloads.
1149: */
1150: public final void unregisterInterest(SubscriptionWatcher w)
1151: throws SubscriberException {
1152: synchronized (watchers) {
1153: if (!watchers.remove(w)) {
1154: throw new SubscriberException(
1155: "Attempt to unregisterInterest of unknown SubscriptionWatcher");
1156: }
1157: }
1158: }
1159:
1160: //
1161: // watcher triggers
1162: //
1163:
1164: private boolean _externalActivity = false;
1165: private boolean _internalActivity = false;
1166: private boolean _clientActivity = false;
1167:
1168: public boolean wasExternalActivity() {
1169: return _externalActivity;
1170: }
1171:
1172: public boolean wasInternalActivity() {
1173: return _internalActivity;
1174: }
1175:
1176: public boolean wasClientActivity() {
1177: return _clientActivity;
1178: }
1179:
1180: /** called when external activity changes the subscriber's collections.
1181: * by default, just calls wakeSubscriptionWatchers, but subclasses
1182: * may be more circumspect.
1183: */
1184: public void signalExternalActivity() {
1185: _externalActivity = true;
1186: wakeSubscriptionWatchers(SubscriptionWatcher.EXTERNAL);
1187: }
1188:
1189: /** called when internal activity actually changes the subscriber's
1190: * collections.
1191: * by default, just calls wakeSubscriptionWatchers, but subclasses
1192: * may be more circumspect.
1193: */
1194: public void signalInternalActivity() {
1195: _internalActivity = true;
1196: wakeSubscriptionWatchers(SubscriptionWatcher.INTERNAL);
1197: }
1198:
1199: /** called when the client (Plugin) requests that it be waked again.
1200: * by default, just calls wakeSubscriptionWatchers, but subclasses
1201: * may be more circumspect.
1202: */
1203: public void signalClientActivity() {
1204: _clientActivity = true;
1205: wakeSubscriptionWatchers(SubscriptionWatcher.CLIENT);
1206: }
1207:
1208: /** called to notify all SubscriptionWatchers.
1209: */
1210: private final void wakeSubscriptionWatchers(int event) {
1211: synchronized (watchers) {
1212: int l = watchers.size();
1213: for (int i = 0; i < l; i++) {
1214: ((SubscriptionWatcher) (watchers.get(i)))
1215: .signalNotify(event);
1216: }
1217: }
1218: }
1219:
1220: //
1221: // usability and debugability methods
1222: //
1223:
1224: public String toString() {
1225: String cs = "(self)";
1226: if (theClient != this )
1227: cs = theClient.toString();
1228:
1229: return "<" + getClass().getName() + " " + this .hashCode()
1230: + " for " + cs + " and " + theDistributor + ">";
1231: }
1232:
1233: /** utility to claim an object as ours */
1234: protected void claimObject(Object o) {
1235: if (o instanceof ClaimableHolder) {
1236: Claimable c = ((ClaimableHolder) o).getClaimable();
1237: if (c != null) {
1238: //System.err.println("\n->"+getClient()+" claimed "+c);
1239: c.setClaim(getClient());
1240: }
1241: }
1242: }
1243:
1244: /** utility to release a claim on an object */
1245: protected void unclaimObject(Object o) {
1246: if (o instanceof ClaimableHolder) {
1247: Claimable c = ((ClaimableHolder) o).getClaimable();
1248: if (c != null) {
1249: //System.err.println("\n->"+getClient()+" unclaimed "+c);
1250: c.resetClaim(getClient());
1251: }
1252: }
1253: }
1254:
1255: /** return the client of the the subscriber.
1256: * May be overridden by subclasses in case they are really
1257: * delegating to some other object.
1258: */
1259: public BlackboardClient getClient() {
1260: return theClient;
1261: }
1262:
1263: //Leftover from ancient code - now in BlackboardService... may want to
1264: //deprecate next release?
1265: public Subscriber getSubscriber() {
1266: return this ;
1267: }
1268:
1269: // try and save the state so that we can abort open transactions if
1270: // someone is bad.
1271: private static final ThreadLocal _openTransaction = new ThreadLocal();
1272:
1273: private static void noteOpenTransaction(Subscriber s) {
1274: _openTransaction.set(s);
1275: }
1276:
1277: private static void noteCloseTransaction(Subscriber s) {
1278: if (s != _openTransaction.get()) {
1279: Logging.getLogger(Subscriber.class).error(
1280: "Attempt to close a transaction from a different thread"
1281: + " than the one which opened it:\n\t" + s
1282: + "\t" + _openTransaction.get(),
1283: new Throwable());
1284: }
1285: _openTransaction.set(null);
1286: }
1287:
1288: public static boolean abortTransaction() {
1289: Subscriber s = (Subscriber) _openTransaction.get();
1290: if (s != null) {
1291: s.closeTransaction();
1292: return true;
1293: } else {
1294: return false;
1295: }
1296: }
1297:
1298: }
|