001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent;
028:
029: import java.io.Serializable;
030: import java.net.InetAddress;
031: import java.net.URI;
032: import java.util.ArrayList;
033: import java.util.List;
034: import org.cougaar.bootstrap.SystemProperties;
035: import org.cougaar.core.blackboard.BlackboardForAgent;
036: import org.cougaar.core.component.Component;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.component.ServiceRevokedListener;
039: import org.cougaar.core.component.ServiceProvider;
040: import org.cougaar.core.mts.MessageAddress;
041: import org.cougaar.core.node.NodeIdentificationService;
042: import org.cougaar.core.persist.PersistenceClient;
043: import org.cougaar.core.persist.PersistenceIdentity;
044: import org.cougaar.core.persist.PersistenceService;
045: import org.cougaar.core.persist.RehydrationData;
046: import org.cougaar.core.service.AgentIdentificationService;
047: import org.cougaar.core.service.LoggingService;
048: import org.cougaar.core.service.wp.AddressEntry;
049: import org.cougaar.core.service.wp.Callback;
050: import org.cougaar.core.service.wp.Response;
051: import org.cougaar.core.service.wp.WhitePagesService;
052: import org.cougaar.util.GenericStateModelAdapter;
053:
054: /**
055: * This component binds the agent's "version://" and
056: * "node://" information in the white pages, and preserves the
057: * incarnation across agent moves.
058: * <p>
059: * @property org.cougaar.core.node.SkipReconciliation
060: * If enabled, rehydrating agents will not run RestartLPs to do
061: * reconciliation with other agents, when the Node starts up.
062: * Use this (with caution) to quickly rehydate a society from
063: * a consistent set of (quiescent) persistence snapshots. This
064: * flag will be cleared once the Node and all its agents have loaded,
065: * so that later added agents will do reconciliation. Default <em>false</em>.
066: */
067: public final class Topology extends GenericStateModelAdapter implements
068: Component {
069:
070: private ServiceBroker sb;
071:
072: private LoggingService log;
073: private WhitePagesService wps;
074:
075: private PersistenceService ps;
076: private PersistenceClient pc;
077:
078: private MessageAddress localAgent;
079: private MessageAddress localNode;
080: private boolean isNode;
081:
082: // incarnation for this agent, which is incremented every time
083: // this agent restarts but not when the agent moves.
084: private long incarnation;
085:
086: // move identity of this agent, which is incremented every time this
087: // agent moves.
088: private long moveId;
089:
090: private TopologyServiceProvider tsp;
091:
092: private boolean needsRestart = true;
093:
094: private static boolean skipReconciliation;
095:
096: public void setServiceBroker(ServiceBroker sb) {
097: this .sb = sb;
098: }
099:
100: public void load() {
101: super .load();
102:
103: log = (LoggingService) sb.getService(this ,
104: LoggingService.class, null);
105:
106: // get our local agent's address
107: AgentIdentificationService ais = (AgentIdentificationService) sb
108: .getService(this , AgentIdentificationService.class,
109: null);
110: if (ais != null) {
111: localAgent = ais.getMessageAddress();
112: sb.releaseService(this , AgentIdentificationService.class,
113: ais);
114: }
115:
116: // get our local node's address
117: NodeIdentificationService nis = (NodeIdentificationService) sb
118: .getService(this , NodeIdentificationService.class, null);
119: if (nis != null) {
120: localNode = nis.getMessageAddress();
121: sb.releaseService(this , NodeIdentificationService.class,
122: nis);
123: }
124:
125: boolean isNode = (localAgent == null || localAgent
126: .equals(localNode));
127:
128: // get wp
129: wps = (WhitePagesService) sb.getService(this ,
130: WhitePagesService.class, null);
131: if (wps == null) {
132: throw new RuntimeException(
133: "Unable to obtain WhitePagesService");
134: }
135:
136: register_persistence();
137:
138: // get mobile state
139: Object o = rehydrate();
140: if (o instanceof TopologyState) {
141: TopologyState ts = (TopologyState) o;
142: needsRestart = false;
143: incarnation = ts.incarnation;
144: // ignore moveId, maybe use someday
145: }
146: o = null;
147:
148: updateNaming(true);
149:
150: tsp = new TopologyServiceProvider();
151: sb.addService(TopologyService.class, tsp);
152:
153: if (isNode) {
154: // we haven't added our child agents yet, so we can set
155: // our skip-reconcile flag here or earlier.
156: initializeSkipReconciliation(log);
157: }
158: }
159:
160: public void start() {
161: super .start();
162: // do restart reconciliation if necessary
163: reconcileBlackboard();
164:
165: if (isNode) {
166: // we've added our initial agents in AgentLoader's "load()",
167: // so now we clear our skip-reconcile flag. This will make
168: // dynamically added agents do the usual reconcile.
169: clearSkipReconciliation(log);
170: }
171: }
172:
173: public void unload() {
174: super .unload();
175:
176: if (tsp != null) {
177: sb.revokeService(TopologyService.class, tsp);
178: tsp = null;
179: }
180:
181: unregister_persistence();
182:
183: updateNaming(false);
184:
185: if (wps != null) {
186: sb.releaseService(this , WhitePagesService.class, wps);
187: wps = null;
188: }
189: }
190:
191: private Object captureState() {
192: if (getModelState() == ACTIVE) {
193: if (log.isDebugEnabled()) {
194: log.debug("Ignoring persist while active");
195: }
196: return null;
197: }
198: return new TopologyState(localAgent, incarnation, moveId);
199: }
200:
201: private void register_persistence() {
202: // get persistence
203: pc = new PersistenceClient() {
204: public PersistenceIdentity getPersistenceIdentity() {
205: String id = getClass().getName();
206: return new PersistenceIdentity(id);
207: }
208:
209: public List getPersistenceData() {
210: Object o = captureState();
211: // must return mutable list!
212: List l = new ArrayList(1);
213: l.add(o);
214: return l;
215: }
216: };
217: ps = (PersistenceService) sb.getService(pc,
218: PersistenceService.class, null);
219: }
220:
221: private void unregister_persistence() {
222: if (ps != null) {
223: sb.releaseService(pc, PersistenceService.class, ps);
224: ps = null;
225: pc = null;
226: }
227: }
228:
229: private Object rehydrate() {
230: RehydrationData rd = ps.getRehydrationData();
231: if (rd == null) {
232: if (log.isInfoEnabled()) {
233: log.info("No rehydration data found");
234: }
235: return null;
236: }
237:
238: List l = rd.getObjects();
239: rd = null;
240: int lsize = (l == null ? 0 : l.size());
241: if (lsize < 1) {
242: if (log.isInfoEnabled()) {
243: log.info("Invalid rehydration list? " + l);
244: }
245: return null;
246: }
247: Object o = l.get(0);
248: if (o == null) {
249: if (log.isInfoEnabled()) {
250: log.info("Null rehydration state?");
251: }
252: return null;
253: }
254:
255: if (log.isInfoEnabled()) {
256: log.info("Found rehydrated state");
257: if (log.isDetailEnabled()) {
258: log.detail("state is " + o);
259: }
260: }
261:
262: return o;
263: }
264:
265: private static void initializeSkipReconciliation(LoggingService log) {
266: // Set a static flag for whether to skip reconciliation,
267: // based on a -D argument
268: skipReconciliation = SystemProperties
269: .getBoolean("org.cougaar.core.node.SkipReconciliation");
270: if (log.isDebugEnabled()) {
271: log.debug("Before loads, SkipReconcile set to "
272: + skipReconciliation);
273: }
274: }
275:
276: private static void clearSkipReconciliation(LoggingService log) {
277: // Clear the static flag for whether to skip reconciliation
278: // This way, mobile agents, or, during a run, restarting an
279: // agent, will not suppress reconciliation
280: skipReconciliation = false;
281: if (log.isDebugEnabled()) {
282: log.debug("After loads, SkipReconcile set to "
283: + skipReconciliation);
284: }
285: }
286:
287: /**
288: * Used on startup to enquire whether we should skip
289: * doing reconciliation (as in when an entire society was
290: * stopped and is being restored from a consistent persistence
291: * snapshot).
292: */
293: private static boolean shouldSkipReconciliation() {
294: return skipReconciliation;
295: }
296:
297: /**
298: * The local agent has restarted.
299: */
300: private void reconcileBlackboard() {
301: if (!(needsRestart)) {
302: if (log.isInfoEnabled()) {
303: log
304: .info("No restart blackboard synchronization required");
305: }
306: return;
307: }
308:
309: needsRestart = false;
310:
311: // Check the static flag to see if we should actually
312: // ask the BBoard to ask the Domains to run restart LPs on the agent.
313: // This forces reconciliation. In some cases, you do not want to run
314: // the restart LPs, because that takes time, and you _know_ that is
315: // not needed. Note that this is dangerous in general.
316: // For now, do this here I guess
317: if (shouldSkipReconciliation()) {
318: if (log.isInfoEnabled()) {
319: log.info("Restarting but NOT RECONCILING!");
320: }
321: return;
322: }
323:
324: if (log.isInfoEnabled()) {
325: log.info("Restarting, synchronizing blackboards");
326: }
327:
328: BlackboardForAgent bb = (BlackboardForAgent) sb.getService(
329: this , BlackboardForAgent.class, null);
330: if (bb == null) {
331: throw new RuntimeException(
332: "Unable to find BlackboardForAgent");
333: }
334: try {
335: // restart this agent. The "null" is shorthand for
336: // "all agents that are not this agent".
337: bb.restartAgent(null);
338: } catch (Exception e) {
339: if (log.isInfoEnabled()) {
340: log.info("Restart failed", e);
341: }
342: }
343: sb.releaseService(this , BlackboardForAgent.class, bb);
344: bb = null;
345: }
346:
347: private void updateNaming(boolean bind) {
348: String localHost;
349: try {
350: InetAddress localAddr = InetAddress.getLocalHost();
351: localHost = localAddr.getHostName();
352: } catch (Exception e) {
353: localHost = "?";
354: }
355:
356: String identifier = localAgent.getAddress();
357:
358: // dummy callback, probably should pay more attention to errors
359: final LoggingService ls = log;
360: Callback callback = new Callback() {
361: public void execute(Response res) {
362: if (res.isSuccess()) {
363: if (ls.isInfoEnabled()) {
364: ls.info("WP Response: " + res);
365: }
366: } else {
367: ls.error("WP Error: " + res);
368: }
369: }
370: };
371:
372: // register WP version numbers
373: if (log.isInfoEnabled()) {
374: log.info("Updating white pages");
375: }
376: if (bind) {
377: if (incarnation == 0) {
378: incarnation = System.currentTimeMillis();
379: }
380: // ignore prior moveId
381: moveId = System.currentTimeMillis();
382: }
383:
384: // incarnation number
385: URI versionURI = URI.create("version:///" + incarnation + "/"
386: + moveId);
387: AddressEntry versionEntry = AddressEntry.getAddressEntry(
388: identifier, "version", versionURI);
389:
390: // node location
391: URI nodeURI = URI.create("node://" + localHost + "/"
392: + localNode.getAddress());
393: AddressEntry nodeEntry = AddressEntry.getAddressEntry(
394: identifier, "topology", nodeURI);
395:
396: try {
397: if (bind) {
398: wps.rebind(versionEntry, callback);
399: wps.rebind(nodeEntry, callback);
400: } else {
401: wps.unbind(nodeEntry, callback);
402: wps.unbind(versionEntry, callback);
403: }
404: } catch (Exception e) {
405: String msg = "Unable to " + (bind ? "" : "un")
406: + "bind in naming service";
407: if (bind) {
408: throw new RuntimeException(msg, e);
409: } else {
410: ls.error(msg, e);
411: }
412: }
413: }
414:
415: private final class TopologyServiceProvider implements
416: ServiceProvider {
417: private final TopologyService ts;
418:
419: public TopologyServiceProvider() {
420: ts = new TopologyService() {
421: public long getIncarnationNumber() {
422: return incarnation;
423: }
424:
425: public long getMoveNumber() {
426: return moveId;
427: }
428: };
429: }
430:
431: public Object getService(ServiceBroker sb, Object requestor,
432: Class serviceClass) {
433: if (TopologyService.class.isAssignableFrom(serviceClass)) {
434: return ts;
435: } else {
436: return null;
437: }
438: }
439:
440: public void releaseService(ServiceBroker sb, Object requestor,
441: Class serviceClass, Object service) {
442: }
443: }
444:
445: private static final class TopologyState implements Serializable {
446:
447: public final MessageAddress agentId;
448: public final long incarnation;
449: public final long moveId;
450:
451: public TopologyState(MessageAddress agentId, long incarnation,
452: long moveId) {
453: this .agentId = agentId;
454: this .incarnation = incarnation;
455: this .moveId = moveId;
456: if (agentId == null) {
457: throw new IllegalArgumentException("null agentId");
458: }
459: }
460:
461: public String toString() {
462: return "Agent " + agentId + ", incarnation " + incarnation
463: + ", moveId " + moveId;
464: }
465:
466: private static final long serialVersionUID = 1890394862083942083L;
467: }
468: }
|