001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 Mobile Intelligence Corp
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.community;
028:
029: import java.util.ArrayList;
030: import java.util.Collections;
031: import java.util.List;
032:
033: import javax.naming.directory.ModificationItem;
034:
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.service.AlarmService;
037: import org.cougaar.core.service.LoggingService;
038: import org.cougaar.core.service.AgentIdentificationService;
039: import org.cougaar.core.agent.service.alarm.Alarm;
040: import org.cougaar.core.mts.MessageAddress;
041:
042: import org.cougaar.util.log.Logger;
043:
044: import org.cougaar.core.service.community.CommunityResponseListener;
045: import org.cougaar.core.service.community.Entity;
046:
047: /**
048: * Queue for processing Community Requests at a future time. This is
049: * typically used to hold requests that have failed and are to be retried.
050: * For instance, a WP lookup.
051: */
052: public class CommunityRequestQueue {
053:
054: private List queue = Collections.synchronizedList(new ArrayList());
055: private RequestQueueTimer timer;
056: private DefaultCommunityServiceImpl commSvc;
057: private Logger logger;
058: private String agentName;
059: private ServiceBroker serviceBroker;
060:
061: public CommunityRequestQueue(ServiceBroker sb,
062: DefaultCommunityServiceImpl dcs) {
063: serviceBroker = sb;
064: commSvc = dcs;
065: agentName = getAgentName();
066: logger = (LoggingService) serviceBroker.getService(this ,
067: LoggingService.class, null);
068: }
069:
070: public synchronized void add(long delay, String communityName,
071: int requestType, Entity entity,
072: ModificationItem[] attrMods, long timeout,
073: CommunityResponseListener crl) {
074: if (logger.isDebugEnabled()) {
075: logger.debug(agentName + ": add:" + "delay=" + delay
076: + " community=" + communityName + " type="
077: + requestType + " entity=" + entity);
078: }
079: QueuedRequest req = new QueuedRequest(now() + delay,
080: communityName, requestType, entity, attrMods, timeout,
081: crl);
082: queue.add(req);
083: execute();
084: }
085:
086: protected void execute() {
087: if (logger.isDetailEnabled()) {
088: logger.detail(agentName + ": execute:" + " itemsInQueue="
089: + queue.size());
090: }
091: if (timer != null && !timer.hasExpired()) {
092: timer.cancel();
093: }
094: int n;
095: List l;
096: synchronized (queue) {
097: n = queue.size();
098: if (n <= 0) {
099: return;
100: }
101: l = new ArrayList(queue);
102: queue.clear();
103: }
104: long now = now();
105: long fireAt = 0;
106: for (int i = 0; i < n; i++) {
107: QueuedRequest req = (QueuedRequest) l.get(i);
108: if (req.processAt <= now) {
109: if (logger.isDebugEnabled()) {
110: logger.debug(agentName + ": sendCommunityRequest:"
111: + " community=" + req.communityName
112: + " type=" + req.type + " entity="
113: + req.entity);
114: }
115: commSvc.sendCommunityRequest(req.communityName,
116: req.type, req.entity, req.mods, req.timeout,
117: req.crl);
118: } else { // not yet
119: if (fireAt == 0 || req.processAt < fireAt) {
120: fireAt = req.processAt;
121: }
122: queue.add(req);
123: }
124: }
125: if (fireAt > 0) {
126: timer = new RequestQueueTimer(fireAt);
127: AlarmService alarmService = (AlarmService) serviceBroker
128: .getService(this , AlarmService.class, null);
129: alarmService.addRealTimeAlarm(timer);
130: serviceBroker.releaseService(this , AlarmService.class,
131: alarmService);
132: }
133: }
134:
135: protected String getAgentName() {
136: AgentIdentificationService ais = (AgentIdentificationService) serviceBroker
137: .getService(this , AgentIdentificationService.class,
138: null);
139: MessageAddress addr = ais.getMessageAddress();
140: serviceBroker.releaseService(this ,
141: AgentIdentificationService.class, ais);
142: return addr.toString();
143: }
144:
145: private long now() {
146: return System.currentTimeMillis();
147: }
148:
149: class QueuedRequest {
150: long processAt;
151: String communityName;
152: int type;
153: long timeout;
154: Entity entity;
155: ModificationItem[] mods;
156: CommunityResponseListener crl;
157:
158: QueuedRequest(long time, String cname, int t, Entity e,
159: ModificationItem[] m, long to,
160: CommunityResponseListener l) {
161: processAt = time;
162: communityName = cname;
163: type = t;
164: entity = e;
165: mods = m;
166: this .timeout = to;
167: crl = l;
168: }
169: }
170:
171: protected class RequestQueueTimer implements Alarm {
172: private long expiresAt;
173: private boolean expired = false;
174:
175: public RequestQueueTimer(long expirationTime) {
176: expiresAt = expirationTime;
177: }
178:
179: public long getExpirationTime() {
180: return expiresAt;
181: }
182:
183: public synchronized void expire() {
184: if (!expired) {
185: expired = true;
186: execute();
187: }
188: }
189:
190: public boolean hasExpired() {
191: return expired;
192: }
193:
194: public synchronized boolean cancel() {
195: boolean was = expired;
196: expired = true;
197: return was;
198: }
199: }
200:
201: }
|