0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2001-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.mobility.service;
0028:
0029: import java.net.URI;
0030: import java.util.ArrayList;
0031: import java.util.HashMap;
0032: import java.util.Iterator;
0033: import java.util.List;
0034: import java.util.Map;
0035: import org.cougaar.core.agent.AgentContainer;
0036: import org.cougaar.core.component.ComponentDescription;
0037: import org.cougaar.core.component.StateTuple;
0038: import org.cougaar.core.mobility.AbstractTicket;
0039: import org.cougaar.core.mobility.AddTicket;
0040: import org.cougaar.core.mobility.MobilityClient;
0041: import org.cougaar.core.mobility.MobilityException;
0042: import org.cougaar.core.mobility.MoveTicket;
0043: import org.cougaar.core.mobility.RemoveTicket;
0044: import org.cougaar.core.mobility.arch.AbstractHandler;
0045: import org.cougaar.core.mobility.arch.AckHandler;
0046: import org.cougaar.core.mobility.arch.ArrivalHandler;
0047: import org.cougaar.core.mobility.arch.DispatchRemoteHandler;
0048: import org.cougaar.core.mobility.arch.DispatchTestHandler;
0049: import org.cougaar.core.mobility.arch.MobilitySupport;
0050: import org.cougaar.core.mobility.arch.NackHandler;
0051: import org.cougaar.core.mobility.ldm.AgentControl;
0052: import org.cougaar.core.mts.MessageAddress;
0053: import org.cougaar.core.service.BlackboardService;
0054: import org.cougaar.core.service.LoggingService;
0055: import org.cougaar.core.service.wp.AddressEntry;
0056: import org.cougaar.core.service.wp.WhitePagesService;
0057: import org.cougaar.core.util.UID;
0058: import org.cougaar.core.util.UniqueObject;
0059: import org.cougaar.util.GenericStateModel;
0060:
0061: /**
0062: * This component coordinates agent mobility and handles agent
0063: * add/remove requests.
0064: */
0065: public class RootMobilityPlugin extends AbstractMobilityPlugin {
0066:
// Registry of the agents known to this plugin, keyed by the agent's
// MessageAddress, with an AgentEntry as the value.
//
// It is used to guarantee that only one control is processed per
// agent at a time, and to hold onto an agent while it is awaiting
// the control response.
//
// Every access in this file is made while synchronized on the map.
private final Map entries = new HashMap(13);
0073:
0074: //
0075: // handle control add/change/remove.
0076: //
0077: // FIXME refactor into a "switch" with pluggable handlers for each
0078: // ticket class.
0079: //
0080:
0081: /** a new request for the control of a local agent. */
0082: protected void addedAgentControl(AgentControl control) {
0083: if (!(isNode))
0084: return;
0085:
0086: AbstractTicket abstractTicket = control.getAbstractTicket();
0087:
0088: if (log.isDebugEnabled()) {
0089: log.debug("Observed add of " + control.getUID());
0090: }
0091:
0092: if (abstractTicket instanceof AddTicket) {
0093: add_add(control, (AddTicket) abstractTicket);
0094: } else if (abstractTicket instanceof RemoveTicket) {
0095: add_remove(control, (RemoveTicket) abstractTicket);
0096: } else if (abstractTicket instanceof MoveTicket) {
0097: add_move(control, (MoveTicket) abstractTicket);
0098: } else if (abstractTicket instanceof TransferTicket) {
0099: add_transfer(control, (TransferTicket) abstractTicket);
0100: } else {
0101: // ignore
0102: }
0103: }
0104:
0105: /** a control was changed. */
0106: protected void changedAgentControl(AgentControl control) {
0107: if (!(isNode))
0108: return;
0109:
0110: AbstractTicket abstractTicket = control.getAbstractTicket();
0111:
0112: if (log.isDebugEnabled()) {
0113: log.debug("Observed change of " + control.getUID());
0114: }
0115:
0116: if (abstractTicket instanceof AddTicket) {
0117: change_add(control, (AddTicket) abstractTicket);
0118: } else if (abstractTicket instanceof RemoveTicket) {
0119: change_remove(control, (RemoveTicket) abstractTicket);
0120: } else if (abstractTicket instanceof MoveTicket) {
0121: change_move(control, (MoveTicket) abstractTicket);
0122: } else if (abstractTicket instanceof TransferTicket) {
0123: change_transfer(control, (TransferTicket) abstractTicket);
0124: } else {
0125: // ignore
0126: }
0127: }
0128:
0129: /** a control was removed. */
0130: protected void removedAgentControl(AgentControl control) {
0131: if (!(isNode))
0132: return;
0133:
0134: AbstractTicket abstractTicket = control.getAbstractTicket();
0135:
0136: if (log.isDebugEnabled()) {
0137: log.debug("Observed removal of " + control.getUID());
0138: }
0139:
0140: if (abstractTicket instanceof AddTicket) {
0141: remove_add(control, (AddTicket) abstractTicket);
0142: } else if (abstractTicket instanceof RemoveTicket) {
0143: remove_remove(control, (RemoveTicket) abstractTicket);
0144: } else if (abstractTicket instanceof MoveTicket) {
0145: remove_move(control, (MoveTicket) abstractTicket);
0146: } else if (abstractTicket instanceof TransferTicket) {
0147: remove_transfer(control, (TransferTicket) abstractTicket);
0148: } else {
0149: // ignore
0150: }
0151: }
0152:
0153: /** an agent registers as a mobile agent in the local node. */
0154: protected void registerAgent(MessageAddress id,
0155: ComponentDescription desc, MobilityClient agent) {
0156: // add entry to the table
0157: synchronized (entries) {
0158: AgentEntry ae = (AgentEntry) entries.get(id);
0159: boolean isNew;
0160: if (ae == null) {
0161: // new agent
0162: isNew = true;
0163: ae = new AgentEntry(id);
0164: entries.put(id, ae);
0165: } else if (ae.isRegistered) {
0166: // already registered?
0167: throw new RuntimeException("Agent " + id
0168: + " is already registered on node " + nodeId
0169: + ": " + ae);
0170: } else {
0171: // mobile agent arrival
0172: isNew = false;
0173: }
0174: if (log.isDebugEnabled()) {
0175: log.debug("Registered agent " + id + " on node "
0176: + nodeId + ", which "
0177: + (isNew ? "is a new" : "already had an")
0178: + " entry (oid: " + System.identityHashCode(ae)
0179: + "): " + ae + ", the description "
0180: + objectCompare(ae.desc, desc) + " and agent "
0181: + objectCompare(ae.agent, agent));
0182: }
0183: if (ae.state != null) {
0184: Object tmp = ae.state;
0185: if (tmp instanceof LocalMoveState) {
0186: tmp = ((LocalMoveState) tmp).state;
0187: }
0188: ae.state = null;
0189: if (log.isDebugEnabled()) {
0190: log.debug("Setting state for agent " + id);
0191: }
0192: agent.setState(tmp);
0193: }
0194: ae.desc = desc;
0195: ae.agent = agent;
0196: ae.isRegistered = true;
0197: }
0198: }
0199:
0200: /** an agent unregisters itself from the local mobility registry. */
0201: protected void unregisterAgent(MessageAddress id) {
0202: synchronized (entries) {
0203: AgentEntry ae = (AgentEntry) entries.get(id);
0204: if (ae == null || !ae.isRegistered) {
0205: // already removed?
0206: if (log.isErrorEnabled()) {
0207: log.error("Attempted to unregister agent " + id
0208: + " on node " + nodeId
0209: + ", but the agent is not "
0210: + (ae == null ? "listed" : "registered"));
0211: }
0212: return;
0213: }
0214: ae.isRegistered = false;
0215: boolean removed;
0216: if (ae.pendingAction == AgentEntry.NONE) {
0217: // no longer needed
0218: entries.remove(id);
0219: removed = true;
0220: } else {
0221: // agent is unloading as part of move,
0222: // keep the entry in case the move fails
0223: removed = false;
0224: }
0225: if (log.isDebugEnabled()) {
0226: log.debug("Unregistered agent " + id + " on node "
0227: + nodeId + ", "
0228: + (removed ? "removed" : "keeping the")
0229: + " entry (oid: " + System.identityHashCode(ae)
0230: + "): " + ae);
0231: }
0232: }
0233: }
0234:
0235: private static String objectCompare(Object a, Object b) {
0236: String astr = (a == null ? "null" : ("(oid: "
0237: + System.identityHashCode(a) + " " + a + ")"));
0238: if (a == b) {
0239: return "is identical " + astr;
0240: }
0241: String bstr = (b == null ? "null" : ("(oid: "
0242: + System.identityHashCode(b) + " " + b + ")"));
0243: return ((a != null && a.equals(b)) ? "is equivalent"
0244: : "has changed")
0245: + " from prior " + astr + " to new " + bstr;
0246: }
0247:
0248: //
0249: //
0250: //
0251:
0252: private void add_add(AgentControl control, AddTicket addTicket) {
0253: if (log.isDetailEnabled()) {
0254: log.detail("add_add(" + control + ", " + addTicket + ")");
0255: }
0256:
0257: MessageAddress id = addTicket.getMobileAgent();
0258: MessageAddress destNode = addTicket.getDestinationNode();
0259: ComponentDescription desc = addTicket.getComponentDescription();
0260:
0261: // check if this node is the destination node
0262: if ((destNode != null) && (!destNode.equals(nodeId))) {
0263: // not for me! let the RedirectMovePlugin forward the request
0264: // to the other node.
0265: return;
0266: }
0267:
0268: // FIXME consider locking in registry, to prevent multiple
0269: // simultaneous add/removes
0270: Object state = addTicket.getState();
0271: if (state != null) {
0272: throw new UnsupportedOperationException(
0273: "AddTicket with state is not implemented yet");
0274: }
0275:
0276: // run outside this transaction, to
0277: // a) prevent blocking, and
0278: // b) avoid nested transactions (bug 1750)
0279: AddAgentRunner aar = new AddAgentRunner(id, control, desc);
0280: queue(id, aar, aar.pendingTuples);
0281: }
0282:
0283: private void change_add(AgentControl control, AddTicket addTicket) {
0284: if (log.isDetailEnabled()) {
0285: log
0286: .detail("change_add(" + control + ", " + addTicket
0287: + ")");
0288: }
0289: }
0290:
0291: private void remove_add(AgentControl control, AddTicket addTicket) {
0292: if (log.isDetailEnabled()) {
0293: log
0294: .detail("remove_add(" + control + ", " + addTicket
0295: + ")");
0296: }
0297: }
0298:
0299: private void add_remove(AgentControl control,
0300: RemoveTicket removeTicket) {
0301: if (log.isDetailEnabled()) {
0302: log.detail("add_remove(" + control + ", " + removeTicket
0303: + ")");
0304: }
0305:
0306: // handle remove
0307: MessageAddress id = removeTicket.getMobileAgent();
0308: MessageAddress destNode = removeTicket.getDestinationNode();
0309:
0310: // check if this node is the destination node
0311: if ((destNode != null) && (!destNode.equals(nodeId))) {
0312: // not for me! let the RedirectMovePlugin forward the request
0313: // to the other node.
0314: return;
0315: }
0316:
0317: // FIXME consider locking in registry, to prevent multiple
0318: // simultaneous add/removes
0319:
0320: // run outside this transaction, to
0321: // a) prevent blocking, and
0322: // b) avoid nested transactions (bug 1750)
0323: RemoveAgentRunner rar = new RemoveAgentRunner(id, control);
0324: queue(id, rar, rar.pendingTuples);
0325: }
0326:
0327: private void change_remove(AgentControl control,
0328: RemoveTicket removeTicket) {
0329: if (log.isDetailEnabled()) {
0330: log.detail("change_remove(" + control + ", " + removeTicket
0331: + ")");
0332: }
0333: }
0334:
0335: private void remove_remove(AgentControl control,
0336: RemoveTicket removeTicket) {
0337: if (log.isDetailEnabled()) {
0338: log.detail("remove_remove(" + control + ", " + removeTicket
0339: + ")");
0340: }
0341: }
0342:
0343: private void add_move(AgentControl control, MoveTicket moveTicket) {
0344: if (log.isDetailEnabled()) {
0345: log.detail("add_move(" + control + ", " + moveTicket + ")");
0346: }
0347:
0348: MessageAddress id = moveTicket.getMobileAgent();
0349: MessageAddress origNode = moveTicket.getOriginNode();
0350: MessageAddress destNode = moveTicket.getDestinationNode();
0351:
0352: if ((id == null) || (id.equals(nodeId))) {
0353: String s = "Move request " + control.getUID()
0354: + " attempted to move node " + nodeId
0355: + " -- nodes are not movable!";
0356: if (log.isErrorEnabled()) {
0357: log.error(s);
0358: }
0359: Throwable stack = new RuntimeException(s);
0360: control.setStatus(AgentControl.FAILURE, stack);
0361: blackboard.publishChange(control);
0362: return;
0363: }
0364:
0365: if ((origNode != null) && (!(nodeId.equals(origNode)))) {
0366: // FIXME note that this assumes that the agent is
0367: // on this node, and doesn't do a redirect.
0368: String s = "Agent " + id + " is currently on node "
0369: + nodeId
0370: + ", not on the ticket's asserted origin node "
0371: + origNode + " (uid: " + control.getUID() + ")";
0372: if (log.isErrorEnabled()) {
0373: log.error(s);
0374: }
0375: Throwable stack = new RuntimeException(s);
0376: control.setStatus(AgentControl.FAILURE, stack);
0377: blackboard.publishChange(control);
0378: return;
0379: }
0380:
0381: boolean isLocalMove = ((destNode == null) || (nodeId
0382: .equals(destNode)));
0383:
0384: // check to see if we're already at destination node
0385: boolean isTrivialMove = (isLocalMove && !moveTicket
0386: .isForceRestart());
0387:
0388: if (!isTrivialMove) {
0389: // check remote destination node
0390: //
0391: // For now we do a quick check to see if the node
0392: // is registered in the WP.
0393: //
0394: // See bug 1218 for details.
0395: String s = null;
0396: AddressEntry ae = null;
0397: try {
0398: ae = whitePagesService.get(destNode.getAddress(),
0399: "topology", (30000)); // 30 seconds
0400: } catch (Exception e) {
0401: s = e.toString();
0402: }
0403: if (ae == null) {
0404: if (s == null) {
0405: s = "It's not listed in the white pages";
0406: }
0407: } else {
0408: URI uri = ae.getURI();
0409: String path = uri.getPath();
0410: String node = (path == null ? null : path.substring(1));
0411: if (!destNode.getAddress().equals(node)) {
0412: s = "It's not a node agent " + ae;
0413: }
0414: }
0415: if (s != null) {
0416: // destination node is invalid!
0417: s = "Invalid destination node " + destNode
0418: + " for move of agent " + agentId + ": " + s
0419: + ", request uid is " + control.getOwnerUID();
0420: if (log.isErrorEnabled()) {
0421: log.error(s);
0422: }
0423: Throwable stack = new RuntimeException(s);
0424: control.setStatus(AgentControl.FAILURE, stack);
0425: blackboard.publishChange(control);
0426: return;
0427: }
0428: }
0429:
0430: // lookup agent in registry, lock in the move
0431: String errorMsg = null;
0432: ComponentDescription desc = null;
0433: MobilityClient agent = null;
0434: LocalMoveState localMoveState = null;
0435: synchronized (entries) {
0436: // lookup the agent
0437: AgentEntry ae = (AgentEntry) entries.get(id);
0438: if (ae == null) {
0439: // agent is not known on this node
0440: errorMsg = "Agent " + id + " is not on node " + nodeId;
0441: } else if (ae.pendingAction != AgentEntry.NONE) {
0442: // already moving or arriving!
0443: errorMsg = "Agent " + id + " on node " + nodeId
0444: + " is busy with another move request: " + ae;
0445: } else if (!(ae.isRegistered)) {
0446: // agent is not registered on this node
0447: errorMsg = "Agent " + id + " on node " + nodeId
0448: + " is not registered for mobility";
0449: } else {
0450: // get the desc and agent from registration
0451: desc = ae.desc;
0452: agent = ae.agent;
0453: // mark as moving
0454: if (!isTrivialMove) {
0455: ae.pendingAction = AgentEntry.MOVE_DEPART;
0456: ae.control = control;
0457: if (isLocalMove) {
0458: localMoveState = new LocalMoveState();
0459: ae.state = localMoveState;
0460: }
0461: }
0462: }
0463: }
0464:
0465: if (errorMsg != null) {
0466: if (log.isErrorEnabled()) {
0467: log.error(errorMsg);
0468: }
0469: Throwable stack = new RuntimeException(errorMsg);
0470: control.setStatus(AgentControl.FAILURE, stack);
0471: blackboard.publishChange(control);
0472: return;
0473: }
0474:
0475: if (isTrivialMove) {
0476: // trivial success -- the agent is already
0477: // at the destination node
0478: if (log.isInfoEnabled()) {
0479: log.info("Agent " + id + " is already at node "
0480: + nodeId + ", responding with trivial success");
0481: }
0482: control.setStatus(AgentControl.MOVED, null);
0483: blackboard.publishChange(control);
0484: return;
0485: }
0486:
0487: // entries contains this move
0488:
0489: // assume that the agent itself provides the state
0490: MobilityClient stateProvider = agent;
0491: // assume that the agent itself regulates its model
0492: GenericStateModel model = agent;
0493:
0494: MobilitySupportImpl support = new MobilitySupportImpl(agent,
0495: control, null, id, moveTicket);
0496:
0497: AbstractHandler h;
0498: if (isLocalMove) {
0499: h = new DispatchTestHandler(support, model, desc,
0500: stateProvider, localMoveState);
0501: } else {
0502: h = new DispatchRemoteHandler(support, model, desc,
0503: stateProvider);
0504: }
0505:
0506: queue(id, h, support);
0507: }
0508:
0509: private void change_move(AgentControl control, MoveTicket moveTicket) {
0510: if (log.isDetailEnabled()) {
0511: log.detail("change_move(" + control + ", " + moveTicket
0512: + ")");
0513: }
0514: }
0515:
0516: private void remove_move(AgentControl control, MoveTicket moveTicket) {
0517: if (log.isDetailEnabled()) {
0518: log.detail("remove_move(" + control + ", " + moveTicket
0519: + ")");
0520: }
0521: }
0522:
0523: private void add_transfer(AgentControl control,
0524: TransferTicket transferTicket) {
0525: if (log.isDetailEnabled()) {
0526: log.detail("add_transfer(" + control + ", "
0527: + transferTicket + ")");
0528: }
0529:
0530: MoveTicket moveTicket = transferTicket.getMoveTicket();
0531:
0532: MessageAddress destNode = moveTicket.getDestinationNode();
0533: if (destNode == null) {
0534: // not expected, since only remote controls
0535: // create transfers
0536: if (log.isErrorEnabled()) {
0537: log.error("Unexpected agent-transfer "
0538: + control.getUID() + " added on node " + nodeId
0539: + " with null destination node, ticket: "
0540: + moveTicket);
0541: }
0542: return;
0543: }
0544:
0545: if (!(nodeId.equals(destNode))) {
0546: if (!nodeId.equals(control.getSource())) {
0547: // created by this plugin
0548: if (log.isErrorEnabled()) {
0549: log.error("Invalid agent transfer with source "
0550: + control.getSource() + " to node "
0551: + destNode + " doesn't match this node "
0552: + nodeId);
0553: }
0554: }
0555: return;
0556: }
0557:
0558: MessageAddress id = moveTicket.getMobileAgent();
0559:
0560: // get the desc and mobile state
0561: ComponentDescription desc = transferTicket
0562: .getComponentDescription();
0563: Object state = transferTicket.getState();
0564:
0565: // force GC of the agent state once transfer-ADD completes
0566: transferTicket.clearState();
0567:
0568: // make sure agent is not registered, lock in arrival
0569: String errorMsg = null;
0570: synchronized (entries) {
0571: AgentEntry ae = (AgentEntry) entries.get(id);
0572: boolean isNew = false;
0573: if (ae == null) {
0574: isNew = true;
0575: ae = new AgentEntry(id);
0576: entries.put(id, ae);
0577: }
0578: if (ae.pendingAction != AgentEntry.NONE) {
0579: // agent is leaving this node?
0580: errorMsg = "Unable to accept remote agent " + id
0581: + ", a move is already in progress: " + ae;
0582: } else if (ae.isRegistered) {
0583: // already moving or adding the agent?
0584: errorMsg = "Unable to accept remote agent " + id
0585: + ", that agent is already on node " + nodeId
0586: + ": " + ae;
0587: } else {
0588: ae.pendingAction = AgentEntry.MOVE_ARRIVAL;
0589: ae.control = control;
0590: ae.state = state;
0591: if (log.isDebugEnabled()) {
0592: if (isNew) {
0593: log.debug("Created new entry for agent " + id
0594: + " move arrival" + " (oid: "
0595: + System.identityHashCode(ae) + "): "
0596: + ae);
0597: } else {
0598: log.debug("Updated entry for agent " + id
0599: + " move arrival" + " (oid: "
0600: + System.identityHashCode(ae) + "): "
0601: + ae + ", description "
0602: + objectCompare(ae.desc, ae.desc)
0603: + ", agent "
0604: + objectCompare(ae.agent, ae.agent));
0605: }
0606: }
0607: }
0608: }
0609:
0610: if (errorMsg != null) {
0611: if (log.isErrorEnabled()) {
0612: log.error(errorMsg);
0613: }
0614: Throwable stack = new RuntimeException(errorMsg);
0615: control.setStatus(AgentControl.FAILURE, stack);
0616: blackboard.publishChange(control);
0617: return;
0618: }
0619:
0620: MobilitySupportImpl support = new MobilitySupportImpl(null,
0621: null, control, id, moveTicket);
0622:
0623: AbstractHandler h = new ArrivalHandler(support, desc);
0624:
0625: queue(id, h, support);
0626: }
0627:
0628: private void change_transfer(AgentControl control,
0629: TransferTicket transferTicket) {
0630: if (log.isDetailEnabled()) {
0631: log.detail("change_transfer(" + control + ", "
0632: + transferTicket + ")");
0633: }
0634:
0635: MoveTicket moveTicket = transferTicket.getMoveTicket();
0636:
0637: MessageAddress id = moveTicket.getMobileAgent();
0638:
0639: MessageAddress origNode = moveTicket.getOriginNode();
0640: if ((origNode != null) && (!(nodeId.equals(origNode)))) {
0641: if (origNode.equals(control.getSource())) {
0642: // ignore, changed by this plugin
0643: return;
0644: } else {
0645: if (log.isErrorEnabled()) {
0646: log.error("Invalid change in transfer "
0647: + control.getUID()
0648: + ", intended for origin node " + origNode
0649: + ", not local node " + nodeId);
0650: }
0651: return;
0652: }
0653: }
0654:
0655: int status = control.getStatusCode();
0656: if (status == AgentEntry.NONE) {
0657: if (log.isDebugEnabled()) {
0658: log.debug("Ignore change with no status for transfer "
0659: + control.getUID());
0660: }
0661: return;
0662: }
0663:
0664: boolean isNack = (status != AgentControl.MOVED);
0665: Throwable stack = (isNack ? control.getFailureStackTrace()
0666: : null);
0667:
0668: // force GC of the captured agent state
0669: //
0670: // this is important, otherwise the state will be
0671: // transfered again when we publish-remove the
0672: // transfer-control object.
0673: transferTicket.clearState();
0674:
0675: // remove the completed transfer
0676: //
0677: // this may complicate debugging, but it helps ensure
0678: // GC of these transfer-controls even if the original
0679: // move-control is never removed. The other option is
0680: // to wait for the removal of the move-control.
0681: blackboard.publishRemove(control);
0682:
0683: // make sure agent is not registered, lock in arrival
0684: String errorMsg = null;
0685: MobilityClient agent = null;
0686: synchronized (entries) {
0687: AgentEntry ae = (AgentEntry) entries.get(id);
0688: if (ae == null) {
0689: // no such control request
0690: errorMsg = "Unknown agent " + id + " on node " + nodeId
0691: + ", so unable to process move "
0692: + (isNack ? "failure" : "success")
0693: + " response";
0694: } else if (ae.pendingAction != AgentEntry.MOVE_DEPART) {
0695: // agent is not moving, so we're not expecting an [n]ack
0696: errorMsg = "Agent " + id + " is not moving on node "
0697: + nodeId + ", so unable to process move "
0698: + (isNack ? "failure" : "success")
0699: + " response";
0700: } else if (!(ae.isRegistered)) {
0701: // expecting the agent to stay registered during control
0702: // since unregister is in agent's "stop()".
0703: errorMsg = "Agent " + id + " on node " + nodeId
0704: + " is no longer registered";
0705: } else {
0706: agent = ae.agent;
0707: ae.pendingAction = AgentEntry.MOVE_CONFIRM;
0708: ae.control = control;
0709: }
0710: }
0711:
0712: if (errorMsg != null) {
0713: if (log.isErrorEnabled()) {
0714: log.error(errorMsg, stack);
0715: }
0716: return;
0717: }
0718:
0719: // find the original "move" control
0720: UID moveControlUID = control.getOwnerUID();
0721: AgentControl moveControl = findAgentControl(moveControlUID);
0722: if (moveControl == null) {
0723: if (log.isWarnEnabled()) {
0724: log.warn("Agent " + id + " control request "
0725: + moveControlUID + " for transfer "
0726: + control.getUID() + " not found in node "
0727: + nodeId + "'s blackboard, "
0728: + " will be unable to set the control status, "
0729: + " but will complete the control anyways");
0730: }
0731: }
0732:
0733: MobilitySupportImpl support = new MobilitySupportImpl(agent,
0734: moveControl, control, id, moveTicket);
0735:
0736: AbstractHandler h;
0737: if (isNack) {
0738: h = new NackHandler(support, agent, stack);
0739: } else {
0740: h = new AckHandler(support, agent);
0741: }
0742:
0743: queue(id, h, support);
0744: }
0745:
0746: private void remove_transfer(AgentControl control,
0747: TransferTicket transferTicket) {
0748: if (log.isDetailEnabled()) {
0749: log.detail("remove_transfer(" + control + ", "
0750: + transferTicket + ")");
0751: }
0752: }
0753:
0754: //
0755: //
0756: //
0757:
0758: private void queue(MessageAddress id, AbstractHandler h,
0759: MobilitySupportImpl support) {
0760: queue(id, h, support.pendingTuples);
0761: }
0762:
0763: private void queue(final MessageAddress id, final Runnable r,
0764: final List pendingTuples) {
0765: // ensure queue cleanup
0766: Runnable r2 = new Runnable() {
0767: public void run() {
0768: try {
0769: r.run();
0770: } finally {
0771: dequeue(id, r, pendingTuples);
0772: }
0773: }
0774:
0775: public String toString() {
0776: return r.toString();
0777: }
0778: };
0779: queue(r2);
0780: }
0781:
0782: private void dequeue(MessageAddress id, Runnable r2,
0783: final List pendingTuples) {
0784: if (r2 instanceof DispatchRemoteHandler) {
0785: // leave the entry, we're waiting for the response.
0786: if (log.isDebugEnabled()) {
0787: synchronized (entries) {
0788: AgentEntry ae = (AgentEntry) entries.get(id);
0789: log.debug("Completed move action <dispatch> " + r2
0790: + " for agent " + id + " on node " + nodeId
0791: + ", keeping the move entry" + " (oid: "
0792: + System.identityHashCode(ae) + "): " + ae);
0793: }
0794: }
0795: } else {
0796: // remove moving flag
0797: synchronized (entries) {
0798: AgentEntry ae = (AgentEntry) entries.get(id);
0799: if (ae == null) {
0800: // aborted handler?
0801: if (log.isDebugEnabled()) {
0802: log.debug("Completed move action " + r2
0803: + " for agent " + id + " on node "
0804: + nodeId + ", but the entry is null");
0805: }
0806: } else {
0807: ae.pendingAction = AgentEntry.NONE;
0808: if (!(ae.isRegistered)) {
0809: entries.remove(id);
0810: if (log.isDebugEnabled()) {
0811: log.debug("Completed move action " + r2
0812: + " for agent " + id + " on node "
0813: + nodeId + ", removed entry"
0814: + " (oid: "
0815: + System.identityHashCode(ae)
0816: + "): " + ae);
0817: }
0818: } else {
0819: // keep in table for future moves
0820: if (log.isDebugEnabled()) {
0821: log.debug("Completed move action " + r2
0822: + " for agent " + id + " on node "
0823: + nodeId + ", keeping the entry "
0824: + " (oid: "
0825: + System.identityHashCode(ae)
0826: + "): " + ae);
0827: }
0828: }
0829: }
0830: }
0831: }
0832: // queue pending blackboard operations
0833: if (!pendingTuples.isEmpty()) {
0834: Runnable r3 = new Runnable() {
0835: public void run() {
0836: for (Iterator iter = pendingTuples.iterator(); iter
0837: .hasNext();) {
0838: PendingTuple pt = (PendingTuple) iter.next();
0839: if (log.isDebugEnabled()) {
0840: log.debug("Blackboard " + pt);
0841: }
0842: Object obj = pt.obj;
0843: switch (pt.op) {
0844: case PendingTuple.ADD:
0845: blackboard.publishAdd(obj);
0846: break;
0847: case PendingTuple.CHANGE:
0848: blackboard.publishChange(obj);
0849: break;
0850: case PendingTuple.REMOVE:
0851: blackboard.publishRemove(obj);
0852: break;
0853: }
0854: }
0855: }
0856: };
0857: fireLater(r3);
0858: }
0859: }
0860:
0861: private class AgentEntry {
0862:
0863: /**
0864: * pendingAction constants.
0865: */
0866: public static final int NONE = 0;
0867: // local agent add
0868: public static final int ADD = 1;
0869: // local agent remove
0870: public static final int REMOVE = 2;
0871: // sender-side agent is moving away
0872: public static final int MOVE_DEPART = 3;
0873: // target-side agent is being added
0874: public static final int MOVE_ARRIVAL = 4;
0875: // sender-side process the move response
0876: public static final int MOVE_CONFIRM = 5;
0877:
0878: public final MessageAddress id;
0879: public ComponentDescription desc;
0880: public Object state;
0881: public MobilityClient agent;
0882:
0883: public boolean isRegistered;
0884:
0885: public int pendingAction = NONE;
0886:
0887: public AgentControl control;
0888:
0889: public AgentEntry(MessageAddress id) {
0890: this .id = id;
0891: }
0892:
0893: public String getPendingActionAsString() {
0894: switch (pendingAction) {
0895: case NONE:
0896: return "none";
0897: case ADD:
0898: return "add";
0899: case REMOVE:
0900: return "remove";
0901: case MOVE_DEPART:
0902: return "move_depart";
0903: case MOVE_ARRIVAL:
0904: return "move_arrival";
0905: case MOVE_CONFIRM:
0906: return "move_confirm";
0907: default:
0908: return "?";
0909: }
0910: }
0911:
0912: public String toString() {
0913: return "control request for agent "
0914: + id
0915: + ", state is <"
0916: + (isRegistered ? "" : "not ")
0917: + "registered + "
0918: + getPendingActionAsString()
0919: + ">"
0920: + ((control != null) ? (", with ticket " + control
0921: .getAbstractTicket()) : "");
0922: }
0923: }
0924:
0925: private static class PendingTuple {
0926: public static final int ADD = 0;
0927: public static final int CHANGE = 1;
0928: public static final int REMOVE = 2;
0929: public final Object obj;
0930: public final int op;
0931:
0932: public PendingTuple(int op, Object obj) {
0933: this .op = op;
0934: this .obj = obj;
0935: }
0936:
0937: public String toString() {
0938: return "queued "
0939: + ((op == ADD) ? "add" : (op == CHANGE) ? "change"
0940: : (op == REMOVE) ? "remove" : "?")
0941: + " of object "
0942: + ((obj instanceof UniqueObject) ? ("with uid " + (((UniqueObject) obj)
0943: .getUID()))
0944: : (obj != null) ? obj.toString() : "null");
0945: }
0946: }
0947:
0948: private class MobilitySupportImpl extends AbstractMobilitySupport {
0949:
0950: private final List pendingTuples = new ArrayList(3);
0951:
0952: private MobilityClient agent;
0953: private AgentControl moveControl;
0954: private AgentControl transferControl;
0955:
0956: public MobilitySupportImpl(MobilityClient agent,
0957: AgentControl moveControl, AgentControl transferControl,
0958: MessageAddress id, MoveTicket moveTicket) {
0959: super (id, RootMobilityPlugin.this .nodeId, moveTicket,
0960: RootMobilityPlugin.this .log);
0961: this .agent = agent;
0962: this .moveControl = moveControl;
0963: this .transferControl = transferControl;
0964: }
0965:
0966: public void onDispatch() {
0967: MessageAddress destNode = moveTicket.getDestinationNode();
0968: try {
0969: agent.onDispatch(destNode);
0970: } catch (MobilityException me) {
0971: throw me;
0972: } catch (Exception e) {
0973: if (RootMobilityPlugin.this .log.isErrorEnabled()) {
0974: RootMobilityPlugin.this .log.error("Failed agent "
0975: + id + " move to node " + destNode, e);
0976: }
0977: }
0978: }
0979:
0980: public void onArrival() {
0981: if (moveControl != null) {
0982: moveControl.setStatus(AgentControl.MOVED, null);
0983: publishChangeLater(moveControl);
0984: } else {
0985: if (RootMobilityPlugin.this .log.isWarnEnabled()) {
0986: RootMobilityPlugin.this .log
0987: .warn("Unable to set move status for transfer "
0988: + ((transferControl != null) ? transferControl
0989: .getUID().toString()
0990: : "<unknown>"));
0991: }
0992: }
0993: }
0994:
0995: public void onFailure(Throwable throwable) {
0996: moveControl.setStatus(AgentControl.FAILURE, throwable);
0997: publishChangeLater(moveControl);
0998: }
0999:
1000: public void onRemoval() {
1001: }
1002:
1003: public void setPendingModel(GenericStateModel model) {
1004: }
1005:
1006: public GenericStateModel takePendingModel() {
1007: return null;
1008: }
1009:
1010: public void sendTransfer(ComponentDescription desc, Object state) {
1011: TransferTicket transferTicket = new TransferTicket(
1012: moveTicket, desc, state);
1013: AgentControl newTC = createAgentControl(moveControl
1014: .getUID(), moveTicket.getDestinationNode(),
1015: transferTicket);
1016: transferControl = newTC;
1017: publishAddLater(newTC);
1018: }
1019:
1020: public void sendAck() {
1021: transferControl.setStatus(AgentControl.MOVED, null);
1022: publishChangeLater(transferControl);
1023: }
1024:
1025: public void sendNack(Throwable throwable) {
1026: transferControl.setStatus(AgentControl.FAILURE, throwable);
1027: publishChangeLater(transferControl);
1028: }
1029:
1030: public void addAgent(ComponentDescription desc) {
1031: StateTuple tuple = new StateTuple(desc, null);
1032: agentContainer.addAgent(id, tuple);
1033: }
1034:
1035: public void removeAgent() {
1036: agentContainer.removeAgent(id);
1037: }
1038:
1039: private void publishAddLater(Object o) {
1040: addPendingTuple(PendingTuple.ADD, o);
1041: }
1042:
1043: private void publishChangeLater(Object o) {
1044: addPendingTuple(PendingTuple.CHANGE, o);
1045: }
1046:
1047: // private void publishRemoveLater(Object o) {
1048: // addPendingTuple(PendingTuple.REMOVE, o);
1049: // }
1050:
1051: private void addPendingTuple(int op, Object o) {
1052: addPendingTuple(new PendingTuple(op, o));
1053: }
1054:
1055: private void addPendingTuple(PendingTuple pt) {
1056: if (pt == null) {
1057: throw new IllegalArgumentException("null pt");
1058: }
1059: pendingTuples.add(pt);
1060: }
1061: }
1062:
1063: private class AddAgentRunner implements Runnable {
1064:
1065: public final List pendingTuples = new ArrayList(1);
1066:
1067: private final MessageAddress id;
1068: private final AgentControl control;
1069: private final ComponentDescription desc;
1070:
1071: public AddAgentRunner(MessageAddress id, AgentControl control,
1072: ComponentDescription desc) {
1073: this .id = id;
1074: this .control = control;
1075: this .desc = desc;
1076: }
1077:
1078: public void run() {
1079:
1080: // add into this node
1081: if (log.isInfoEnabled()) {
1082: log.info("Add agent " + id + " to node " + nodeId);
1083: }
1084:
1085: int resultState;
1086: Throwable resultStack = null;
1087: try {
1088:
1089: StateTuple tuple = new StateTuple(desc, null);
1090: agentContainer.addAgent(id, tuple);
1091:
1092: // success!
1093: resultState = AgentControl.CREATED;
1094:
1095: if (log.isInfoEnabled()) {
1096: log
1097: .info("Added agent " + id + " to node "
1098: + nodeId);
1099: }
1100:
1101: } catch (Exception e) {
1102: // either already exists or unable to add
1103: //
1104: // HACK: check the exception message
1105: String msg = e.getMessage();
1106: if (msg != null && msg.indexOf(" already exists") > 0) {
1107: // already exists
1108: resultState = AgentControl.ALREADY_EXISTS;
1109: if (log.isErrorEnabled()) {
1110: log.error("Agent " + id
1111: + " already exists on node " + nodeId);
1112: }
1113: } else {
1114: // couldn't add
1115: resultState = AgentControl.FAILURE;
1116: resultStack = e;
1117: if (log.isErrorEnabled()) {
1118: log.error("Unable to add agent " + id, e);
1119: }
1120: }
1121: }
1122:
1123: // set our response state
1124: control.setStatus(resultState, resultStack);
1125:
1126: // publish-change later
1127: PendingTuple pt = new PendingTuple(PendingTuple.CHANGE,
1128: control);
1129: pendingTuples.add(pt);
1130: }
1131: }
1132:
1133: private class RemoveAgentRunner implements Runnable {
1134:
1135: public final List pendingTuples = new ArrayList(1);
1136:
1137: private final MessageAddress id;
1138: private final AgentControl control;
1139:
1140: public RemoveAgentRunner(MessageAddress id, AgentControl control) {
1141: this .id = id;
1142: this .control = control;
1143: }
1144:
1145: public void run() {
1146:
1147: // remove agent from this node
1148: if (log.isInfoEnabled()) {
1149: log.info("Remove agent " + id + " from node " + nodeId);
1150: }
1151:
1152: int resultState;
1153: Throwable resultStack = null;
1154: try {
1155: agentContainer.removeAgent(id);
1156: // success!
1157: resultState = AgentControl.REMOVED;
1158: if (log.isInfoEnabled()) {
1159: log.info("Removed agent " + id + " from node "
1160: + nodeId);
1161: }
1162: } catch (Exception e) {
1163: // either already removed or unable to remove
1164: //
1165: // HACK: check the exception message
1166: String msg = e.getMessage();
1167: if (msg != null && msg.indexOf(" is not loaded") > 0) {
1168: // already exists
1169: resultState = AgentControl.DOES_NOT_EXIST;
1170: if (log.isErrorEnabled()) {
1171: log.error("Agent " + id + " is not on node "
1172: + nodeId);
1173: }
1174: } else {
1175: // couldn't add
1176: resultState = AgentControl.FAILURE;
1177: resultStack = e;
1178: if (log.isErrorEnabled()) {
1179: log.error("Unable to remove agent " + id, e);
1180: }
1181: }
1182: }
1183:
1184: // set our response state
1185: control.setStatus(resultState, resultStack);
1186:
1187: // publish-change later
1188: PendingTuple pt = new PendingTuple(PendingTuple.CHANGE,
1189: control);
1190: pendingTuples.add(pt);
1191: }
1192: }
1193: }
|