001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.node;
028:
029: import java.net.URI;
030: import java.util.ArrayList;
031: import java.util.Collections;
032: import java.util.Comparator;
033: import java.util.HashMap;
034: import java.util.HashSet;
035: import java.util.Iterator;
036: import java.util.List;
037: import java.util.Map;
038: import java.util.Set;
039: import org.cougaar.bootstrap.SystemProperties;
040: import org.cougaar.core.component.Component;
041: import org.cougaar.core.component.ServiceBroker;
042: import org.cougaar.core.component.ServiceProvider;
043: import org.cougaar.core.component.ServiceRevokedListener;
044: import org.cougaar.core.mts.MessageAddress;
045: import org.cougaar.core.service.IncarnationService;
046: import org.cougaar.core.service.LoggingService;
047: import org.cougaar.core.service.ThreadService;
048: import org.cougaar.core.service.wp.AddressEntry;
049: import org.cougaar.core.service.wp.Callback;
050: import org.cougaar.core.service.wp.Response;
051: import org.cougaar.core.service.wp.WhitePagesService;
052: import org.cougaar.core.thread.Schedulable;
053: import org.cougaar.core.thread.SchedulableStatus;
054: import org.cougaar.util.GenericStateModelAdapter;
055: import org.cougaar.util.IdentityHashSet;
056:
057: /**
058: * This component provides the {@link IncarnationService} that
059: * monitors agent incarnation (version) numbers in the {@link
060: * WhitePagesService} and notifies clients of any changes.
061: *
062: * @property org.cougaar.core.node.incarnation.period
063: * Milliseconds between white pages incarnation polling to detect
064: * agent restarts, defaults to 43000.
065: */
066: public final class Incarnation extends GenericStateModelAdapter
067: implements Component {
068:
069: private static final long RESTART_CHECK_INTERVAL = SystemProperties
070: .getLong("org.cougaar.core.node.incarnation.period", 43000L);
071:
072: private ServiceBroker sb;
073:
074: private LoggingService log;
075: private ServiceBroker rootsb;
076: private WhitePagesService wps;
077:
078: private IncarnationSP isp;
079:
080: private Schedulable pollThread;
081:
082: // map of agent name to an entry with the most recently observed
083: // incarnation and listener callbacks
084: private final Map incarnationMap = new HashMap();
085:
086: // WP callbacks for non-blocking lookups
087: private final Map pendingMap = new HashMap();
088:
089: public void setServiceBroker(ServiceBroker sb) {
090: this .sb = sb;
091: }
092:
093: public void load() {
094: super .load();
095:
096: log = (LoggingService) sb.getService(this ,
097: LoggingService.class, null);
098:
099: // get root sb
100: NodeControlService ncs = (NodeControlService) sb.getService(
101: this , NodeControlService.class, null);
102: if (ncs == null) {
103: throw new RuntimeException(
104: "Unable to obtain NodeControlService");
105: }
106: rootsb = ncs.getRootServiceBroker();
107: sb.releaseService(this , NodeControlService.class, ncs);
108:
109: // get wp
110: wps = (WhitePagesService) sb.getService(this ,
111: WhitePagesService.class, null);
112: if (wps == null) {
113: throw new RuntimeException(
114: "Unable to obtain WhitePagesService");
115: }
116:
117: // get thread
118: ThreadService threadService = (ThreadService) sb.getService(
119: this , ThreadService.class, null);
120: Runnable pollRunner = new Runnable() {
121: public void run() {
122: pollWhitePages();
123: }
124: };
125: pollThread = threadService.getThread(this , pollRunner,
126: "Incarnation");
127: sb.releaseService(this , ThreadService.class, threadService);
128:
129: // assume we're running
130: pollThread.schedule(RESTART_CHECK_INTERVAL,
131: RESTART_CHECK_INTERVAL);
132:
133: // advertise our service
134: isp = new IncarnationSP();
135: rootsb.addService(IncarnationService.class, isp);
136: }
137:
138: public void unload() {
139: if (pollThread != null) {
140: pollThread.cancelTimer();
141: pollThread = null;
142: }
143: if (isp != null) {
144: sb.revokeService(IncarnationService.class, isp);
145: isp = null;
146: }
147: if (wps != null) {
148: sb.releaseService(this , WhitePagesService.class, wps);
149: wps = null;
150: }
151: super .unload();
152: }
153:
154: private long getIncarnation(MessageAddress agentId) {
155: synchronized (incarnationMap) {
156: Entry e = (Entry) incarnationMap.get(agentId);
157: if (e == null) {
158: return 0;
159: }
160: return e.getIncarnation();
161: }
162: }
163:
164: private int updateIncarnation(MessageAddress agentId, long inc) {
165: if (inc <= 0) {
166: // invalid incarnation (wp cache miss?)
167: return 0;
168: }
169: List callbacks;
170: synchronized (incarnationMap) {
171: Entry e = (Entry) incarnationMap.get(agentId);
172: if (e == null) {
173: // no subscribers for this information!
174: return 0;
175: }
176: long cachedInc = e.getIncarnation();
177: if (inc == cachedInc) {
178: // no change
179: return 0;
180: }
181: if (inc < cachedInc) {
182: // stale incarnation
183: return -1;
184: }
185: // increase
186: if (log.isInfoEnabled()) {
187: log.info("Update agent " + agentId + " from "
188: + cachedInc + " to " + inc);
189: }
190: e.setIncarnation(inc);
191: if (cachedInc == 0) {
192: // first time, don't invoke callbacks
193: return 0;
194: }
195: // get callbacks
196: callbacks = e.getCallbacks();
197: }
198: // invoke callbacks in the caller's thread
199: int n = (callbacks == null ? 0 : callbacks.size());
200: for (int i = 0; i < n; i++) {
201: IncarnationService.Callback cb = (IncarnationService.Callback) callbacks
202: .get(i);
203: if (log.isDebugEnabled()) {
204: log.debug("Invoking callback(" + agentId + ", " + inc
205: + ")[" + i + " / " + n + "]: " + cb);
206: }
207: cb.incarnationChanged(agentId, inc);
208: }
209: return 1;
210: }
211:
212: private boolean subscribe(MessageAddress agentId,
213: IncarnationService.Callback cb, long initialInc) {
214: if (agentId == null || cb == null) {
215: throw new IllegalArgumentException("null "
216: + (agentId == null ? "addr" : "cb"));
217: }
218: long inc = initialInc;
219: while (true) {
220: long cachedInc;
221: synchronized (incarnationMap) {
222: Entry e = (Entry) incarnationMap.get(agentId);
223: if (e == null) {
224: if (log.isInfoEnabled()) {
225: log.info("Adding " + agentId);
226: }
227: e = new Entry();
228: incarnationMap.put(agentId, e);
229: }
230: cachedInc = e.getIncarnation();
231: if (inc <= 0 || inc == cachedInc) {
232: // okay to add now
233: boolean ret = e.addCallback(cb);
234: if (log.isDetailEnabled()) {
235: log.detail("addCallback(" + agentId + ", " + cb
236: + ", " + inc + ")=" + ret);
237: }
238: return ret;
239: }
240: }
241: // inc != cachedInc, so we must bring them into sync,
242: // but don't invoke callbacks while holding the lock!
243: if (inc >= cachedInc) {
244: updateIncarnation(agentId, inc);
245: continue;
246: }
247: if (log.isDebugEnabled()) {
248: log.debug("Invoking callback(" + agentId + ", " + inc
249: + "):" + cb);
250: }
251: cb.incarnationChanged(agentId, cachedInc);
252: inc = cachedInc;
253: }
254: }
255:
256: private boolean unsubscribe(MessageAddress agentId,
257: IncarnationService.Callback cb) {
258: if (agentId == null || cb == null) {
259: throw new IllegalArgumentException("null "
260: + (agentId == null ? "addr" : "cb"));
261: }
262: synchronized (incarnationMap) {
263: Entry e = (Entry) incarnationMap.get(agentId);
264: if (e == null) {
265: return false;
266: }
267: if (!e.removeCallback(cb)) {
268: return false;
269: }
270: if (!e.hasCallbacks()) {
271: incarnationMap.remove(agentId);
272: if (log.isInfoEnabled()) {
273: log.info("Removing " + agentId);
274: }
275: }
276: return true;
277: }
278: }
279:
280: /**
281: * Periodically called to poll for remote agent incarnation
282: * changes.
283: */
284: private void pollWhitePages() {
285: if (log.isDebugEnabled()) {
286: log.debug("pollWhitePages");
287: }
288: // snapshot the agent names
289: Set agentIds;
290: synchronized (incarnationMap) {
291: if (incarnationMap.isEmpty()) {
292: return; // nothing to do
293: }
294: agentIds = new HashSet(incarnationMap.keySet());
295: }
296: // update the latest incarnations from the white pages
297: for (Iterator iter = agentIds.iterator(); iter.hasNext();) {
298: MessageAddress agentId = (MessageAddress) iter.next();
299: long currentInc = lookupIncarnation(agentId);
300: updateIncarnation(agentId, currentInc);
301: }
302: }
303:
304: /**
305: * White pages lookup to get the latest incarnation number for
306: * the specified agent.
307: *
308: * @return -1 if the WP lacks an entry, -2 if a WP background
309: * lookup is in progress, or > 0 for a valid incarnation
310: */
311: private long lookupIncarnation(MessageAddress agentId) {
312: AddressEntry entry;
313:
314: // runs in the pollThread, so no locking required
315: BlockingWPCallback callback = (BlockingWPCallback) pendingMap
316: .get(agentId);
317: if (callback == null) {
318: // no pending callback yet.
319: callback = new BlockingWPCallback();
320: wps.get(agentId.getAddress(), "version", callback);
321: if (callback.completed) {
322: // cache hit
323: entry = callback.entry;
324: } else {
325: // no cache hit. Remember the callback.
326: pendingMap.put(agentId, callback);
327: return -2;
328: }
329: } else if (callback.completed) {
330: // pending callback completed
331: entry = callback.entry;
332: pendingMap.remove(agentId);
333: } else {
334: // pending callback not completed yet
335: return -2;
336: }
337:
338: if (entry == null) {
339: // log this?
340: // return error code
341: return -1;
342: }
343:
344: // parse the entry
345: String path = entry.getURI().getPath();
346: int end = path.indexOf('/', 1);
347: String incn_str = path.substring(1, end);
348: return Long.parseLong(incn_str);
349: }
350:
351: /** an incarnationMap entry */
352: private static final class Entry {
353:
354: // comparator that puts non-comparables first
355: private static final Comparator CALLBACK_COMP = new Comparator() {
356: public int compare(Object o1, Object o2) {
357: if (o1 instanceof Comparable) {
358: if (o2 instanceof Comparable) {
359: return ((Comparable) o1).compareTo(o2);
360: } else {
361: return 1;
362: }
363: } else if (o2 instanceof Comparable) {
364: return -1;
365: } else {
366: return 0;
367: }
368: }
369: };
370:
371: // cached incarnation.
372: private long inc;
373:
374: // our callbacks
375: private Set callbacks;
376:
377: public long getIncarnation() {
378: return inc;
379: }
380:
381: public void setIncarnation(long currentInc) {
382: inc = currentInc;
383: }
384:
385: public boolean hasCallbacks() {
386: return (callbacks != null && !callbacks.isEmpty());
387: }
388:
389: public boolean addCallback(IncarnationService.Callback cb) {
390: if (callbacks == null) {
391: // use a regular IdentityHashSet and sort on "update",
392: // instead of using a more expensive TreeSet.
393: callbacks = new IdentityHashSet();
394: }
395: return callbacks.add(cb);
396: }
397:
398: public boolean removeCallback(IncarnationService.Callback cb) {
399: if (callbacks == null) {
400: return false;
401: }
402: return callbacks.remove(cb);
403: }
404:
405: public List getCallbacks() {
406: if (callbacks.isEmpty()) {
407: return null;
408: }
409: List ret = new ArrayList(callbacks);
410: Collections.sort(ret, CALLBACK_COMP);
411: return ret;
412: }
413:
414: public String toString() {
415: return "(entry inc=" + inc + " callbacks["
416: + (callbacks == null ? 0 : callbacks.size()) + "]="
417: + callbacks + ")";
418: }
419: }
420:
421: private final class IncarnationSP implements ServiceProvider {
422: public Object getService(ServiceBroker sb, Object requestor,
423: Class serviceClass) {
424: if (IncarnationService.class.isAssignableFrom(serviceClass)) {
425: return new Impl();
426: } else {
427: return null;
428: }
429: }
430:
431: public void releaseService(ServiceBroker sb, Object requestor,
432: Class serviceClass, Object service) {
433: if (!(service instanceof Impl)) {
434: return;
435: }
436: ((Impl) service).unsubscribeAll();
437: }
438:
439: private class Impl implements IncarnationService {
440: // map from agentId to callbacks
441: private final Map subs = new HashMap();
442:
443: public long getIncarnation(MessageAddress addr) {
444: MessageAddress agentId = addr.getPrimary();
445: return Incarnation.this .getIncarnation(agentId);
446: }
447:
448: public int updateIncarnation(MessageAddress addr, long inc) {
449: MessageAddress agentId = addr.getPrimary();
450: return Incarnation.this .updateIncarnation(agentId, inc);
451: }
452:
453: public Map getSubscriptions() {
454: synchronized (subs) {
455: return (subs.isEmpty() ? Collections.EMPTY_MAP
456: : (new HashMap(subs)));
457: }
458: }
459:
460: public boolean subscribe(MessageAddress addr, Callback cb) {
461: return subscribe(addr, cb, 0);
462: }
463:
464: public boolean subscribe(MessageAddress addr, Callback cb,
465: long initialInc) {
466: MessageAddress agentId = addr.getPrimary();
467: synchronized (subs) {
468: Set s = (Set) subs.get(agentId);
469: if (s == null) {
470: s = new IdentityHashSet();
471: subs.put(agentId, s);
472: }
473: if (!s.add(cb)) {
474: // already have this subscription
475: return false;
476: }
477: return Incarnation.this .subscribe(agentId, cb,
478: initialInc);
479: }
480: }
481:
482: public boolean unsubscribe(MessageAddress addr, Callback cb) {
483: MessageAddress agentId = addr.getPrimary();
484: synchronized (subs) {
485: Set s = (Set) subs.get(agentId);
486: if (s == null) {
487: return false;
488: }
489: if (!s.remove(cb)) {
490: return false;
491: }
492: return Incarnation.this .unsubscribe(agentId, cb);
493: }
494: }
495:
496: private void unsubscribeAll() {
497: synchronized (subs) {
498: for (Iterator iter = subs.entrySet().iterator(); iter
499: .hasNext();) {
500: Map.Entry me = (Map.Entry) iter.next();
501: MessageAddress agentId = (MessageAddress) me
502: .getKey();
503: Set s = (Set) me.getValue();
504: for (Iterator i2 = s.iterator(); i2.hasNext();) {
505: Callback cb = (Callback) i2.next();
506: Incarnation.this .unsubscribe(agentId, cb);
507: }
508: }
509: subs.clear();
510: }
511: }
512: }
513: }
514:
515: // replace with a Latch?
516: private class BlockingWPCallback implements Callback {
517: AddressEntry entry;
518: boolean completed = false;
519:
520: public void execute(Response response) {
521: completed = true;
522: if (response.isSuccess()) {
523: if (log.isDetailEnabled()) {
524: log.detail("wp response: " + response);
525: }
526: entry = ((Response.Get) response).getAddressEntry();
527: } else {
528: if (log.isDetailEnabled()) {
529: log.detail("wp error: " + response);
530: }
531: }
532: }
533: }
534:
535: }
|