001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent;
028:
029: import java.net.URI;
030: import java.util.ArrayList;
031: import java.util.Collections;
032: import java.util.HashMap;
033: import java.util.Iterator;
034: import java.util.List;
035: import java.util.Map;
036: import java.util.Set;
037: import org.cougaar.core.blackboard.BlackboardForAgent;
038: import org.cougaar.core.component.Component;
039: import org.cougaar.core.component.ServiceBroker;
040: import org.cougaar.core.component.ServiceProvider;
041: import org.cougaar.core.component.ServiceRevokedListener;
042: import org.cougaar.core.logging.LoggingServiceWithPrefix;
043: import org.cougaar.core.mts.MessageAddress;
044: import org.cougaar.core.persist.PersistenceClient;
045: import org.cougaar.core.persist.PersistenceIdentity;
046: import org.cougaar.core.persist.PersistenceService;
047: import org.cougaar.core.persist.RehydrationData;
048: import org.cougaar.core.service.AgentIdentificationService;
049: import org.cougaar.core.service.IncarnationService;
050: import org.cougaar.core.service.LoggingService;
051: import org.cougaar.core.service.ThreadService;
052: import org.cougaar.core.thread.Schedulable;
053: import org.cougaar.core.thread.SchedulableStatus;
054: import org.cougaar.util.GenericStateModelAdapter;
055:
056: /**
057: * This component watches blackboard message traffic and periodically
058: * checks the white pages for agent restarts, which require
059: * blackboard-to-blackboard state reconciliation.
060: */
061: public final class Reconcile extends GenericStateModelAdapter implements
062: Component {
063: private ServiceBroker sb;
064:
065: private LoggingService log;
066: private MessageAddress localAgent;
067: private IncarnationService incarnationService;
068:
069: private PersistenceService ps;
070: private PersistenceClient pc;
071:
072: private ReconcileAddressWatcherServiceProvider rawsp;
073: private ReconcileEnablerServiceProvider resp;
074:
075: private BlackboardForAgent bb;
076:
077: private Schedulable reconcileThread;
078: private final List reconcileTmp = new ArrayList();
079:
080: private final List queue = new ArrayList();
081: private boolean active;
082:
083: private final IncarnationService.Callback cb = new ReconcileCallback();
084:
085: public void setServiceBroker(ServiceBroker sb) {
086: this .sb = sb;
087: }
088:
089: public void load() {
090: super .load();
091:
092: localAgent = find_local_agent();
093:
094: log = (LoggingService) sb.getService(this ,
095: LoggingService.class, null);
096: String prefix = localAgent + ": ";
097: log = LoggingServiceWithPrefix.add(log, prefix);
098:
099: // get incarnation service
100: incarnationService = (IncarnationService) sb.getService(this ,
101: IncarnationService.class, null);
102: if (incarnationService == null) {
103: throw new RuntimeException(
104: "Unable to obtain IncarnationService");
105: }
106:
107: // create queue thread
108: Runnable reconcileRunner = new Runnable() {
109: public void run() {
110: reconcileNow();
111: }
112: };
113: ThreadService threadService = (ThreadService) sb.getService(
114: this , ThreadService.class, null);
115: reconcileThread = threadService.getThread(this ,
116: reconcileRunner, "Reconciler");
117: sb.releaseService(this , ThreadService.class, threadService);
118:
119: register_persistence();
120:
121: // get mobile state, to make sure we don't miss a reconcile
122: // and avoid unnecessary reconciles
123: Object o = rehydrate();
124: if (o instanceof Map) {
125: resubscribe((Map) o);
126: }
127: o = null;
128:
129: // create "recordAddress" access
130: rawsp = new ReconcileAddressWatcherServiceProvider();
131: sb.addService(ReconcileAddressWatcherService.class, rawsp);
132:
133: // create "enableReconcile" to enable reconcile processing
134: // once the agent has loaded, and "disableReconcile" to
135: // disable reconciles when the agent is persisting or unloading.
136: resp = new ReconcileEnablerServiceProvider();
137: sb.addService(ReconcileEnablerService.class, resp);
138: }
139:
140: private MessageAddress find_local_agent() {
141: AgentIdentificationService ais = (AgentIdentificationService) sb
142: .getService(this , AgentIdentificationService.class,
143: null);
144: if (ais == null) {
145: return null;
146: }
147: MessageAddress ret = ais.getMessageAddress();
148: sb.releaseService(this , AgentIdentificationService.class, ais);
149: return ret;
150: }
151:
152: public void start() {
153: super .start();
154: // called later via ReconcileEnablerService:
155: //enableReconcile();
156: }
157:
158: public void suspend() {
159: super .suspend();
160: // called earlier via ReconcileEnablerService:
161: //disableReconcile();
162: }
163:
164: public void resume() {
165: super .resume();
166: // called later via ReconcileEnablerService:
167: //enableReconcile();
168: }
169:
170: public void stop() {
171: super .stop();
172: // called earlier via ReconcileEnablerService:
173: //disableReconcile();
174: }
175:
176: public void unload() {
177: super .unload();
178:
179: if (resp != null) {
180: sb.revokeService(ReconcileEnablerService.class, resp);
181: resp = null;
182: }
183: if (rawsp != null) {
184: sb.revokeService(ReconcileAddressWatcherService.class,
185: rawsp);
186: rawsp = null;
187: }
188:
189: unregister_persistence();
190:
191: if (incarnationService != null) {
192: // release service, unsubscribes our callbacks:
193: sb.releaseService(this , IncarnationService.class,
194: incarnationService);
195: incarnationService = null;
196: }
197: }
198:
199: private Object captureState() {
200: if (getModelState() == ACTIVE) {
201: if (log.isDetailEnabled()) {
202: log.detail("ignoring persist while active");
203: }
204: return null;
205: }
206:
207: synchronized (queue) {
208: if (active && log.isErrorEnabled()) {
209: log.error("Attempting to captureState while active!");
210: }
211: }
212:
213: // capture our Map<agentId, Long> state
214: //
215: // TODO: our active flag is false, so we won't reconcile while
216: // we're capturing our state. However, we don't actually lockout
217: // the incarnationChanged callbacks, so there's a small risk that
218: // our queue is not empty, meaning that we'd miss a reconcile,
219: // but we'd rather not block the incarnationService.
220: Map ret;
221: // get Map<agentId, Set<Callback>>
222: Map subs = incarnationService.getSubscriptions();
223: if (subs == null || subs.isEmpty()) {
224: ret = Collections.EMPTY_MAP;
225: } else {
226: // for all agentIds, get our inc
227: ret = new HashMap(subs.size());
228: for (Iterator iter = subs.keySet().iterator(); iter
229: .hasNext();) {
230: MessageAddress agentId = (MessageAddress) iter.next();
231: long inc = incarnationService.getIncarnation(agentId);
232: ret.put(agentId, new Long(inc));
233: }
234: }
235: if (log.isDebugEnabled()) {
236: log.debug("Captured state[" + ret.size() + "]: " + ret);
237: }
238: return ret;
239: }
240:
241: private void register_persistence() {
242: // get persistence
243: pc = new PersistenceClient() {
244: public PersistenceIdentity getPersistenceIdentity() {
245: String id = getClass().getName();
246: return new PersistenceIdentity(id);
247: }
248:
249: public List getPersistenceData() {
250: Object o = captureState();
251: // must return mutable list!
252: List l = new ArrayList(1);
253: l.add(o);
254: return l;
255: }
256: };
257: ps = (PersistenceService) sb.getService(pc,
258: PersistenceService.class, null);
259: }
260:
261: private void unregister_persistence() {
262: if (ps != null) {
263: sb.releaseService(pc, PersistenceService.class, ps);
264: ps = null;
265: pc = null;
266: }
267: }
268:
269: private Object rehydrate() {
270: RehydrationData rd = ps.getRehydrationData();
271: if (rd == null) {
272: if (log.isInfoEnabled()) {
273: log.info("No rehydration data found");
274: }
275: return null;
276: }
277:
278: List l = rd.getObjects();
279: rd = null;
280: int lsize = (l == null ? 0 : l.size());
281: if (lsize < 1) {
282: if (log.isInfoEnabled()) {
283: log.info("Invalid rehydration list? " + l);
284: }
285: return null;
286: }
287: Object o = l.get(0);
288: if (o == null) {
289: if (log.isInfoEnabled()) {
290: log.info("Null rehydration state?");
291: }
292: return null;
293: }
294:
295: if (log.isInfoEnabled()) {
296: log.info("Found rehydrated state");
297: if (log.isDetailEnabled()) {
298: log.detail("state is " + o);
299: }
300: }
301:
302: return o;
303: }
304:
305: private void resubscribe(Map m) {
306: if (m == null || m.isEmpty()) {
307: return;
308: }
309: for (Iterator iter = m.entrySet().iterator(); iter.hasNext();) {
310: Map.Entry me = (Map.Entry) iter.next();
311: MessageAddress agentId = (MessageAddress) me.getKey();
312: Long l = (Long) me.getValue();
313: long inc = l.longValue();
314: boolean b = incarnationService.subscribe(agentId, cb, inc);
315: if (b) {
316: if (log.isDebugEnabled()) {
317: log.debug("rehydrate-recordAddress(" + agentId
318: + ", " + inc + ")");
319: }
320: } else {
321: if (log.isWarnEnabled()) {
322: log.warn("subscribe(" + agentId + ", " + cb + ", "
323: + inc + ") returned false");
324: }
325: }
326: }
327: }
328:
329: private void recordAddress(MessageAddress agentId) {
330: if (incarnationService.subscribe(agentId, cb)
331: && log.isDebugEnabled()) {
332: log.debug("recordAddress(" + agentId + ")");
333: }
334: }
335:
336: private void enableReconcile() {
337: if (bb == null) {
338: bb = (BlackboardForAgent) sb.getService(this ,
339: BlackboardForAgent.class, null);
340: if (bb == null) {
341: throw new RuntimeException(
342: "Unable to obtain BlackboardForAgent");
343: }
344: }
345:
346: synchronized (queue) {
347: if (active) {
348: return;
349: }
350: active = true;
351: if (log.isDebugEnabled()) {
352: log.debug("Enabled");
353: }
354: if (!queue.isEmpty()) {
355: reconcileThread.start();
356: }
357: }
358: }
359:
360: private void disableReconcile() {
361: synchronized (queue) {
362: if (!active) {
363: return;
364: }
365: active = false;
366: if (log.isDebugEnabled()) {
367: log.debug("Disabled");
368: }
369: }
370:
371: if (bb != null) {
372: sb.releaseService(this , BlackboardForAgent.class, bb);
373: bb = null;
374: }
375: }
376:
377: private void reconcileLater(MessageAddress agentId) {
378: synchronized (queue) {
379: queue.add(agentId);
380: if (active) {
381: reconcileThread.start();
382: }
383: }
384: }
385:
386: private void reconcileNow() {
387: synchronized (queue) {
388: if (!active || queue.isEmpty()) {
389: return;
390: }
391: reconcileTmp.addAll(queue);
392: queue.clear();
393: }
394: for (int i = 0, n = reconcileTmp.size(); i < n; i++) {
395: MessageAddress agentId = (MessageAddress) reconcileTmp
396: .get(i);
397: reconcileNow(agentId);
398: }
399: reconcileTmp.clear();
400: }
401:
402: private void reconcileNow(MessageAddress agentId) {
403: if (log.isInfoEnabled()) {
404: log.info("Detected (re)start of agent " + agentId
405: + ", synchronizing blackboards");
406: }
407: bb.restartAgent(agentId);
408: }
409:
410: private class ReconcileCallback implements
411: IncarnationService.Callback, Comparable {
412: public void incarnationChanged(MessageAddress addr, long inc) {
413: // this could block, so run it in a separate thread
414: Reconcile.this .reconcileLater(addr);
415: }
416:
417: public int compareTo(Object o) {
418: // make me last, so MTS links are reset before I reconcile!
419: return (o instanceof ReconcileCallback ? 0 : -1);
420: }
421:
422: public String toString() {
423: return "(reconcile for " + localAgent + ")";
424: }
425: }
426:
427: private final class ReconcileAddressWatcherServiceProvider
428: implements ServiceProvider {
429: private final ReconcileAddressWatcherService raws;
430:
431: public ReconcileAddressWatcherServiceProvider() {
432: raws = new ReconcileAddressWatcherService() {
433: public void sentMessageTo(MessageAddress addr) {
434: recordAddress(addr);
435: }
436:
437: public void receivedMessageFrom(MessageAddress addr) {
438: recordAddress(addr);
439: }
440: };
441: }
442:
443: public Object getService(ServiceBroker sb, Object requestor,
444: Class serviceClass) {
445: if (ReconcileAddressWatcherService.class
446: .isAssignableFrom(serviceClass)) {
447: return raws;
448: } else {
449: return null;
450: }
451: }
452:
453: public void releaseService(ServiceBroker sb, Object requestor,
454: Class serviceClass, Object service) {
455: }
456: }
457:
458: private final class ReconcileEnablerServiceProvider implements
459: ServiceProvider {
460: private final ReconcileEnablerService res;
461:
462: public ReconcileEnablerServiceProvider() {
463: res = new ReconcileEnablerService() {
464: public void startTimer() {
465: enableReconcile();
466: }
467:
468: public void stopTimer() {
469: disableReconcile();
470: }
471: };
472: }
473:
474: public Object getService(ServiceBroker sb, Object requestor,
475: Class serviceClass) {
476: if (ReconcileEnablerService.class
477: .isAssignableFrom(serviceClass)) {
478: return res;
479: } else {
480: return null;
481: }
482: }
483:
484: public void releaseService(ServiceBroker sb, Object requestor,
485: Class serviceClass, Object service) {
486: }
487: }
488: }
|