001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.List;
032: import java.util.Set;
033: import org.cougaar.core.blackboard.BlackboardForAgent;
034: import org.cougaar.core.component.Component;
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.component.ServiceProvider;
037: import org.cougaar.core.component.ServiceRevokedListener;
038: import org.cougaar.core.logging.LoggingServiceWithPrefix;
039: import org.cougaar.core.mobility.MobilityClient;
040: import org.cougaar.core.mobility.MobilityService;
041: import org.cougaar.core.mts.MessageAddress;
042: import org.cougaar.core.node.NodeIdentificationService;
043: import org.cougaar.core.persist.PersistenceClient;
044: import org.cougaar.core.persist.PersistenceIdentity;
045: import org.cougaar.core.persist.PersistenceObject;
046: import org.cougaar.core.persist.PersistenceServiceForAgent;
047: import org.cougaar.core.service.AgentIdentificationService;
048: import org.cougaar.core.service.LoggingService;
049: import org.cougaar.util.GenericStateModelAdapter;
050: import org.cougaar.util.IdentityHashSet;
051:
052: /**
053: * This component rehydrates the agent persistence object
054: * and acts as a proxy for agent mobility state capture.
055: *
056: * @see RehydrateLate
057: */
058: public final class RehydrateEarly extends GenericStateModelAdapter
059: implements Component {
060:
061: private ServiceBroker sb;
062:
063: private LoggingService log;
064:
065: private MessageAddress localAgent;
066: private MessageAddress localNode;
067:
068: private AgentStateModelService agentModel;
069:
070: private PersistenceServiceForAgent psfa;
071: private PersistenceClient pc;
072:
073: private final Set moveWatchers = new IdentityHashSet();
074:
075: private MobilityNotificationSP mnsp;
076: private RehydrateLoadSP rlsp;
077:
078: private PersistenceObject persistenceObject;
079:
080: private boolean rehydrated;
081:
082: private boolean is_moving;
083:
084: private MobilityService mobilityService;
085: private MobilityClient mobilityClient;
086:
087: public void setServiceBroker(ServiceBroker sb) {
088: this .sb = sb;
089: }
090:
091: public void load() {
092: super .load();
093:
094: localAgent = find_local_agent();
095:
096: get_logger();
097:
098: localNode = find_local_node();
099:
100: register_persistence();
101:
102: register_mobility();
103:
104: // rehydrate from mobile state (if available)
105: load_early();
106:
107: mnsp = new MobilityNotificationSP();
108: sb.addService(MobilityNotificationService.class, mnsp);
109:
110: rlsp = new RehydrateLoadSP();
111: sb.addService(RehydrateLoadService.class, rlsp);
112:
113: // called later via RehydrateLoadService:
114: //load_late();
115: }
116:
117: public void start() {
118: super .start();
119:
120: // once loaded we revoke our load_late service
121: if (rlsp != null) {
122: sb.revokeService(RehydrateLoadService.class, rlsp);
123: rlsp = null;
124: }
125: }
126:
127: public void suspend() {
128: super .suspend();
129:
130: if (is_moving) {
131: // only capture our mobile state if we're moving, as opposed to
132: // agent/node shutdown
133: is_moving = false;
134: persist_blackboard();
135: }
136:
137: if (psfa != null) {
138: psfa.suspend();
139: }
140: }
141:
142: public void unload() {
143: super .unload();
144:
145: if (mnsp != null) {
146: sb.revokeService(MobilityNotificationService.class, mnsp);
147: mnsp = null;
148: }
149:
150: unregister_mobility();
151: unregister_persistence();
152: }
153:
154: private MessageAddress find_local_agent() {
155: AgentIdentificationService ais = (AgentIdentificationService) sb
156: .getService(this , AgentIdentificationService.class,
157: null);
158: if (ais == null) {
159: return null;
160: }
161: MessageAddress ret = ais.getMessageAddress();
162: sb.releaseService(this , AgentIdentificationService.class, ais);
163: return ret;
164: }
165:
166: private void get_logger() {
167: log = (LoggingService) sb.getService(this ,
168: LoggingService.class, null);
169: String prefix = localAgent + ": ";
170: log = LoggingServiceWithPrefix.add(log, prefix);
171: }
172:
173: private MessageAddress find_local_node() {
174: NodeIdentificationService nis = (NodeIdentificationService) sb
175: .getService(this , NodeIdentificationService.class, null);
176: if (nis == null) {
177: return null;
178: }
179: MessageAddress ret = nis.getMessageAddress();
180: sb.releaseService(this , NodeIdentificationService.class, nis);
181: return ret;
182: }
183:
184: private void load_early() {
185: if (persistenceObject == null) {
186: if (log.isInfoEnabled()) {
187: log.info("No mobile persistence data found");
188: }
189: return;
190: }
191:
192: if (log.isDebugEnabled()) {
193: log
194: .debug("Rehydrating mobile persistence data before loading"
195: + " high priority components");
196: }
197: rehydrate(persistenceObject);
198: persistenceObject = null;
199: rehydrated = true;
200: }
201:
202: private void load_late() {
203: if (rehydrated) {
204: if (log.isDebugEnabled()) {
205: log.debug("Already rehydrated from mobile state");
206: }
207: return;
208: }
209:
210: if (log.isDebugEnabled()) {
211: log
212: .debug("Rehydrating now that we've loading our high priority"
213: + " components");
214: }
215: rehydrate(null);
216: rehydrated = true;
217: }
218:
219: private void set_state(Object o) {
220: if (!(o instanceof PersistenceObject)) {
221: if (log.isErrorEnabled()) {
222: log.error("Invalid setState("
223: + (o == null ? "null" : o.getClass().getName()
224: + " " + o) + ")");
225: }
226: return;
227: }
228:
229: // fill in our mobile persistence state
230: persistenceObject = (PersistenceObject) o;
231: if (log.isInfoEnabled()) {
232: log.info("Set persistence state ("
233: + (persistenceObject == null ? "null"
234: : persistenceObject.getClass().getName())
235: + ")");
236: }
237: }
238:
239: private Object get_state() {
240: if (log.isInfoEnabled()) {
241: log.info("Get persistence state ("
242: + (persistenceObject == null ? "null"
243: : persistenceObject.getClass().getName())
244: + ")");
245: }
246:
247: // we should be suspended now!
248: return persistenceObject;
249: }
250:
251: private void register_persistence() {
252: // get persistence
253: pc = new PersistenceClient() {
254: public PersistenceIdentity getPersistenceIdentity() {
255: String id = getClass().getName() + "_agent";
256: return new PersistenceIdentity(id);
257: }
258:
259: public List getPersistenceData() {
260: // we trigger rehydration but don't have state
261: //
262: // must return a mutable list!
263: return new ArrayList(0);
264: }
265: };
266: psfa = (PersistenceServiceForAgent) sb.getService(pc,
267: PersistenceServiceForAgent.class, null);
268: }
269:
270: private void unregister_persistence() {
271: if (psfa != null) {
272: sb.releaseService(pc, PersistenceServiceForAgent.class,
273: psfa);
274: psfa = null;
275: pc = null;
276: }
277: }
278:
279: private void register_mobility() {
280: if (localAgent == null || localAgent.equals(localNode)) {
281: if (log.isDebugEnabled()) {
282: log.debug("NodeAgent " + localNode
283: + " not registering for mobility");
284: }
285: return;
286: }
287:
288: // get the agent state model, required for mobility
289: agentModel = (AgentStateModelService) sb.getService(this ,
290: AgentStateModelService.class, null);
291: if (agentModel == null) {
292: throw new RuntimeException(
293: "Unable to obtain AgentStateModelService");
294: }
295:
296: // create our mobility proxy for the agent
297: mobilityClient = new MobilityClient() {
298:
299: public MessageAddress getAgentIdentifier() {
300: return localAgent;
301: }
302:
303: public void onDispatch(MessageAddress destinationNode) {
304: announce_move(destinationNode);
305: }
306:
307: public void setState(Object o) {
308: set_state(o);
309: }
310:
311: public Object getState() {
312: return get_state();
313: }
314:
315: // forward all agent state transitions
316: public void initialize() {
317: agentModel.initialize();
318: }
319:
320: public void load() {
321: agentModel.load();
322: }
323:
324: public void start() {
325: agentModel.start();
326: }
327:
328: public void suspend() {
329: agentModel.suspend();
330: }
331:
332: public void resume() {
333: agentModel.resume();
334: }
335:
336: public void stop() {
337: agentModel.stop();
338: }
339:
340: public void halt() {
341: agentModel.halt();
342: }
343:
344: public void unload() {
345: agentModel.unload();
346: }
347:
348: public int getModelState() {
349: return agentModel.getModelState();
350: }
351: };
352:
353: mobilityService = (MobilityService) sb.getService(
354: mobilityClient, MobilityService.class, null);
355: if (mobilityService == null) {
356: if (log.isInfoEnabled()) {
357: log.info("Unable to obtain MobilityService"
358: + ", mobility is disabled");
359: }
360: }
361: }
362:
363: private void unregister_mobility() {
364: if (mobilityService != null) {
365: sb.releaseService(mobilityClient, MobilityService.class,
366: mobilityService);
367: mobilityService = null;
368: mobilityClient = null;
369: }
370:
371: if (agentModel != null) {
372: sb.releaseService(this , AgentStateModelService.class,
373: agentModel);
374: agentModel = null;
375: }
376: }
377:
378: private void rehydrate(PersistenceObject pObj) {
379: psfa.rehydrate(pObj);
380: }
381:
382: private void persist_blackboard() {
383: // suspending after "announce_move", so capture blackboard state as
384: // our mobile persistenceObject.
385: BlackboardForAgent bb = (BlackboardForAgent) sb.getService(
386: this , BlackboardForAgent.class, null);
387: if (bb == null) {
388: throw new RuntimeException(
389: "Unable to obtain BlackboardForAgent"
390: + ", required for mobility persist");
391: }
392: persistenceObject = bb.getPersistenceObject();
393: sb.releaseService(this , BlackboardForAgent.class, bb);
394: bb = null;
395: }
396:
397: private void announce_move(MessageAddress destinationNode) {
398: is_moving = true;
399:
400: if (log.isInfoEnabled()) {
401: log.info("Moving agent from " + localNode + " to "
402: + destinationNode);
403: }
404:
405: // probably safe, but synchronize & copy anyways
406: List l;
407: synchronized (moveWatchers) {
408: l = new ArrayList(moveWatchers);
409: }
410: for (int i = 0, n = l.size(); i < n; i++) {
411: MobilityNotificationClient mnc = (MobilityNotificationClient) l
412: .get(i);
413: mnc.movingTo(destinationNode);
414: }
415: }
416:
417: private void add_move_watcher(MobilityNotificationClient mnc) {
418: synchronized (moveWatchers) {
419: moveWatchers.add(mnc);
420: }
421: }
422:
423: private void remove_move_watcher(MobilityNotificationClient mnc) {
424: synchronized (moveWatchers) {
425: moveWatchers.remove(mnc);
426: }
427: }
428:
429: private final class MobilityNotificationSP implements
430: ServiceProvider {
431: public Object getService(ServiceBroker sb, Object requestor,
432: Class serviceClass) {
433: if (!(MobilityNotificationService.class
434: .isAssignableFrom(serviceClass))) {
435: return null;
436: }
437: if (!(requestor instanceof MobilityNotificationClient)) {
438: throw new IllegalArgumentException(
439: "Requestor must implement MobilityNotificationClient");
440: }
441: MobilityNotificationClient mnc = (MobilityNotificationClient) requestor;
442: add_move_watcher(mnc);
443: return new MobilityNotificationS(mnc);
444: }
445:
446: public void releaseService(ServiceBroker sb, Object requestor,
447: Class serviceClass, Object service) {
448: if (service instanceof MobilityNotificationS) {
449: MobilityNotificationClient mnc = ((MobilityNotificationS) service).mnc;
450: remove_move_watcher(mnc);
451: }
452: }
453: }
454:
455: private static final class MobilityNotificationS implements
456: MobilityNotificationService {
457: public final MobilityNotificationClient mnc;
458:
459: public MobilityNotificationS(MobilityNotificationClient mnc) {
460: this .mnc = mnc;
461: }
462: // nothing to do
463: }
464:
465: private final class RehydrateLoadSP implements ServiceProvider {
466: private final RehydrateLoadService rls;
467:
468: public RehydrateLoadSP() {
469: rls = new RehydrateLoadService() {
470: public void rehydrate() {
471: load_late();
472: }
473: };
474: }
475:
476: public Object getService(ServiceBroker sb, Object requestor,
477: Class serviceClass) {
478: if (RehydrateLoadService.class
479: .isAssignableFrom(serviceClass)) {
480: return rls;
481: } else {
482: return null;
483: }
484: }
485:
486: public void releaseService(ServiceBroker sb, Object requestor,
487: Class serviceClass, Object service) {
488: }
489: }
490: }
|