001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 Mobile Intelligence Corp
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.community;
027:
028: import java.util.ArrayList;
029: import java.util.List;
030:
031: import org.cougaar.core.agent.service.alarm.Alarm;
032: import org.cougaar.core.blackboard.BlackboardClientComponent;
033: import org.cougaar.core.component.BindingSite;
034: import org.cougaar.core.component.ServiceAvailableEvent;
035: import org.cougaar.core.component.ServiceAvailableListener;
036: import org.cougaar.core.component.ServiceBroker;
037: import org.cougaar.core.service.AgentIdentificationService;
038: import org.cougaar.core.service.AlarmService;
039: import org.cougaar.core.service.BlackboardService;
040: import org.cougaar.core.service.LoggingService;
041: import org.cougaar.core.service.SchedulerService;
042:
043: /**
044: * BlackboardClient base class used by CommunityService components that require
045: * blackboard access. Primarily used to send Relays between agents and
046: * remote community manager.
047: */
048: public class BlackboardClient extends BlackboardClientComponent {
049:
050: // Supported BB operations
051: public static final int ADD = 0;
052: public static final int CHANGE = 1;
053: public static final int REMOVE = 2;
054:
055: protected LoggingService logger;
056: protected long TIMER_INTERVAL = 5 * 1000;
057: private BBWakeAlarm wakeAlarm;
058: private final List addQueue = new ArrayList(5);
059: private final List changeQueue = new ArrayList(5);
060: private final List removeQueue = new ArrayList(5);
061:
062: private ServiceAvailableListener serviceListener;
063:
064: public BlackboardClient(BindingSite bs) {
065: try {
066: setBindingSite(bs);
067: if (servicesAvailable()) {
068: init();
069: } else {
070: serviceListener = new ServiceAvailableListener() {
071: public void serviceAvailable(
072: ServiceAvailableEvent sae) {
073: //if (sae.getService().equals(BlackboardService.class)) {
074: init();
075: //}
076: }
077: };
078: getServiceBroker().addServiceListener(serviceListener);
079: }
080: } catch (Exception ex) {
081: ex.printStackTrace();
082: }
083: }
084:
085: protected AlarmService getAlarmService() {
086: if (alarmService == null) {
087: setAlarmService((AlarmService) getServiceBroker()
088: .getService(this , AlarmService.class, null));
089: }
090: return alarmService;
091: }
092:
093: private boolean servicesAvailable() {
094: ServiceBroker sb = getServiceBroker();
095: boolean servicesAvailable = sb
096: .hasService(org.cougaar.core.service.AlarmService.class)
097: && sb
098: .hasService(org.cougaar.core.service.SchedulerService.class)
099: && sb
100: .hasService(org.cougaar.core.service.AgentIdentificationService.class)
101: && sb
102: .hasService(org.cougaar.core.service.LoggingService.class)
103: && sb
104: .hasService(org.cougaar.core.service.BlackboardService.class);
105: return servicesAvailable;
106: }
107:
108: static Object lock = new Object();
109:
110: /**
111: * Set essential services and invoke GenericStateModel methods.
112: */
113: private void init() {
114: synchronized (lock) {
115: if (servicesAvailable() && getModelState() == -1) {
116: //System.out.println("init: " + agentId);
117: ServiceBroker sb = getServiceBroker();
118: setSchedulerService((SchedulerService) sb.getService(
119: this , SchedulerService.class, null));
120: setAgentIdentificationService((AgentIdentificationService) sb
121: .getService(this ,
122: AgentIdentificationService.class, null));
123: logger = (LoggingService) sb.getService(this ,
124: LoggingService.class, null);
125: logger = org.cougaar.core.logging.LoggingServiceWithPrefix
126: .add(logger, agentId + ": ");
127: setAlarmService((AlarmService) sb.getService(this ,
128: AlarmService.class, null));
129: setBlackboardService((BlackboardService) sb.getService(
130: this , BlackboardService.class, null));
131: blackboard = (BlackboardService) sb.getService(this ,
132: BlackboardService.class, null);
133: initialize();
134: load();
135: start();
136: if (serviceListener != null) {
137: sb.removeServiceListener(serviceListener);
138: }
139: }
140: }
141: }
142:
143: public void publish(Object obj, int type) {
144: if (logger.isDetailEnabled()) {
145: logger.detail("publish: type=" + type + " obj=" + obj);
146: }
147: switch (type) {
148: case ADD:
149: synchronized (addQueue) {
150: addQueue.add(obj);
151: }
152: break;
153: case CHANGE:
154: synchronized (changeQueue) {
155: changeQueue.add(obj);
156: }
157: break;
158: case REMOVE:
159: synchronized (removeQueue) {
160: removeQueue.add(obj);
161: }
162: break;
163: }
164: if (blackboard != null) {
165: blackboard.signalClientActivity();
166: } else {
167: //if (logger.isDetailEnabled()) {
168: logger.info("Blackboard not available, retrying in "
169: + TIMER_INTERVAL + "ms");
170: //}
171: AlarmService as = getAlarmService();
172: if (as != null) {
173: // Start timer to check service availability later
174: wakeAlarm = new BBWakeAlarm(System.currentTimeMillis()
175: + TIMER_INTERVAL);
176: as.addRealTimeAlarm(wakeAlarm);
177: }
178: }
179: }
180:
181: /**
182: * Process queued requests.
183: */
184: private void fireAll() {
185: if (logger.isDetailEnabled()) {
186: logger.detail("fireall:" + " add(" + addQueue.size() + ")"
187: + " change(" + changeQueue.size() + ")"
188: + " remove(" + removeQueue.size() + ")");
189: }
190: fire(addQueue, ADD);
191: fire(changeQueue, CHANGE);
192: fire(removeQueue, REMOVE);
193: }
194:
195: private void fire(List queue, int type) {
196: int n;
197: List l;
198: synchronized (queue) {
199: n = queue.size();
200: if (n <= 0 || blackboard == null) {
201: return;
202: }
203: l = new ArrayList(queue);
204: queue.clear();
205: }
206: for (int i = 0; i < n; i++) {
207: switch (type) {
208: case ADD:
209: blackboard.publishAdd(l.get(i));
210: if (logger.isDebugEnabled()) {
211: logger.debug("publishAdd: " + l.get(i));
212: }
213: break;
214: case CHANGE:
215: blackboard.publishChange(l.get(i));
216: if (logger.isDebugEnabled()) {
217: logger.debug("publishChange: " + l.get(i));
218: }
219: break;
220: case REMOVE:
221: blackboard.publishRemove(l.get(i));
222: if (logger.isDebugEnabled()) {
223: logger.debug("publishRemove: " + l.get(i));
224: }
225: break;
226: }
227: }
228: }
229:
230: public void setupSubscriptions() {
231: }
232:
233: public void execute() {
234: fireAll(); // Published queued requests
235: }
236:
237: // Timer for periodically checking blackboard availability.
238: // Blackboard activity is signaled once the blackboard service is available
239: // to check for queued requests
240: public class BBWakeAlarm implements Alarm {
241: private long expiresAt;
242: private boolean expired = false;
243:
244: public BBWakeAlarm(long expirationTime) {
245: expiresAt = expirationTime;
246: }
247:
248: public long getExpirationTime() {
249: return expiresAt;
250: }
251:
252: public synchronized void expire() {
253: if (!expired) {
254: expired = true;
255: if (blackboard != null) {
256: blackboard.signalClientActivity();
257: } else { // Not ready yet, wait for awhile
258: wakeAlarm = new BBWakeAlarm(System
259: .currentTimeMillis()
260: + TIMER_INTERVAL);
261: alarmService.addRealTimeAlarm(wakeAlarm);
262: }
263: }
264: }
265:
266: public boolean hasExpired() {
267: return expired;
268: }
269:
270: public synchronized boolean cancel() {
271: boolean was = expired;
272: expired = true;
273: return was;
274: }
275: }
276:
277: }
|