001: /**
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */package org.cougaar.pizza.plugin;
026:
027: import org.cougaar.core.agent.service.alarm.Alarm;
028: import org.cougaar.core.blackboard.IncrementalSubscription;
029: import org.cougaar.core.mts.MessageAddress;
030: import org.cougaar.core.plugin.ComponentPlugin;
031: import org.cougaar.core.plugin.PluginAlarm;
032: import org.cougaar.core.relay.Relay;
033: import org.cougaar.core.service.BlackboardService;
034: import org.cougaar.core.service.LoggingService;
035: import org.cougaar.core.service.UIDService;
036: import org.cougaar.core.util.UID;
037: import org.cougaar.multicast.AttributeBasedAddress;
038: import org.cougaar.pizza.Constants;
039: import org.cougaar.pizza.relay.RSVPRelaySource;
040: import org.cougaar.pizza.plugin.util.PizzaPreferenceHelper;
041: import org.cougaar.planning.ldm.asset.Entity;
042:
043: import org.cougaar.util.Arguments;
044: import org.cougaar.util.UnaryPredicate;
045:
046: import java.util.Collection;
047: import java.util.Date;
048: import java.util.Enumeration;
049: import java.util.Iterator;
050:
051: /**
052: * Sends a simple relay invitation to all "FriendsOfMark" (members of the community),
053: * whose responses are automatically collected in the PizzaPreferences object.
054: * <p/>
055: * Waits for a set amount of time, WAIT_FOR_RSVP_DURATION, until it
056: * publishes the pizza preference list to the blackboard. While it's
057: * waiting, replies come back from invitees and update the PizzaPreferences
058: * object in memory.
059: * <p>
060: * It must wait because people may take a while to join the FriendsOfMark community,
061: * and the PlaceOrderPlugin will place the orders as soon as the PizzaPreferences
062: * object is published.
063: * <p>
064: * An alternate way to do this instead of an alarm would be to tell this Plugin in
065: * advance how many people to expect, or allow it to publishChange the
066: * PizzaPreferences object.
067: * @see PizzaPreferences
068: * @see PlaceOrderPlugin
069: */
070: public class InvitePlugin extends ComponentPlugin {
071:
072: private static final String ATTRIBUTE_TYPE = "Role";
073: private static final String ATTRIBUTE_VALUE = "Member";
074:
075: private LoggingService log;
076:
077: /**
078: * the uid service gives me the uid for the relay with a unique id
079: */
080: private UIDService uids;
081:
082: /** initialize args to the empty instance */
083: private Arguments args = Arguments.EMPTY_INSTANCE;
084:
085: /**
086: * my subscription to Relays
087: */
088: private IncrementalSubscription relaySubscription;
089:
090: /**
091: * my subscription to Entities
092: */
093: private IncrementalSubscription entitySubscription;
094:
095: /**
096: * A timer for recurrent events. All access should be synchronized
097: * on timerLock
098: */
099: private Alarm timer = null;
100:
101: /** Lock for accessing timer */
102: private final Object timerLock = new Object();
103:
104: /**
105: * my list of pizza preferences generated from RSVPs
106: */
107: protected PizzaPreferences pizzaPreferences;
108:
109: /**
110: * How long to wait before publish preferences
111: */
112: protected long waitForRSVPDuration = 45000;
113:
114: /**
115: * Have we published preferences
116: */
117: protected boolean publishedPreferences = false;
118:
119: /** "setParameter" is only called if a plugin has parameters */
120: public void setParameter(Object o) {
121: args = new Arguments(o);
122: }
123:
124: public void setLoggingService(LoggingService log) {
125: this .log = log;
126: // Note that by default all logging calls start with our agent name
127: }
128:
129: public void setUIDService(UIDService uids) {
130: this .uids = uids;
131: }
132:
133: /**
134: * Looks for argument to plugin like: "WAIT_FOR_RSVP_DURATION:60000".
135: * <p/>
136: * <pre>
137: * <p/>
138: * For example :
139: * <p/>
140: * <component
141: * name='org.cougaar.pizza.plugin.InvitePlugin'
142: * class='org.cougaar.pizza.plugin.InvitePlugin'
143: * priority='COMPONENT'
144: * insertionpoint='Node.AgentManager.Agent.PluginManager.Plugin'>
145: * <argument>
146: * WAIT_FOR_RSVP_DURATION:60000
147: * </argument>
148: * </component>
149: * <p/>
150: * </pre>
151: *
152: * @return millis to wait
153: */
154: protected long getWaitParameter() {
155: // get wait parameter
156: long waitParam = args.getLong("WAIT_FOR_RSVP_DURATION",
157: waitForRSVPDuration);
158: return waitParam;
159: }
160:
161: /**
162: * We have one subscription, to the relays (the invitation) we produce.
163: * <p/>
164: * Here we also publish the pizza preferences. Initially it only holds
165: * the preference for Alice, the inviting agent.
166: * <p/>
167: * Sets a timer that fires when we have waited long enough for responses
168: * to return.
169: */
170: protected void setupSubscriptions() {
171: if (log.isDebugEnabled())
172: log.debug("setupSubscriptions");
173:
174: // get wait parameter
175: waitForRSVPDuration = getWaitParameter();
176:
177: // create relay subscription
178: relaySubscription = (IncrementalSubscription) blackboard
179: .subscribe(RELAYSOURCEPRED);
180:
181: // create entity subscription
182: entitySubscription = (IncrementalSubscription) blackboard
183: .subscribe(ENTITYPRED);
184:
185: // If this agent moves or restarts, its possible we've already
186: // invited people to the party. Get any pre-published PizzaPreferences.
187: getPizzaPreferencesFromBB();
188:
189: // If we havent already published the pizzaPreferences,
190: // then set a timer, to allow RSVPs to come in, before
191: // we publish it
192: if (!publishedPreferences) {
193: // wait for a time for responses to get back
194: if (log.isInfoEnabled()) {
195: log.info("Waiting " + (waitForRSVPDuration / 1000)
196: + " seconds before publishing pizza prefs.");
197: }
198:
199: startTimer(waitForRSVPDuration);
200: } else {
201: // Must have restarted when this plugins job was done
202: if (log.isInfoEnabled())
203: log
204: .info("Restarting when RSVPs already collected and published.");
205: }
206: }
207:
208: /**
209: * Query the Blackboard for a pre-existing PizzaPreferences object.
210: * This might happen if the agent moved or restarted.
211: * If it is found, record it and the fact it was already published.
212: * If not, check for an RSVPRelaySource off of which to grab the
213: * (unpublished) PizzaPreferences.
214: */
215: private void getPizzaPreferencesFromBB() {
216: Collection pprefs = blackboard.query(new UnaryPredicate() {
217: public boolean execute(Object o) {
218: return o instanceof PizzaPreferences;
219: }
220: });
221: if (!pprefs.isEmpty()) {
222: this .pizzaPreferences = (PizzaPreferences) pprefs
223: .iterator().next();
224: publishedPreferences = true;
225: } else if (!relaySubscription.isEmpty()) {
226: // If get here, haven't yet published the PizzaPreferences. But
227: // have published a Relay with a PizzaPreferences object. Grab it back.
228: RSVPRelaySource rSource = (RSVPRelaySource) relaySubscription
229: .first();
230: this .pizzaPreferences = rSource.getPizzaPrefs();
231: }
232: }
233:
234: /**
235: * Onc we have the self org, publish the invite RSVPRelay if we haven't yet.
236: * When the timer expires, assume all replies have come in, and publish the collected
237: * PizzaPreferences object for the PlaceOrderPlugin.
238: */
239: protected void execute() {
240: // When we have the self org, but havent yet created our
241: // reply catalog, we need to invite people to the party
242: if (!entitySubscription.isEmpty() && (pizzaPreferences == null)) {
243: // When we publish the relay, we create a pizza preference locally (not published).
244: // That PizzaPreferences is auto-updated by the Relay responses. Publishing
245: // it is just to let the PlaceOrderPlugin know it can start working
246: publishRelay();
247: }
248:
249: // Wait for everyone to register with the community,
250: // so my ABA Relay gets to them, and then they reply, so it is safe to
251: // assume we have all the replies. Then publish the pizza preferences object so
252: // PlaceOrderPlugin can order the pizza.
253: checkTimer();
254: } // end of execute()
255:
256: /**
257: * Create a PizzaPreferences object to collect local results, and
258: * publish my RSVP relay inviting people to the party. The relay itself
259: * will update the PizzaPreferences object as replies come in.
260: */
261: protected void publishRelay() {
262: // Create recipient addresses: a multicast address, going to all the
263: // members of my community of friends. The infrastructure will take care
264: // of ensuring that as people join the community, they get a copy of
265: // any message sent to this ABA. You just have to wait for people
266: // to join the community.
267: MessageAddress target = AttributeBasedAddress
268: .getAttributeBasedAddress(Constants.COMMUNITY,
269: ATTRIBUTE_TYPE, ATTRIBUTE_VALUE);
270:
271: // create pizza preferences - updated as RSVPs arrive
272: this .pizzaPreferences = new PizzaPreferences(uids.nextUID());
273:
274: // There may be several entities on the blackboard
275: // we want the one that represents the inviting agent
276: Entity selfEntity = getSelfEntity();
277:
278: if (selfEntity == null && log.isWarnEnabled()) {
279: log.warn("Could not find self entity. It's needed to find "
280: + "out my pizza preference.");
281: }
282:
283: // Get the inviters preference
284: String preference = PizzaPreferenceHelper.getPizzaPreference(
285: log, selfEntity);
286:
287: // And record it.
288: pizzaPreferences.addFriendToPizza(agentId.toString(),
289: preference);
290:
291: // send invitation to the ABA
292: Relay sourceRelay = new RSVPRelaySource(uids.nextUID(), target,
293: Constants.INVITATION_QUERY, pizzaPreferences);
294:
295: log.shout("Sending `" + Constants.INVITATION_QUERY
296: + "' to my Buddy list: " + Constants.COMMUNITY);
297:
298: blackboard.publishAdd(sourceRelay);
299: }
300:
301: /**
302: * Returns the Entity representing the agent. Checks the entity subscription and
303: * returns the first element. In this example, there should be only one self entity.
304: * @return local Entity, null if none yet
305: */
306: protected Entity getSelfEntity() {
307: // See if we have any yet -- we should
308: if (entitySubscription.isEmpty()) {
309: log
310: .error("Entity subscription is empty, this should not happen!!!!");
311: return null;
312: }
313:
314: // Look for the local Entity
315: for (Enumeration entities = entitySubscription.elements(); entities
316: .hasMoreElements();) {
317: Entity ent = (Entity) entities.nextElement();
318: if (ent.isLocal())
319: return ent;
320: }
321: return null;
322: }
323:
324: /**
325: * If the timer has expired, assume all the members of the community have joined,
326: * gotten the ABA targeted RSVP Relay, and replied. So publish the
327: * PizzaPreferences object, so the PlaceOrderPlugin can begin.
328: */
329: protected void checkTimer() {
330: if (timerExpired()) {
331: Collection relays = relaySubscription.getCollection();
332: if (publishedPreferences) {
333: if (log.isDebugEnabled()) {
334: log
335: .debug("We already published the invite list, nothing to do.");
336: // Note that the relaySubscription should be empty too, since
337: // we remove the relay when we're done
338: }
339: } else {
340: if (relays.isEmpty()) {
341: log
342: .error("Expecting an RSVPrelay, since the timer has expired.");
343: return;
344: }
345:
346: // Get our (single) published relay
347: RSVPRelaySource sourceRelay = (RSVPRelaySource) relays
348: .iterator().next();
349:
350: if (log.isInfoEnabled()) {
351: log
352: .info("We've waited "
353: + (waitForRSVPDuration / 1000)
354: + " seconds, so we're publishing the preference list.");
355:
356: log.info("Removing source relay: " + sourceRelay
357: + ", and adding pizza preferences: "
358: + pizzaPreferences);
359: }
360:
361: // We're done with the relay -- and any agent which starts up late
362: // and joins the community late is too late to come to the party.
363: // So remove the relay. In other applications, we could leave it.
364: blackboard.publishRemove(sourceRelay);
365:
366: // Publish our final set of attendees with their pizza preferences,
367: // so the PlaceOrderPlugin knows what to get
368: blackboard.publishAdd(pizzaPreferences);
369:
370: log.shout("RSVP time is up. Got " + pizzaPreferences);
371:
372: // Note that we've finished.
373: publishedPreferences = true;
374: }
375: } else {
376: if (log.isInfoEnabled()) {
377: log
378: .info("Timer not expired so not publishing Preferences yet. "
379: + "Now: "
380: + new Date()
381: + ", Timer expiration time is: "
382: + new Date(getTimerExpirationTime()));
383: }
384: }
385: }
386:
387: /**
388: * Schedule a update wakeup after some interval of time
389: * @param delay how long to delay before the timer expires.
390: */
391: protected void startTimer(long delay) {
392: synchronized (timerLock) {
393: // if (logger.isDebugEnabled()) logger.debug("Starting timer " + delay);
394: if (getBlackboardService() == null && log != null
395: && log.isWarnEnabled()) {
396: log
397: .warn("Started service alarm before the blackboard service"
398: + " is available");
399: }
400: timer = createAlarm(System.currentTimeMillis() + delay);
401: getAlarmService().addRealTimeAlarm(timer);
402: }
403: }
404:
405: private Alarm createAlarm(long time) {
406: return new PluginAlarm(time) {
407: public BlackboardService getBlackboardService() {
408: if (blackboard == null) {
409: if (log != null && log.isWarnEnabled()) {
410: log
411: .warn("Alarm to trigger at "
412: + (new Date(getExpirationTime()))
413: + " has expired,"
414: + " but the blackboard service is null. Plugin "
415: + " model state is "
416: + getModelState());
417: }
418: }
419: return blackboard;
420: }
421: };
422: }
423:
424: /**
425: * Test if the timer has expired.
426: * @return false if the timer is not running or has not yet expired
427: */
428: protected boolean timerExpired() {
429: synchronized (timerLock) {
430: return timer != null && timer.hasExpired();
431: }
432: }
433:
434: /** When will (has) the timer expire(d)? */
435: protected long getTimerExpirationTime() {
436: synchronized (timerLock) {
437: if (timer != null) {
438: return timer.getExpirationTime();
439: } else {
440: return 0;
441: }
442: }
443: }
444:
445: /**
446: * Single static predicate that matches RSVPRelaySource objects.
447: * Use a static singleton since we only need one.
448: */
449: private static final UnaryPredicate RELAYSOURCEPRED = new UnaryPredicate() {
450: public boolean execute(Object o) {
451: return (o instanceof RSVPRelaySource);
452: }
453: };
454:
455: /**
456: * Subscribe to Entities, in order to find the self Entity.
457: */
458: private static final UnaryPredicate ENTITYPRED = new UnaryPredicate() {
459: public boolean execute(Object o) {
460: return o instanceof Entity;
461: }
462: };
463: }
|