001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.util.ArrayList;
030: import java.util.Collections;
031: import java.util.HashMap;
032: import java.util.LinkedHashSet;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Set;
036: import org.cougaar.util.StackElements;
037: import org.cougaar.util.UnaryPredicate;
038: import org.cougaar.core.util.UID;
039: import org.cougaar.core.util.UniqueObject;
040:
041: /**
042: * A subscription that tracks the plugin publisher and add/change stacks for
043: * UniqueObjects.
044: * <p>
045: * @see Subscriber must enable "-Dorg.cougaar.core.blackboard.trackPublishers=true"
046: */
047: public class PublisherSubscription extends Subscription {
048:
049: // our per-uid table
050: //Map<UID, Info>
051: private final Map map = new HashMap(13);
052:
053: // if we're not shared, this is the transaction-pending removal list
054: private final List removedList;
055:
056: // our temp variable that's used when updating this subscription.
057: // it's only used in the distributor lock, so it's safe.
058: private String name;
059:
060: public PublisherSubscription(UnaryPredicate p) {
061: this (p, true);
062: }
063:
064: /**
065: * @param isShared use "false" for plugins and "true" for servlets.
066: * This flag controls whether removes are immediately processed or
067: * delayed until the end of the client's transaction.
068: */
069: public PublisherSubscription(UnaryPredicate p, boolean isShared) {
070: super (p);
071: removedList = (isShared ? null : new ArrayList(11));
072: }
073:
074: //
075: // all the methods from Subscription are provided.
076: //
077:
078: /**
079: * @return publish details for the UniqueObject.
080: */
081: public PublisherInfo getInfo(UID uid) {
082: synchronized (map) {
083: Info info = (Info) map.get(uid);
084: if (info == null) {
085: return null;
086: } else {
087: return new PublisherInfo(getPublisher(info),
088: getAddStack(info), getChangeStacks(info));
089: }
090: }
091: }
092:
093: private void privateAdd(Object o, StackElements se,
094: boolean isVisible) {
095: // always fill in the map, even if (!isVisible)
096: if (name != null && o instanceof UniqueObject) {
097: UID uid = ((UniqueObject) o).getUID();
098: if (uid != null) {
099: synchronized (map) {
100: map.put(uid, newInfo(name, se));
101: }
102: }
103: }
104: }
105:
106: private void privateChange(Object o, List changes,
107: StackElements se, boolean isVisible) {
108: if (se != null && o instanceof UniqueObject) {
109: UID uid = ((UniqueObject) o).getUID();
110: if (uid != null) {
111: synchronized (map) {
112: Info info = (Info) map.get(uid);
113: if (info != null) {
114: Info newInfo = changeInfo(info, se);
115: if (newInfo != info) {
116: map.put(uid, newInfo);
117: }
118: }
119: }
120: }
121: }
122: }
123:
124: private void privateRemove(Object o, StackElements se,
125: boolean isVisible) {
126: if (removedList == null) {
127: removeEntry(o); // remove immediately
128: } else {
129: removedList.add(o); // wait until transaction close
130: }
131: }
132:
133: protected void privateAdd(Object o, boolean isVisible) {
134: privateAdd(o, null, isVisible);
135: }
136:
137: protected void privateChange(Object o, List changes,
138: boolean isVisible) {
139: privateChange(o, changes, null, isVisible);
140: }
141:
142: protected void privateRemove(Object o, boolean isVisible) {
143: privateRemove(o, null, isVisible);
144: }
145:
146: private void removeEntry(Object o) {
147: if (o instanceof UniqueObject) {
148: UID uid = ((UniqueObject) o).getUID();
149: synchronized (map) {
150: map.remove(uid);
151: }
152: }
153: }
154:
155: protected void resetChanges() {
156: super .resetChanges();
157: if (removedList != null) {
158: // process removals
159: int n = removedList.size();
160: if (n > 0) {
161: for (int i = 0; i < n; i++) {
162: removeEntry(removedList.get(i));
163: }
164: removedList.clear();
165: }
166: }
167: }
168:
169: public boolean apply(Envelope envelope) {
170: if (envelope instanceof TimestampedEnvelope) {
171: TimestampedEnvelope te = (TimestampedEnvelope) envelope;
172: if (te.getTransactionCloseTime() >= 0) {
173: this .name = te.getName();
174: //return super.apply(envelope);
175: {
176: // based on subscription/tuple code:
177: boolean vp = te.get_isVisible();
178: boolean somethingFired = false;
179: List deltas = te.getRawDeltas();
180: int l = deltas.size();
181: for (int i = 0; i < l; i++) {
182: EnvelopeTuple tuple = (EnvelopeTuple) deltas
183: .get(i);
184: boolean b;
185: if (tuple instanceof AddEnvelopeTuple) {
186: Object object = tuple.getObject();
187: b = predicate.execute(object);
188: if (b) {
189: privateAdd(object, tuple.getStack(), vp);
190: }
191: } else if (tuple instanceof ChangeEnvelopeTuple) {
192: Object object = tuple.getObject();
193: b = predicate.execute(object);
194: if (b) {
195: List changes = (List) ((ChangeEnvelopeTuple) tuple)
196: .getChangeReports();
197: privateChange(object, changes, tuple
198: .getStack(), vp);
199: }
200: } else if (tuple instanceof RemoveEnvelopeTuple) {
201: Object object = tuple.getObject();
202: b = predicate.execute(object);
203: if (b) {
204: privateRemove(object, tuple.getStack(),
205: vp);
206: }
207: } else {
208: b = tuple.applyToSubscription(this , vp);
209: }
210: somethingFired |= b;
211: }
212: return vp && somethingFired;
213: }
214: }
215: } else if (envelope instanceof InitializeSubscriptionEnvelope) {
216: super .apply(envelope);
217: } else {
218: // the subscriber -D was not set, so do nothing
219: }
220: return false;
221: }
222:
223: // accessor methods
224: private static String getPublisher(Info info) {
225: return (info == null ? null : info.getPublisher());
226: }
227:
228: private static StackElements getAddStack(Info info) {
229: return (info == null ? null : info.getAddStack());
230: }
231:
232: private static Set getChangeStacks(Info info) {
233: Set ret = null;
234: if (info != null) {
235: ret = info.getChangeStacks();
236: if (ret != null && ret.size() > 1) {
237: // must make a copy, since our sub can modify the set
238: ret = Collections
239: .unmodifiableSet(new LinkedHashSet(ret));
240: }
241: }
242: return ret;
243: }
244:
245: // factory methods
246: private static Info newInfo(String publisher, StackElements stack) {
247: if (stack == null) {
248: return new Info(publisher);
249: } else {
250: return new InfoAdd(publisher, stack);
251: }
252: }
253:
254: private static Info changeInfo(Info info, StackElements stack) {
255: InfoAddChange iac;
256: if (info instanceof InfoAddChange) {
257: iac = (InfoAddChange) info;
258: } else {
259: iac = new InfoAddChange(info.getPublisher(), info
260: .getAddStack());
261: }
262: iac.addChangeStack(stack);
263: return iac;
264: }
265:
266: // our info impls, with subclasses to avoid unnecessary fields
267: private static class Info {
268: private final String publisher;
269:
270: public Info(String publisher) {
271: this .publisher = publisher;
272: }
273:
274: public final String getPublisher() {
275: return publisher;
276: }
277:
278: public StackElements getAddStack() {
279: return null;
280: }
281:
282: // optionally add timestamps here..
283: public Set getChangeStacks() {
284: return null;
285: }
286: }
287:
288: private static class InfoAdd extends Info {
289: private final StackElements add_stack;
290:
291: public InfoAdd(String publisher, StackElements add_stack) {
292: super (publisher);
293: this .add_stack = add_stack;
294: }
295:
296: public StackElements getAddStack() {
297: return add_stack;
298: }
299:
300: }
301:
302: private static class InfoAddChange extends InfoAdd {
303: private Set change_stacks;
304:
305: public InfoAddChange(String publisher, StackElements stack) {
306: super (publisher, stack);
307: }
308:
309: public Set getChangeStacks() {
310: return change_stacks;
311: }
312:
313: public void addChangeStack(StackElements se) {
314: if (change_stacks == null) {
315: change_stacks = Collections.singleton(se);
316: } else {
317: if (change_stacks.size() == 1) {
318: Object o = change_stacks.iterator().next();
319: if (se.equals(o)) {
320: return;
321: }
322: change_stacks = new LinkedHashSet();
323: change_stacks.add(o);
324: }
325: change_stacks.add(se);
326: }
327: }
328: }
329: }
|