001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.util.ArrayList;
030: import java.util.HashMap;
031: import java.util.List;
032: import java.util.Map;
033:
034: import org.cougaar.core.util.UID;
035: import org.cougaar.core.util.UniqueObject;
036: import org.cougaar.util.UnaryPredicate;
037:
038: /**
039: * A {@link Subscription} that tracks {@link UniqueObject} publishAdd
040: * and most recent publishChange timestamps.
041: * <p>
042: * These timestamps are not persisted, and upon rehydration the
043: * creation time of the objects will be the agent restart time.
044: *
045: * @see Subscriber required system property that must be enabled
046: * @see #getTimestampEntry access to the (UID, TimestampEntry) timestamp data
047: */
048: public class TimestampSubscription extends Subscription {
049:
050: // if this timestamp subscription is not shared, this list
051: // buffers the removed entries until after the plugin
052: // transaction completes.
053: //
054: // this list is locked by the transaction.
055: private final List removedList;
056:
057: // a map if (UID, TimestampEntry) pairs
058: //
059: // the map is locked to allow multiple reader threads to
060: // access the "get*(UID)" methods. Even if the subscription
061: // is not shared, it must be locked to allow distributor
062: // updates during a transaction.
063: private final Map map;
064:
065: // the "apply(..)" timestamp from the transaction close time
066: // of the TimestampedEnvelope that is being processed.
067: //
068: // only one "apply(..)" can occur at a time, so this is thread
069: // safe.
070: private long time;
071:
072: /**
073: * Equivalent to<code>new TimestampSubscription(p, true)</code>.
074: */
075: public TimestampSubscription(UnaryPredicate p) {
076: this (p, true);
077: }
078:
079: /**
080: * @param p the predicate should only accept UniqueObjects;
081: * all non-UniqueObjects and UniqueObjects with null UIDs
082: * are ignored.
083: * @param isShared if true, removals are done immediately,
084: * otherwise they are done at the end of the plugin's
085: * transaction.
086: */
087: public TimestampSubscription(UnaryPredicate p, boolean isShared) {
088: super (p);
089: map = new HashMap(13);
090: removedList = (isShared ? null : new ArrayList(11));
091: }
092:
093: //
094: // all the methods from Subscription are provided.
095: //
096:
097: /**
098: * @see #getTimestampEntry get the creation time
099: */
100: public long getCreationTime(UID uid) {
101: TimestampEntry entry = getTimestampEntry(uid);
102: return ((entry != null) ? entry.getCreationTime()
103: : TimestampEntry.UNKNOWN_TIME);
104: }
105:
106: /**
107: * @see #getTimestampEntry get the modification time
108: */
109: public long getModificationTime(UID uid) {
110: TimestampEntry entry = getTimestampEntry(uid);
111: return ((entry != null) ? entry.getModificationTime()
112: : TimestampEntry.UNKNOWN_TIME);
113: }
114:
115: /**
116: * Get the TimestampEntry for the local blackboard object with the
117: * given UID.
118: * <p>
119: * The object must match this subscription's predicate.
120: * <p>
121: * The timestamps are measured in milliseconds, and matches the
122: * transaction close times of the subscriber that performed
123: * the "publishAdd()" or "publishChange()".
124: * <p>
125: * This method is thread-safe to allow multiple clients to
126: * access the underlying (UID, TimestampEntry) map. The map is
127: * also updated <i>during</i> the subscriber's transaction.
128: * Multiple calls to "getTimestampEntry()", even within the same
129: * subscriber transaction, may return different results.
130: *
131: * @return the TimestampEntry, or null if not known.
132: */
133: public TimestampEntry getTimestampEntry(UID uid) {
134: synchronized (map) {
135: return (TimestampEntry) map.get(uid);
136: }
137: }
138:
139: protected void privateAdd(Object o, boolean isVisible) {
140: // always fill in the map, even if (!isVisible)
141: if (o instanceof UniqueObject) {
142: UID uid = ((UniqueObject) o).getUID();
143: if (uid != null) {
144: TimestampEntry entry = new TimestampEntry(time, time);
145: synchronized (map) {
146: map.put(uid, entry);
147: }
148: }
149: }
150: }
151:
152: protected void privateChange(Object o, List changes,
153: boolean isVisible) {
154: if (o instanceof UniqueObject) {
155: UID uid = ((UniqueObject) o).getUID();
156: if (uid != null) {
157: TimestampEntry newEntry = new TimestampEntry(time, time);
158: synchronized (map) {
159: TimestampEntry prevEntry = (TimestampEntry) map
160: .put(uid, newEntry);
161: if (prevEntry != null) {
162: // typical case. replace an existing entry.
163: long creationTime = prevEntry.getCreationTime();
164: // assert (creationTime <= time);
165: //
166: // this "private_*" call saves us an extra "map.get(..)".
167: // it is safe only within this "map.put(..)" situation
168: newEntry.private_setCreationTime(creationTime);
169: }
170: }
171: }
172: }
173: }
174:
175: protected void privateRemove(Object o, boolean isVisible) {
176: if (removedList == null) {
177: removeEntry(o); // remove immediately
178: } else {
179: removedList.add(o); // wait until transaction close
180: }
181: }
182:
183: private void removeEntry(Object o) {
184: if (o instanceof UniqueObject) {
185: UID uid = ((UniqueObject) o).getUID();
186: synchronized (map) {
187: map.remove(uid);
188: }
189: }
190: }
191:
192: protected void resetChanges() {
193: super .resetChanges();
194: if (removedList != null) {
195: // process removals
196: int n = removedList.size();
197: if (n > 0) {
198: for (int i = 0; i < n; i++) {
199: removeEntry(removedList.get(i));
200: }
201: removedList.clear();
202: }
203: }
204: }
205:
206: public boolean apply(Envelope envelope) {
207: if (envelope instanceof TimestampedEnvelope) {
208: TimestampedEnvelope te = (TimestampedEnvelope) envelope;
209: long closeTime = te.getTransactionCloseTime();
210: if (closeTime != TimestampEntry.UNKNOWN_TIME) {
211: this .time = closeTime;
212: return super .apply(envelope);
213: }
214: } else if (envelope instanceof InitializeSubscriptionEnvelope) {
215: super .apply(envelope);
216: }
217: // FIXME should we still "apply(..)" with the current time?
218: return false;
219: }
220:
221: }
|