0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2002-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026: package org.cougaar.core.examples.mobility.step;
0027:
0028: import java.net.URI;
0029: import java.util.ArrayList;
0030: import java.util.Collection;
0031: import java.util.Enumeration;
0032: import java.util.HashMap;
0033: import java.util.Iterator;
0034: import java.util.List;
0035: import java.util.Map;
0036: import org.cougaar.core.agent.service.alarm.Alarm;
0037: import org.cougaar.core.blackboard.CollectionSubscription;
0038: import org.cougaar.core.blackboard.IncrementalSubscription;
0039: import org.cougaar.core.component.ServiceBroker;
0040: import org.cougaar.core.examples.mobility.ldm.Step;
0041: import org.cougaar.core.examples.mobility.ldm.StepOptions;
0042: import org.cougaar.core.examples.mobility.ldm.StepStatus;
0043: import org.cougaar.core.mobility.Ticket;
0044: import org.cougaar.core.mobility.ldm.MobilityFactory;
0045: import org.cougaar.core.mobility.ldm.MoveAgent;
0046: import org.cougaar.core.mts.MessageAddress;
0047: import org.cougaar.core.node.NodeIdentificationService;
0048: import org.cougaar.core.plugin.ComponentPlugin;
0049: import org.cougaar.core.service.AgentIdentificationService;
0050: import org.cougaar.core.service.AlarmService;
0051: import org.cougaar.core.service.BlackboardService;
0052: import org.cougaar.core.service.DomainService;
0053: import org.cougaar.core.service.LoggingService;
0054: import org.cougaar.core.service.wp.AddressEntry;
0055: import org.cougaar.core.service.wp.WhitePagesService;
0056: import org.cougaar.core.util.UID;
0057: import org.cougaar.core.util.UniqueObject;
0058: import org.cougaar.util.UnaryPredicate;
0059:
0060: /**
0061: * This plugin executes Steps where the step option's
0062: * target is the local agent.
0063: * <p>
0064: * MoveAgent objects are created by this plugin.
0065: */
0066: public class StepRunnerPlugin extends ComponentPlugin {
0067:
0068: private MessageAddress todd;
0069: private MessageAddress agentId;
0070: private MessageAddress nodeId;
0071:
0072: private IncrementalSubscription stepSub;
0073: private IncrementalSubscription moveSub;
0074:
0075: private LoggingService log;
0076: private DomainService domain;
0077: private WhitePagesService wps;
0078:
0079: private MobilityFactory mobilityFactory;
0080:
0081: // Non-persisted cache of blackboard objects, for quick access.
0082: // On rehydration it is fully reconstructed from the blackboard.
0083: private Map idToEntry = new HashMap(13);
0084:
0085: // pending alarms
0086: //
0087: // FIXME could optimize to use treeset, keep fewer alarms, etc
0088: // for now this is workable
0089: private List pendingAlarms = new ArrayList(13);
0090:
0091: public void load() {
0092: super .load();
0093:
0094: // get the logger
0095: log = (LoggingService) getServiceBroker().getService(this ,
0096: LoggingService.class, null);
0097: if (log == null) {
0098: log = LoggingService.NULL;
0099: }
0100:
0101: // get the agentId
0102: AgentIdentificationService agentIdService = (AgentIdentificationService) getServiceBroker()
0103: .getService(this , AgentIdentificationService.class,
0104: null);
0105: if (agentIdService == null) {
0106: throw new RuntimeException(
0107: "Unable to obtain agent-id service");
0108: }
0109: this .agentId = agentIdService.getMessageAddress();
0110: getServiceBroker().releaseService(this ,
0111: AgentIdentificationService.class, agentIdService);
0112: if (agentId == null) {
0113: throw new RuntimeException("Unable to obtain agent id");
0114: }
0115: todd = agentId;
0116:
0117: // get the nodeId
0118: NodeIdentificationService nodeIdService = (NodeIdentificationService) getServiceBroker()
0119: .getService(this , NodeIdentificationService.class, null);
0120: if (nodeIdService == null) {
0121: throw new RuntimeException(
0122: "Unable to obtain node-id service");
0123: }
0124: this .nodeId = nodeIdService.getMessageAddress();
0125: getServiceBroker().releaseService(this ,
0126: NodeIdentificationService.class, nodeIdService);
0127: if (nodeId == null) {
0128: throw new RuntimeException("Unable to obtain node id");
0129: }
0130:
0131: // get the mobility domain and factory
0132: this .domain = (DomainService) getServiceBroker().getService(
0133: this , DomainService.class, null);
0134: if (domain == null) {
0135: throw new RuntimeException(
0136: "Unable to obtain domain service");
0137: }
0138: this .mobilityFactory = (MobilityFactory) domain
0139: .getFactory("mobility");
0140: if (mobilityFactory == null) {
0141: throw new RuntimeException(
0142: "Mobility factory (and domain) not enabled");
0143: }
0144:
0145: // get the white pages service
0146: this .wps = (WhitePagesService) getServiceBroker().getService(
0147: this , WhitePagesService.class, null);
0148: if (wps == null) {
0149: throw new RuntimeException(
0150: "Unable to obtain white pages service");
0151: }
0152:
0153: if (log.isDebugEnabled()) {
0154: log.debug(todd + "Loaded");
0155: }
0156: }
0157:
0158: public void unload() {
0159: if (wps != null) {
0160: getServiceBroker().releaseService(this ,
0161: WhitePagesService.class, wps);
0162: wps = null;
0163: }
0164: if (domain != null) {
0165: getServiceBroker().releaseService(this ,
0166: DomainService.class, domain);
0167: domain = null;
0168: }
0169: if ((log != null) && (log != LoggingService.NULL)) {
0170: getServiceBroker().releaseService(this ,
0171: LoggingService.class, log);
0172: log = LoggingService.NULL;
0173: }
0174: super .unload();
0175: }
0176:
0177: protected void setupSubscriptions() {
0178: // subscribe to steps with a target matching this agent
0179: UnaryPredicate stepPred = createStepPredicate(agentId);
0180: stepSub = (IncrementalSubscription) blackboard
0181: .subscribe(stepPred);
0182:
0183: // subscribe to our own move requests
0184: UnaryPredicate movePred = createMovePredicate(agentId);
0185: moveSub = (IncrementalSubscription) blackboard
0186: .subscribe(movePred);
0187:
0188: if (blackboard.didRehydrate()) {
0189: // recreate cache from blackboard
0190: recreateEntries();
0191: } else {
0192: // create new cache
0193: }
0194: }
0195:
0196: protected void execute() {
0197: if (log.isDebugEnabled()) {
0198: log.debug(todd + "Execute");
0199: }
0200:
0201: // watch steps
0202: if (stepSub.hasChanged()) {
0203: // added steps
0204: Enumeration en = stepSub.getAddedList();
0205: while (en.hasMoreElements()) {
0206: Step step = (Step) en.nextElement();
0207: StepStatus status = step.getStatus();
0208: // validate step status
0209: if (status.getState() != StepStatus.UNSEEN) {
0210: if (log.isErrorEnabled()) {
0211: log.error(todd + "Newly added step "
0212: + step.getUID()
0213: + " with prior status: " + status);
0214: }
0215: }
0216: addStep(step);
0217: }
0218: // ignore changes: only this plugin can modify steps
0219: // removed steps
0220: en = stepSub.getRemovedList();
0221: while (en.hasMoreElements()) {
0222: Step s = (Step) en.nextElement();
0223: removeStep(s);
0224: }
0225: }
0226:
0227: // watch move-reqs
0228: if (moveSub.hasChanged()) {
0229: // ignore adds: this plugin created them
0230: // changes fill in step details
0231: Enumeration en = moveSub.getChangedList();
0232: while (en.hasMoreElements()) {
0233: MoveAgent ma = (MoveAgent) en.nextElement();
0234: updateMove(ma);
0235: }
0236: // ignore removes: this plugin did it
0237: }
0238:
0239: // check alarms
0240: List l = getDueTicketIds();
0241: if (l != null) {
0242: handleDueTicketIds(l);
0243: }
0244: }
0245:
0246: private void recreateEntries() {
0247: // these should be empty
0248: idToEntry.clear();
0249: cancelAlarms();
0250:
0251: // recreate from blackboard contents
0252: Collection c = stepSub.getCollection();
0253: if (!(c.isEmpty())) {
0254: int n = c.size();
0255: Iterator iter = c.iterator();
0256: for (int i = 0; i < n; i++) {
0257: Step step = (Step) iter.next();
0258: switch (step.getStatus().getState()) {
0259: case StepStatus.UNSEEN:
0260: case StepStatus.PAUSED:
0261: case StepStatus.RUNNING:
0262: addStep(step);
0263: break;
0264: case StepStatus.SUCCESS:
0265: case StepStatus.FAILURE:
0266: case StepStatus.TIMEOUT: {
0267: // completed, but we still need it in the
0268: // table for later remote removal
0269: StepOptions options = step.getOptions();
0270: Ticket ticket = options.getTicket();
0271: Object id = ticket.getIdentifier();
0272: MoveAgent ma = findMove(id);
0273: Entry entry = new Entry(ticket, null, step);
0274: entry.moveAgent = ma;
0275: idToEntry.put(id, entry);
0276: if (log.isDebugEnabled()) {
0277: log.debug("Re-added entry "
0278: + id
0279: + " with move "
0280: + ((ma != null) ? ma.getUID()
0281: .toString() : "null"));
0282: }
0283: }
0284: break;
0285: }
0286: }
0287: }
0288:
0289: // remove unreferenced MoveAgent entries?
0290: }
0291:
0292: private void addStep(Step step) {
0293:
0294: // step status is typically UNSEEN
0295: // on restart it can be PAUSED or RUNNING
0296:
0297: StepOptions options = step.getOptions();
0298: Ticket ticket = options.getTicket();
0299: Object id = ticket.getIdentifier();
0300:
0301: StepStatus status = step.getStatus();
0302:
0303: // check for immediate timeout
0304: long nowTime = System.currentTimeMillis();
0305: long pauseTime = options.getPauseTime();
0306: long timeoutTime = options.getTimeoutTime();
0307: if (((timeoutTime > 0) && (timeoutTime <= nowTime))
0308: || ((pauseTime > 0) && (pauseTime <= nowTime) && (status
0309: .getState() != StepStatus.RUNNING))) {
0310:
0311: if (status.getState() == StepStatus.RUNNING) {
0312: // restart from RUNNING
0313: // alarms are not persisted
0314: // remove possible MoveAgent object
0315: MoveAgent ma = findMove(id);
0316: if (ma != null) {
0317: // timeout, don't care if it succeeded!
0318: blackboard.publishRemove(ma);
0319: }
0320: }
0321:
0322: status = new StepStatus(StepStatus.TIMEOUT, ((status
0323: .getState() == StepStatus.RUNNING) ? status
0324: .getStartTime() : nowTime), nowTime, null);
0325: step.setStatus(status);
0326: blackboard.publishChange(step);
0327:
0328: if (log.isDebugEnabled()) {
0329: log.debug(todd + "Step " + step.getUID()
0330: + " added, but already timed out: " + " now ("
0331: + nowTime + "), " + " pause (" + pauseTime
0332: + "), " + " timeout (" + timeoutTime + ")");
0333: }
0334: return;
0335: }
0336:
0337: MessageAddress mobileAgent = ticket.getMobileAgent();
0338: if (mobileAgent == null) {
0339: mobileAgent = agentId;
0340: }
0341:
0342: // check white pages. Okay if agent is not listed.
0343: WPInfo origWPI = lookupInWP(mobileAgent, 30000); // max wait
0344: if (log.isDebugEnabled()) {
0345: log.debug(todd + "Step " + step.getUID()
0346: + " added, wp entry for agent " + mobileAgent
0347: + " is " + origWPI);
0348: }
0349:
0350: // create new Entry for this ticket
0351: // add step to table
0352: Entry entry = new Entry(ticket, origWPI, step);
0353: idToEntry.put(id, entry);
0354:
0355: if ((pauseTime > 0)
0356: && (status.getState() != StepStatus.RUNNING)) {
0357:
0358: // set step status to PAUSED
0359: if (status.getState() != StepStatus.PAUSED) {
0360: status = new StepStatus(StepStatus.PAUSED, -1, -1, null);
0361: step.setStatus(status);
0362: blackboard.publishChange(step);
0363: }
0364:
0365: // create wakeup alarm
0366: MyAlarm alarm = addAlarm(pauseTime, id);
0367: entry.alarm = alarm;
0368:
0369: if (log.isDebugEnabled()) {
0370: log.debug(todd + "Step " + step.getUID()
0371: + " pausing for " + (pauseTime - nowTime)
0372: + " millis (" + pauseTime + " - " + nowTime
0373: + ")");
0374: }
0375: return;
0376: }
0377:
0378: startEntry(entry);
0379: }
0380:
0381: private void startEntry(Entry entry) {
0382: // step already listed in table, and pause-time has passed
0383: Step step = entry.step;
0384:
0385: StepOptions options = step.getOptions();
0386: StepStatus status = step.getStatus();
0387:
0388: Ticket ticket = options.getTicket();
0389: Object id = ticket.getIdentifier();
0390:
0391: if (log.isDebugEnabled()) {
0392: long nowTime = System.currentTimeMillis();
0393: log.debug(todd + "Start step " + step.getUID());
0394: }
0395:
0396: // don't check for a step in progress for this agent; it's
0397: // something we want to test.
0398:
0399: if (status.getState() != StepStatus.RUNNING) {
0400: // create new move request
0401: MoveAgent ma = mobilityFactory.createMoveAgent(ticket);
0402: blackboard.publishAdd(ma);
0403: entry.moveAgent = ma;
0404:
0405: if (log.isDebugEnabled()) {
0406: log.debug(todd + "Created new MoveAgent object "
0407: + ma.getUID() + " for step " + step.getUID());
0408: }
0409:
0410: // transition step status from (UNSEEN|PAUSED) to RUNNING
0411: status = new StepStatus(StepStatus.RUNNING, System
0412: .currentTimeMillis(), -1, null);
0413: step.setStatus(status);
0414: blackboard.publishChange(step);
0415:
0416: if (log.isDebugEnabled()) {
0417: log.debug(todd + "Step " + step.getUID()
0418: + " is now RUNNING");
0419: }
0420: } else {
0421: // find existing move request
0422: MoveAgent ma = findMove(id);
0423: if (ma == null) {
0424: if (log.isErrorEnabled()) {
0425: log
0426: .error("Recreated step "
0427: + step.getUID()
0428: + " but unable to find matching MoveAgent object"
0429: + " with ticket id " + id);
0430: }
0431: } else {
0432: entry.moveAgent = ma;
0433: if (log.isDebugEnabled()) {
0434: log.debug("Recreated step " + step.getUID()
0435: + " found matching MoveAgent object "
0436: + ma.getUID());
0437: }
0438: // need to updateMove now?
0439: }
0440:
0441: if (log.isDebugEnabled()) {
0442: log.debug(todd + "Step " + step.getUID()
0443: + " resumed RUNNING");
0444: }
0445: }
0446:
0447: // start timer
0448: long timeoutTime = options.getTimeoutTime();
0449: if (timeoutTime > 0) {
0450: MyAlarm alarm = addAlarm(timeoutTime, id);
0451: entry.alarm = alarm;
0452:
0453: if (log.isDebugEnabled()) {
0454: long nowTime = System.currentTimeMillis();
0455: log.debug(todd + "Started step " + step.getUID()
0456: + " with timeout in " + (timeoutTime - nowTime)
0457: + " millis (" + timeoutTime + " - " + nowTime
0458: + ")");
0459: }
0460: } else {
0461: if (log.isDebugEnabled()) {
0462: log
0463: .debug(todd + "No timeout for step "
0464: + step.getUID());
0465: }
0466: }
0467: }
0468:
0469: private void timeoutEntry(Entry entry) {
0470: // entry has been removed from table
0471:
0472: Step step = entry.step;
0473: StepStatus status = step.getStatus();
0474:
0475: // transition step status from RUNNING to TIMEOUT
0476: long nowTime = System.currentTimeMillis();
0477: status = new StepStatus(StepStatus.TIMEOUT, status
0478: .getStartTime(), nowTime, null);
0479: step.setStatus(status);
0480: blackboard.publishChange(step);
0481:
0482: // alarm should be expired
0483: MyAlarm alarm = entry.alarm;
0484: if (alarm == null) {
0485: if (step.getOptions().getTimeoutTime() > 0) {
0486: if (log.isErrorEnabled()) {
0487: log.error(todd + "Timeout with no alarm: " + step);
0488: }
0489: }
0490: } else if (!(alarm.hasExpired())) {
0491: if (log.isErrorEnabled()) {
0492: log.error(todd + "Timeout with non-expired alarm: "
0493: + alarm);
0494: }
0495: alarm.cancel();
0496: }
0497:
0498: // remove move-agent object
0499: MoveAgent ma = entry.moveAgent;
0500: if (ma != null) {
0501: if (ma.getStatus() != null) {
0502: if (log.isErrorEnabled()) {
0503: log.error(todd + "Timeout with move-agent status: "
0504: + ma);
0505: }
0506: }
0507: if (log.isInfoEnabled()) {
0508: log
0509: .info(todd
0510: + "Removing MoveAgent object "
0511: + ma.getUID()
0512: + " after move timeout. If the move later completes, the"
0513: + " MoveAgentPlugin may complain.");
0514: }
0515: blackboard.publishRemove(ma);
0516: }
0517:
0518: if (log.isDebugEnabled()) {
0519: log.debug(todd + "Timed out on step " + step.getUID()
0520: + " at time " + nowTime);
0521: }
0522: }
0523:
0524: private void handleDueTicketIds(List ids) {
0525: // lookup entry in table
0526: for (int i = 0, n = ids.size(); i < n; i++) {
0527: Object id = ids.get(i);
0528: Entry entry = (Entry) idToEntry.get(id);
0529: if (entry == null) {
0530: // removed, or already finished
0531: if (log.isDebugEnabled()) {
0532: log.debug(todd
0533: + "Ignoring alarm for unknown ticket id "
0534: + id);
0535: }
0536: continue;
0537: }
0538: Step step = entry.step;
0539: int state = step.getStatus().getState();
0540: if (state == StepStatus.PAUSED) {
0541: // verify that pause time has passed
0542: long nowTime = System.currentTimeMillis();
0543: long pauseTime = step.getOptions().getPauseTime();
0544: if (nowTime < pauseTime) {
0545: // interrupted?
0546: if (log.isWarnEnabled()) {
0547: log.warn(todd + "Alarm pause resume at ("
0548: + nowTime + ")"
0549: + " is less than pause time ("
0550: + pauseTime + "),"
0551: + " will proceed anyways");
0552: }
0553: // proceed anyways
0554: }
0555: // start the entry
0556: startEntry(entry);
0557: } else if (state == StepStatus.RUNNING) {
0558: // timeout now
0559: idToEntry.remove(id);
0560: timeoutEntry(entry);
0561: } else {
0562: // ignore, already completed
0563: }
0564: }
0565: }
0566:
0567: private void removeStep(Step step) {
0568: Ticket ticket = step.getOptions().getTicket();
0569: Object id = ticket.getIdentifier();
0570: // lookup entry in table
0571: Entry entry = (Entry) idToEntry.remove(id);
0572: if (entry == null) {
0573: // already completed, removed, or not known
0574: if (log.isDebugEnabled()) {
0575: log.debug(todd + "Remove step " + step.getUID()
0576: + " didn't find the step in the table");
0577: }
0578: return;
0579: }
0580:
0581: // step no longer in blackboard
0582:
0583: // alarm may be running
0584: MyAlarm alarm = entry.alarm;
0585: if ((alarm != null) && (!(alarm.hasExpired()))) {
0586: alarm.cancel();
0587: }
0588:
0589: // remove move-agent object
0590: MoveAgent ma = entry.moveAgent;
0591: if (ma != null) {
0592: blackboard.publishRemove(ma);
0593: }
0594:
0595: if (log.isDebugEnabled()) {
0596: log.debug(todd + "Removed step " + step.getUID());
0597: }
0598: }
0599:
0600: private void updateMove(MoveAgent ma) {
0601: // get entry
0602: Ticket ticket = ma.getTicket();
0603: Object id = ticket.getIdentifier();
0604: Entry entry = (Entry) idToEntry.get(id);
0605: if (entry == null) {
0606: if (log.isErrorEnabled()) {
0607: log.error(todd
0608: + "Move updated, but step no longer exists: "
0609: + ma);
0610: }
0611: // maybe not our move!
0612: //blackboard.publishRemove(ma);
0613: return;
0614: }
0615: // check entry
0616: if (ma != entry.moveAgent) {
0617: if (log.isErrorEnabled()) {
0618: log.error(todd
0619: + "Move updated, but given move-agent object "
0620: + ma
0621: + " doesn't == the entry move-agent object "
0622: + entry.moveAgent);
0623: }
0624: return;
0625: }
0626:
0627: Step step = entry.step;
0628: StepStatus status = step.getStatus();
0629: int state = status.getState();
0630: if (state != StepStatus.RUNNING) {
0631: if (log.isErrorEnabled()) {
0632: log.error(todd + "Move updated, but step status is ("
0633: + state + ") instead of RUNNING");
0634: }
0635: return;
0636: }
0637: // check status
0638: MoveAgent.Status mstat = ma.getStatus();
0639: if (mstat == null) {
0640: // still in progress?
0641: if (log.isDebugEnabled()) {
0642: log.debug(todd + "Step " + step.getUID()
0643: + " ignoring null move-agent " + ma.getUID()
0644: + " status");
0645: }
0646: return;
0647: }
0648:
0649: int newState = StepStatus.FAILURE;
0650: if (mstat.getCode() != MoveAgent.Status.OKAY) {
0651: // move itself failed
0652: if (log.isErrorEnabled()) {
0653: log.error(todd + "Step " + step.getUID()
0654: + " finished with non-OKAY status "
0655: + mstat.getCodeAsString());
0656: }
0657: } else {
0658: // check wp. The "entry.wpi" may be null, but
0659: // the current wpi should not be null.
0660: MessageAddress mobileAgent = ticket.getMobileAgent();
0661: if (mobileAgent == null) {
0662: mobileAgent = agentId;
0663: }
0664: MessageAddress destNode = ticket.getDestinationNode();
0665: WPInfo origWPI = entry.wpi;
0666:
0667: // the wp update may take several seconds, so we
0668: // do a limited backoff-n-retry in case the entry looks stale.
0669: //
0670: // FIXME: ideally we would release the plugin execute thread
0671: // and set an alarm, but for now we'll simply sleep.
0672: WPInfo newWPI;
0673: for (int i = 0, maxi = 5;; i++) {
0674: newWPI = lookupInWP(mobileAgent, 30000);
0675: if (newWPI == null) {
0676: // not listed in wp?
0677: if (i == 0) {
0678: if (log.isInfoEnabled()) {
0679: log.info("Will re-examine " + mobileAgent
0680: + "'s null wp entry (step: "
0681: + step.getUID() + ")");
0682: }
0683: } else if (i >= maxi) {
0684: if (log.isErrorEnabled()) {
0685: log
0686: .error("Step " + step.getUID()
0687: + " failed due to null wp"
0688: + " entry for agent "
0689: + mobileAgent);
0690: }
0691: break;
0692: }
0693: } else if ((destNode != null)
0694: && (!(destNode.getAddress().equals(newWPI.node)))) {
0695: // at the wrong node!
0696: if (i == 0) {
0697: if (log.isInfoEnabled()) {
0698: log
0699: .info("Will re-examine "
0700: + mobileAgent
0701: + "'s wp entry, which indicates that"
0702: + " the agent is at node "
0703: + newWPI.node
0704: + " instead of the move destination node "
0705: + destNode + " (step: "
0706: + step.getUID() + ")");
0707: }
0708: } else if (i >= maxi) {
0709: if (log.isErrorEnabled()) {
0710: log
0711: .error("Step "
0712: + step.getUID()
0713: + " failed due to wp listing of agent "
0714: + mobileAgent
0715: + " at node "
0716: + newWPI.node
0717: + " instead of ticket's destination node "
0718: + destNode
0719: + ", wp entry is " + newWPI);
0720: }
0721: newWPI = null;
0722: break;
0723: }
0724: } else {
0725: // seems okay
0726: break;
0727: }
0728:
0729: // FIXME release thread & use alarm!!!
0730: try {
0731: Thread.sleep(i * 2000);
0732: } catch (Exception e) {
0733: log.error("Cancel sleep", e);
0734: }
0735: }
0736:
0737: if (newWPI == null) {
0738: // already logged our error above
0739: } else if ((destNode == null)
0740: && (!(origWPI.node.equals(newWPI.node)))) {
0741: // didn't restart in place!
0742: if (log.isErrorEnabled()) {
0743: log.error("Step " + step.getUID()
0744: + " finished restart-in-place of agent "
0745: + mobileAgent + " started at node "
0746: + origWPI.node
0747: + " and ended up at a different node "
0748: + newWPI.node + ", wp entry is " + newWPI);
0749: }
0750: } else if (newWPI.inc != origWPI.inc) {
0751: // incarnation number is wrong! maybe crashed during move.
0752: if (log.isErrorEnabled()) {
0753: log
0754: .error("Step "
0755: + step.getUID()
0756: + " finished with incarnation number "
0757: + newWPI.inc
0758: + " that doesn't match the prior incarnation number "
0759: + origWPI.inc);
0760: }
0761: } else {
0762: // success!
0763: newState = StepStatus.SUCCESS;
0764: }
0765: }
0766:
0767: // update step
0768: status = new StepStatus(newState, status.getStartTime(), System
0769: .currentTimeMillis(), mstat);
0770: step.setStatus(status);
0771: blackboard.publishChange(step);
0772:
0773: if (log.isDebugEnabled()) {
0774: log.debug(todd + "Step " + step.getUID()
0775: + " is now in status " + status.getStateAsString());
0776: }
0777:
0778: // cancel alarm
0779: MyAlarm alarm = entry.alarm;
0780: if (alarm != null) {
0781: entry.alarm = null;
0782: if (!(alarm.hasExpired())) {
0783: alarm.cancel();
0784: if (log.isDebugEnabled()) {
0785: log.debug(todd + "Cancelled alarm for step "
0786: + step.getUID());
0787: }
0788: }
0789: }
0790:
0791: // keep entry for later removal
0792:
0793: if (log.isDebugEnabled()) {
0794: log.debug(todd + "Step " + step.getUID()
0795: + " completed move with status " + mstat);
0796: }
0797: }
0798:
0799: private MyAlarm addAlarm(long time, Object id) {
0800: MyAlarm alarm = new MyAlarm(time, id);
0801: getAlarmService().addRealTimeAlarm(alarm);
0802: pendingAlarms.add(alarm);
0803: return alarm;
0804: }
0805:
0806: private void cancelAlarms() {
0807: // FIXME optimize
0808: for (int i = 0, n = pendingAlarms.size(); i < n; i++) {
0809: MyAlarm ai = (MyAlarm) pendingAlarms.get(i);
0810: if (!(ai.hasExpired())) {
0811: ai.cancel();
0812: }
0813: }
0814: pendingAlarms.clear();
0815: }
0816:
0817: private List getDueTicketIds() {
0818: // FIXME optimize
0819: List l = null;
0820: for (int i = 0, n = pendingAlarms.size(); i < n; i++) {
0821: MyAlarm ai = (MyAlarm) pendingAlarms.get(i);
0822: if (ai.hasExpired()) {
0823: if (l == null) {
0824: l = new ArrayList(Math.min((n - i), 5));
0825: }
0826: l.add(ai.getId());
0827: pendingAlarms.remove(i);
0828: --i;
0829: --n;
0830: }
0831: }
0832: return l;
0833: }
0834:
0835: private WPInfo lookupInWP(MessageAddress agent, long timeout) {
0836: try {
0837: // get "node://host/node"
0838: AddressEntry nodeAE = wps.get(agent.getAddress(),
0839: "topology", timeout);
0840: if (nodeAE == null) {
0841: if (log.isInfoEnabled()) {
0842: log.info("Missing \"topology\" WP entry for "
0843: + agent);
0844: }
0845: return null;
0846: }
0847: URI nodeURI = nodeAE.getURI();
0848: String host = nodeURI.getHost();
0849: String node = nodeURI.getPath().substring(1);
0850: // get "version:///incarnation/moveId"
0851: AddressEntry versionAE = wps.get(agent.getAddress(),
0852: "version", timeout);
0853: if (versionAE == null) {
0854: if (log.isInfoEnabled()) {
0855: log.info("Missing \"version\" WP entry for "
0856: + agent);
0857: }
0858: return null;
0859: }
0860: URI versionURI = versionAE.getURI();
0861: String tmp = versionURI.getPath();
0862: int sep = tmp.indexOf('/', 1);
0863: long inc = Long.parseLong(tmp.substring(1, sep));
0864: long moveId = Long.parseLong(tmp.substring(sep + 1));
0865: return new WPInfo(agent.getAddress(), node, host, inc,
0866: moveId);
0867: } catch (Exception e) {
0868: if (log.isInfoEnabled()) {
0869: log.info("Failed WP lookup(" + agent + ")", e);
0870: }
0871: return null;
0872: }
0873: }
0874:
0875: // find MoveAgent with matching ticket-id
0876: private MoveAgent findMove(Object id) {
0877: Collection c = moveSub.getCollection();
0878: int n = c.size();
0879: if (n > 0) {
0880: Iterator iter = c.iterator();
0881: for (int i = 0; i < n; i++) {
0882: Object o = iter.next();
0883: if (o instanceof MoveAgent) {
0884: MoveAgent ma = (MoveAgent) o;
0885: MessageAddress a = ma.getSource();
0886: if (agentId.equals(a)) {
0887: Ticket ticket = ma.getTicket();
0888: Object tid = ticket.getIdentifier();
0889: if (id.equals(tid)) {
0890: return ma;
0891: }
0892: }
0893: }
0894: }
0895: }
0896: return null;
0897: }
0898:
0899: private static UnaryPredicate createStepPredicate(
0900: final MessageAddress agentId) {
0901: return new UnaryPredicate() {
0902: public boolean execute(Object o) {
0903: if (o instanceof Step) {
0904: Step step = (Step) o;
0905: MessageAddress target = step.getOptions()
0906: .getTarget();
0907: return agentId.equals(target);
0908: }
0909: return false;
0910: }
0911: };
0912: }
0913:
0914: private static UnaryPredicate createMovePredicate(
0915: final MessageAddress agentId) {
0916: return new UnaryPredicate() {
0917: public boolean execute(Object o) {
0918: if (o instanceof MoveAgent) {
0919: MoveAgent ma = (MoveAgent) o;
0920: MessageAddress a = ma.getSource();
0921: return agentId.equals(a);
0922: }
0923: return false;
0924: }
0925: };
0926: }
0927:
0928: private static UniqueObject query(CollectionSubscription sub,
0929: UID uid) {
0930: Collection real = sub.getCollection();
0931: int n = real.size();
0932: if (n > 0) {
0933: for (Iterator iter = real.iterator(); iter.hasNext();) {
0934: Object o = iter.next();
0935: if (o instanceof UniqueObject) {
0936: UniqueObject uo = (UniqueObject) o;
0937: UID x = uo.getUID();
0938: if (uid.equals(x)) {
0939: return uo;
0940: }
0941: }
0942: }
0943: }
0944: return null;
0945: }
0946:
0947: private static class WPInfo {
0948: public final String agent;
0949: public final String node;
0950: public final String host;
0951: public final long inc;
0952: public final long moveId;
0953:
0954: public WPInfo(String agent, String node, String host, long inc,
0955: long moveId) {
0956: this .agent = agent;
0957: this .node = node;
0958: this .host = host;
0959: this .inc = inc;
0960: this .moveId = moveId;
0961: }
0962:
0963: public String toString() {
0964: return "(agent=" + agent + ", node=" + node + ", host="
0965: + host + ", inc=" + inc + ", moveId=" + moveId
0966: + ")";
0967: }
0968: }
0969:
0970: private class MyAlarm implements Alarm, Comparable {
0971: private final long expirationTime;
0972: private boolean expired = false;
0973:
0974: private final Object id;
0975:
0976: public MyAlarm(long expirationTime, Object id) {
0977: this .expirationTime = expirationTime;
0978: this .id = id;
0979: }
0980:
0981: public Object getId() {
0982: return id;
0983: }
0984:
0985: public long getExpirationTime() {
0986: return expirationTime;
0987: }
0988:
0989: public synchronized void expire() {
0990: if (!expired) {
0991: expired = true;
0992: if (log.isDebugEnabled()) {
0993: log.debug(todd + "Alarm for ticket " + id
0994: + " expired");
0995: }
0996: if (blackboard != null) {
0997: blackboard.signalClientActivity();
0998: } else {
0999: // bug 989?
1000: }
1001: }
1002: }
1003:
1004: public boolean hasExpired() {
1005: return expired;
1006: }
1007:
1008: public synchronized boolean cancel() {
1009: boolean was = expired;
1010: expired = true;
1011: return was;
1012: }
1013:
1014: public boolean equals(Object o) {
1015: if (o instanceof MyAlarm) {
1016: long ot = ((MyAlarm) o).expirationTime;
1017: return (expirationTime == ot);
1018: }
1019: return false;
1020: }
1021:
1022: public int compareTo(Object o) {
1023: long ot = ((MyAlarm) o).expirationTime;
1024: return (expirationTime < ot) ? -1
1025: : (expirationTime > ot) ? 1 : 0;
1026: }
1027:
1028: public String toString() {
1029: return "Alarm {" + "\n expirationTime: " + expirationTime
1030: + "\n expired: " + expired + "\n id: " + id
1031: + "\n}";
1032: }
1033: }
1034:
1035: private static class Entry {
1036: public final Ticket ticket;
1037: public final WPInfo wpi;
1038: public final Step step;
1039: public MyAlarm alarm;
1040: public MoveAgent moveAgent;
1041:
1042: public Entry(Ticket ticket, WPInfo wpi, Step step) {
1043: this .ticket = ticket;
1044: this .wpi = wpi;
1045: this .step = step;
1046: if ((ticket == null) || (step == null)) {
1047: throw new IllegalArgumentException("null ticket/step");
1048: }
1049: }
1050:
1051: public String toString() {
1052: return "Entry {" + "\n ticket: " + ticket
1053: + "\n wp info: " + wpi + "\n step: " + step
1054: + "\n alarm: " + alarm + "\n moveAgent: "
1055: + moveAgent + "\n}";
1056: }
1057: }
1058:
1059: }
|