001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.util.Iterator;
030: import java.util.LinkedList;
031:
032: import org.cougaar.util.log.Logger;
033:
034: /**
035: * The ReservationManager coordinates {@link
036: * org.cougaar.core.blackboard.Distributor} persistence to ensure
037: * that only one agent can persist at a time, and that an agent
038: * preparing to persist will not block other agents from
039: * persisting.
040: * <p>
041: * Persistence reservations indicate that a persistence instance
042: * wishes to take a snapshot of its agent. The reservations are held
043: * in a queue (FIFO). When a persistence instance reaches the head of
044: * the queue it has exclusive use of the persistence mechanism. The
045: * reservation will only be held for a certain interval and if not
046: * exercised or re-confirmed within that interval, it is cancelled.
047: * During this interval, the agent should be getting itself into a
048: * well-defined state so the persistence snapshot will be valid.
049: * <p>
050: * If at any time after reaching the head of the queue (and trying to
051: * reach a well-defined state), an agent discovers that its
052: * reservation has been cancelled, it should abandon its attempt to
053: * reach a well-defined state, continue execution, and try again
054: * later.
055: * <p>
056: * If a ReservationManager is created with a timeout of 0, the manager
057: * is effectively disabled. This means that all requests and commits
058: * are satisfied unconditionally, and waitFor and release return
059: * immediately and do nothing. Also no storage is allocated.
060: */
061: public class ReservationManager {
062: private LinkedList queue = null;
063: private long timeout;
064: private boolean committed;
065:
066: private class Item {
067: private Object obj;
068: private long expires;
069:
070: public Item(Object p, long now) {
071: obj = p;
072: updateTimestamp(now);
073: }
074:
075: public boolean hasExpired(long now) {
076: return expires <= now;
077: }
078:
079: public void updateTimestamp(long now) {
080: expires = now + timeout;
081: }
082:
083: public String toString() {
084: return obj.toString();
085: }
086: }
087:
088: public ReservationManager(long timeout) {
089: this .timeout = timeout;
090: if (timeout > 0L) {
091: queue = new LinkedList();
092: }
093: }
094:
095: public synchronized boolean request(Object p) {
096: if (queue == null)
097: return true;
098: long now = System.currentTimeMillis();
099: Item item = findOrCreateItem(p, now);
100: if (!committed)
101: removeExpiredItems(now);
102: boolean result = item == queue.getFirst();
103: return result;
104: }
105:
106: private Item findOrCreateItem(Object p, long now) {
107: Item item = findItem(p);
108: if (item == null) {
109: item = new Item(p, now);
110: queue.add(item);
111: } else {
112: item.updateTimestamp(now);
113: }
114: return item;
115: }
116:
117: public synchronized void waitFor(Object p, Logger logger) {
118: if (queue == null)
119: return;
120: while (true) {
121: long now = System.currentTimeMillis();
122: Item item = findOrCreateItem(p, now);
123: if (!committed)
124: removeExpiredItems(now);
125: if (item == queue.getFirst())
126: return;
127: try {
128: Item firstItem = (Item) queue.getFirst();
129: long delay = firstItem.expires - now;
130: if (logger != null && logger.isInfoEnabled()) {
131: logger.info("waitFor " + delay + " for "
132: + firstItem);
133: }
134: if (delay <= 0) {
135: wait(); // Must be committed, wait for release
136: } else {
137: wait(delay); // Uncommitted, wait for timeout or release.
138: }
139: if (logger != null && logger.isInfoEnabled()) {
140: logger.info("waitFor wait finished");
141: }
142: } catch (InterruptedException ie) {
143: }
144: }
145: }
146:
147: public synchronized boolean commit(Object p) {
148: if (queue == null)
149: return true;
150: if (request(p)) {
151: committed = true;
152: return true;
153: }
154: return false;
155: }
156:
157: public synchronized void release(Object p) {
158: if (queue == null)
159: return;
160: Item item = findItem(p);
161: if (item != null) {
162: queue.remove(item);
163: committed = false;
164: notifyAll();
165: }
166: }
167:
168: private void removeExpiredItems(long now) {
169: for (Iterator i = queue.iterator(); i.hasNext();) {
170: Item item = (Item) i.next();
171: if (item.hasExpired(now)) {
172: i.remove();
173: }
174: }
175: }
176:
177: private Item findItem(Object p) {
178: for (Iterator i = queue.iterator(); i.hasNext();) {
179: Item item = (Item) i.next();
180: if (item.obj == p) {
181: return item;
182: }
183: }
184: return null;
185: }
186: }
|