001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.wp.bootstrap;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.HashMap;
033: import java.util.Iterator;
034: import java.util.List;
035: import java.util.Map;
036: import org.cougaar.core.component.Component;
037: import org.cougaar.core.component.ServiceBroker;
038: import org.cougaar.core.component.ServiceRevokedListener;
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.service.AgentIdentificationService;
041: import org.cougaar.core.service.LoggingService;
042: import org.cougaar.core.service.ThreadService;
043: import org.cougaar.core.thread.Schedulable;
044: import org.cougaar.util.GenericStateModelAdapter;
045:
046: /**
047: * This component is a base class for (server) bootstrap advertisers.
048: * <p>
049: * Per-protocol subclasses call {@link #addAdvertiser} to create
050: * an inner Advertiser class per bootstrap location (e.g. an
051: * advertiser for URL http://foo.com:123 and http://bar.com:456).
052: * These advertisers are told to start/stop according to the
053: * AdvertiseService. Advertisers are also told when the local
054: * node's bundles change, in case this data must be posted to
055: * a remote location.
056: */
057: public abstract class AdvertiseBase extends GenericStateModelAdapter
058: implements Component {
059: protected ServiceBroker sb;
060:
061: protected LoggingService log;
062: protected MessageAddress agentId;
063: protected ThreadService threadService;
064:
065: private AdvertiseService advertiseService;
066:
067: protected String agentName;
068:
069: private final Object lock = new Object();
070:
071: // Map<String, Bundle>
072: protected Map bundles = Collections.EMPTY_MAP;
073:
074: // Map<String, Advertiser>
075: protected final Map advertisers = new HashMap();
076:
077: private final AdvertiseService.Client advertiseClient = new AdvertiseService.Client() {
078: public void add(String name, Bundle bundle) {
079: AdvertiseBase.this .add(name, bundle);
080: }
081:
082: public void change(String name, Bundle bundle) {
083: AdvertiseBase.this .change(name, bundle);
084: }
085:
086: public void remove(String name, Bundle bundle) {
087: AdvertiseBase.this .remove(name);
088: }
089: };
090:
091: public void setServiceBroker(ServiceBroker sb) {
092: this .sb = sb;
093: }
094:
095: public void setLoggingService(LoggingService log) {
096: this .log = log;
097: }
098:
099: public void setThreadService(ThreadService threadService) {
100: this .threadService = threadService;
101: }
102:
103: public void load() {
104: super .load();
105:
106: // which agent are we in?
107: AgentIdentificationService ais = (AgentIdentificationService) sb
108: .getService(this , AgentIdentificationService.class,
109: null);
110: agentId = ais.getMessageAddress();
111: sb.releaseService(this , AgentIdentificationService.class, ais);
112: agentName = agentId.getAddress();
113:
114: // register our advertise client
115: advertiseService = (AdvertiseService) sb.getService(
116: advertiseClient, AdvertiseService.class, null);
117: if (advertiseService == null) {
118: throw new RuntimeException(
119: "Unable to obtain AdvertiseService");
120: }
121: }
122:
123: public void unload() {
124: removeAllAdvertisers();
125:
126: if (advertiseService != null) {
127: sb.releaseService(advertiseClient, AdvertiseService.class,
128: advertiseService);
129: advertiseService = null;
130: }
131:
132: if (threadService != null) {
133: // halt our threads?
134: sb.releaseService(this , ThreadService.class, threadService);
135: threadService = null;
136: }
137: if (log != null && log != LoggingService.NULL) {
138: sb.releaseService(this , LoggingService.class, log);
139: log = LoggingService.NULL;
140: }
141:
142: super .unload();
143: }
144:
145: /**
146: * Create a poller for a bootstrap location.
147: * <p>
148: * This method can be overridden to use a custom Advertiser class.
149: */
150: protected abstract Advertiser createAdvertiser(Object bootObj);
151:
152: /**
153: * Get the current bundles.
154: */
155: protected Map getBundles() {
156: synchronized (lock) {
157: return bundles;
158: }
159: }
160:
161: private void add(String name, Bundle bundle) {
162: update(name, bundle);
163: }
164:
165: private void change(String name, Bundle bundle) {
166: update(name, bundle);
167: }
168:
169: private void remove(String name) {
170: update(name, null);
171: }
172:
173: protected void update(String name, Bundle bundle) {
174: synchronized (lock) {
175: // update bundles
176: Bundle b = (Bundle) bundles.get(name);
177: if (bundle == null ? b == null : bundle.equals(b)) {
178: // no change?
179: return;
180: }
181: // copy-on-write
182: Map nb = new HashMap(bundles);
183: if (bundle == null) {
184: nb.remove(name);
185: } else {
186: nb.put(name, bundle);
187: }
188: bundles = Collections.unmodifiableMap(nb);
189: if (advertisers.isEmpty()) {
190: return;
191: }
192: if (log.isDetailEnabled()) {
193: log.detail("updating name=" + name + " bundle="
194: + bundle);
195: }
196: // tell our advertisers
197: for (Iterator iter = advertisers.values().iterator(); iter
198: .hasNext();) {
199: Advertiser a = (Advertiser) iter.next();
200: a.updateLater(name, bundle);
201: }
202: }
203: }
204:
205: protected Object getKey(Object bootObj) {
206: return bootObj;
207: }
208:
209: // this is called by the subclass
210: protected void addAdvertiser(Object bootObj) {
211: Advertiser a;
212: Object key = getKey(bootObj);
213: if (key == null) {
214: return;
215: }
216: synchronized (lock) {
217: a = (Advertiser) advertisers.get(key);
218: if (a != null) {
219: return;
220: }
221: if (log.isInfoEnabled()) {
222: log.info("Creating " + bootObj);
223: }
224: a = createAdvertiser(bootObj);
225: advertisers.put(key, a);
226: }
227: a.startLater();
228: }
229:
230: protected void removeAllAdvertisers() {
231: List l;
232: synchronized (lock) {
233: if (advertisers.isEmpty()) {
234: return;
235: }
236: l = new ArrayList(advertisers.values());
237: advertisers.clear();
238: if (log.isInfoEnabled()) {
239: for (int i = 0; i < l.size(); i++) {
240: log.info("Removing " + l.get(i));
241: }
242: }
243: }
244: for (int i = 0; i < l.size(); i++) {
245: Advertiser a = (Advertiser) l.get(i);
246: if (a != null) {
247: a.stopLater();
248: }
249: }
250: }
251:
252: // this is called by the subclass
253: protected void removeAdvertiser(Object bootObj) {
254: Advertiser a;
255: Object key = getKey(bootObj);
256: if (key == null) {
257: return;
258: }
259: synchronized (lock) {
260: a = (Advertiser) advertisers.remove(key);
261: if (a == null) {
262: return;
263: }
264: if (log.isInfoEnabled()) {
265: log.info("Removing " + bootObj);
266: }
267: }
268: a.stopLater();
269: }
270:
271: /**
272: * This manages the lookup and verification for a single bootObj.
273: */
274: protected abstract class Advertiser implements Runnable {
275:
276: //
277: // construction-time finals:
278: //
279:
280: private final Schedulable thread;
281:
282: protected final Object bootObj;
283:
284: private final List queue = new ArrayList();
285: private final List tmp = new ArrayList();
286:
287: private boolean active;
288:
289: public Advertiser(Object bootObj) {
290: this .bootObj = bootObj;
291:
292: this .thread = threadService.getThread(AdvertiseBase.this ,
293: this , "White pages bootstrap advertiser for "
294: + bootObj, ThreadService.WILL_BLOCK_LANE);
295: }
296:
297: /**
298: *
299: */
300: public abstract void start();
301:
302: /**
303: *
304: */
305: public abstract void update(String name, Bundle bundle);
306:
307: /**
308: *
309: */
310: public abstract void stop();
311:
312: // queuing -- we don't want to block the calling thread!
313:
314: public void startLater() {
315: enqueue(Start.getInstance());
316: }
317:
318: public void stopLater() {
319: enqueue(Stop.getInstance());
320: }
321:
322: public void updateLater(String name, Bundle bundle) {
323: enqueue(new Update(name, bundle));
324: }
325:
326: private void enqueue(Object o) {
327: synchronized (queue) {
328: queue.add(o);
329: }
330: thread.start();
331: }
332:
333: public void run() {
334: int n;
335: synchronized (queue) {
336: n = queue.size();
337: if (n > 0) {
338: tmp.addAll(queue);
339: queue.clear();
340: }
341: }
342: for (int i = 0; i < n; i++) {
343: Object o = tmp.get(i);
344: if (o instanceof Start) {
345: startNow();
346: } else if (o instanceof Stop) {
347: stopNow();
348: } else if (o instanceof Update) {
349: Update u = (Update) o;
350: updateNow(u.getName(), u.getBundle());
351: } else if (log.isErrorEnabled()) {
352: log.error("Invalid queue element: " + o);
353: }
354: }
355: tmp.clear();
356: }
357:
358: private void startNow() {
359: if (active) {
360: return;
361: }
362: if (log.isInfoEnabled()) {
363: log.info("Starting " + bootObj);
364: }
365: start();
366: active = true;
367: }
368:
369: private void stopNow() {
370: if (!active) {
371: return;
372: }
373: if (log.isInfoEnabled()) {
374: log.info("Stopping " + bootObj);
375: }
376: stop();
377: active = false;
378: }
379:
380: private void updateNow(String name, Bundle bundle) {
381: if (log.isInfoEnabled()) {
382: log.info("Updating " + bootObj + " with name=" + name
383: + " bundle=" + bundle);
384: }
385: update(name, bundle);
386: }
387: }
388:
389: //
390: // queue entries for our advertiser
391: //
392:
393: private interface QueueElement {
394: }
395:
396: private static class Start implements QueueElement {
397: private static final Start INSTANCE = new Start();
398:
399: public static Start getInstance() {
400: return INSTANCE;
401: }
402: }
403:
404: private static class Stop implements QueueElement {
405: private static final Stop INSTANCE = new Stop();
406:
407: public static Stop getInstance() {
408: return INSTANCE;
409: }
410: }
411:
412: private static class Update implements QueueElement {
413: private final String name;
414: private final Bundle bundle;
415:
416: public Update(String name, Bundle bundle) {
417: this .name = name;
418: this .bundle = bundle;
419: }
420:
421: public String getName() {
422: return name;
423: }
424:
425: public Bundle getBundle() {
426: return bundle;
427: }
428:
429: public String toString() {
430: return "(update name=" + name + " bundle=" + bundle + ")";
431: }
432: }
433: }
|