001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.persist;
028:
029: import java.io.Serializable;
030: import java.util.ArrayList;
031: import java.util.Collections;
032: import java.util.Iterator;
033: import java.util.List;
034: import java.util.HashSet;
035: import java.util.Set;
036:
037: import org.cougaar.core.blackboard.Envelope;
038: import org.cougaar.core.blackboard.EnvelopeTuple;
039: import org.cougaar.core.blackboard.MessageManager;
040: import org.cougaar.core.blackboard.PersistenceEnvelope;
041: import org.cougaar.core.blackboard.Subscriber;
042: import org.cougaar.core.component.ServiceBroker;
043: import org.cougaar.core.mts.MessageAddress;
044: import org.cougaar.core.service.AgentIdentificationService;
045: import org.cougaar.core.service.LoggingService;
046:
047: /**
048: * {@link Persistence} interface for the {@link
049: * org.cougaar.core.blackboard.Blackboard}.
050: */
051: public class BlackboardPersistence implements Persistence {
052: private PersistenceServiceForBlackboard persistenceService;
053: private LoggingService logger;
054: private MessageAddress agentId;
055: private PersistenceIdentity clientId = new PersistenceIdentity(
056: getClass().getName());
057: private List clientData;
058:
059: private List rehydrationSubscriberStates = null;
060:
061: private Object rehydrationSubscriberStatesLock = new Object();
062:
063: private PersistenceClient persistenceClient = new PersistenceClient() {
064: public PersistenceIdentity getPersistenceIdentity() {
065: return clientId;
066: }
067:
068: public List getPersistenceData() {
069: if (clientData == null) {
070: logger.error("Persistence not initiated by blackboard");
071: return Collections.EMPTY_LIST;
072: }
073: return clientData;
074: }
075: };
076:
077: private static class MyMetaData implements Serializable {
078: List undistributedEnvelopes;
079: List subscriberStates;
080: MessageManager messageManager;
081: Object quiescenceMonitorState;
082: }
083:
084: public static Persistence find(ServiceBroker sb)
085: throws PersistenceException {
086: return new BlackboardPersistence(sb);
087: }
088:
089: private BlackboardPersistence(ServiceBroker sb) {
090: AgentIdentificationService agentIdService = (AgentIdentificationService) sb
091: .getService(this , AgentIdentificationService.class,
092: null);
093: agentId = (MessageAddress) agentIdService.getMessageAddress();
094: sb.releaseService(this , AgentIdentificationService.class,
095: agentIdService);
096: logger = (LoggingService) sb.getService(this ,
097: LoggingService.class, null);
098: persistenceService = (PersistenceServiceForBlackboard) sb
099: .getService(persistenceClient,
100: PersistenceServiceForBlackboard.class, null);
101: }
102:
103: /**
104: * Keeps all associations of objects that have been persisted.
105: */
106:
107: /**
108: * @return true if persistence is disabled, except for
109: * "returnBytes" state-capture persistence
110: */
111: public boolean isDummyPersistence() {
112: return persistenceService.isDummyPersistence();
113: }
114:
115: /**
116: * Gets the system time when persistence should be performed. We do
117: * persistence periodically with a period such that all the plugins
118: * will, on the average create persistence deltas with their
119: * individual periods. The average frequence of persistence is the
120: * sum of the individual media frequencies. Frequency is the
121: * reciprocal of period. The computation is:<p>
122: *
123: * T = 1/(1/T1 + 1/T2 + ... + 1/Tn)
124: * <p>
125: * @return the time of the next persistence delta
126: */
127: public long getPersistenceTime() {
128: return persistenceService.getPersistenceTime();
129: }
130:
131: /**
132: * Rehydrate a persisted agent. Reads all the deltas in
133: * order keeping the latest (last encountered) values from
134: * every object.
135: * @param oldObjects Changes recorded in all but the last delta.
136: * @return List of all envelopes that have not yet been distributed
137: */
138: public RehydrationResult rehydrate(PersistenceEnvelope oldObjects,
139: Object state) {
140: RehydrationData rehydrationData = persistenceService
141: .getRehydrationData();
142: RehydrationResult result = new RehydrationResult();
143: if (rehydrationData != null) {
144: for (Iterator i = rehydrationData.getPersistenceEnvelope()
145: .getAllTuples(); i.hasNext();) {
146: oldObjects.addTuple((EnvelopeTuple) i.next());
147: }
148: List list = rehydrationData.getObjects();
149: assert list.size() == 1; // We published a bunch of envelopes and a MyMetaData
150: MyMetaData meta = (MyMetaData) list.get(0);
151: result.messageManager = meta.messageManager;
152: result.undistributedEnvelopes = meta.undistributedEnvelopes;
153: result.quiescenceMonitorState = meta.quiescenceMonitorState;
154: rehydrationSubscriberStates = meta.subscriberStates;
155: }
156: return result;
157: }
158:
159: /**
160: * Get a set of the Keys of the SubscriberStates in the rehydration info.
161: * Used by the Distributor to track which subscribers have not
162: * rehydrated.
163: */
164: public Set getSubscriberStateKeys() {
165: synchronized (rehydrationSubscriberStatesLock) {
166: if (rehydrationSubscriberStates == null)
167: return Collections.EMPTY_SET;
168: Set keys = new HashSet();
169: for (int i = 0; i < rehydrationSubscriberStates.size(); i++)
170: keys
171: .add(((PersistenceSubscriberState) rehydrationSubscriberStates
172: .get(i)).getKey());
173: return keys;
174: }
175: }
176:
177: public boolean hasSubscriberStates() {
178: synchronized (rehydrationSubscriberStatesLock) {
179: return rehydrationSubscriberStates != null;
180: }
181: }
182:
183: public void discardSubscriberState(Subscriber subscriber) {
184: synchronized (rehydrationSubscriberStatesLock) {
185: if (rehydrationSubscriberStates != null) {
186: for (Iterator subscribers = rehydrationSubscriberStates
187: .iterator(); subscribers.hasNext();) {
188: PersistenceSubscriberState pSubscriber = (PersistenceSubscriberState) subscribers
189: .next();
190: if (pSubscriber.isSameSubscriberAs(subscriber)) {
191: subscribers.remove();
192: if (rehydrationSubscriberStates.size() == 0) {
193: rehydrationSubscriberStates = null;
194: }
195: return;
196: }
197: }
198: }
199: }
200: }
201:
202: public PersistenceSubscriberState getSubscriberState(
203: Subscriber subscriber) {
204: synchronized (rehydrationSubscriberStatesLock) {
205: if (rehydrationSubscriberStates != null) {
206: for (Iterator subscribers = rehydrationSubscriberStates
207: .iterator(); subscribers.hasNext();) {
208: PersistenceSubscriberState pSubscriber = (PersistenceSubscriberState) subscribers
209: .next();
210: if (pSubscriber.isSameSubscriberAs(subscriber)) {
211: if (logger.isDebugEnabled()) {
212: logger.debug("Found " + pSubscriber);
213: }
214: return pSubscriber;
215: }
216: }
217: if (subscriber.shouldBePersisted()
218: && logger.isInfoEnabled()) {
219: logger
220: .info("Failed to find "
221: + new PersistenceSubscriberState(
222: subscriber));
223: }
224: return null;
225: }
226: return null;
227: }
228: }
229:
230: private boolean isPersistable(Object o) {
231: if (o instanceof NotPersistable)
232: return false;
233: if (o instanceof Persistable) {
234: Persistable pbl = (Persistable) o;
235: return pbl.isPersistable();
236: }
237: return true;
238: }
239:
240: private ArrayList copyAndRemoveNotPersistable(List v) {
241: if (v == null)
242: return null;
243: ArrayList result = new ArrayList(v.size());
244: for (Iterator iter = v.iterator(); iter.hasNext();) {
245: // The next line is the source of bug 3595
246: Envelope e = (Envelope) iter.next();
247: Envelope copy = e.newInstance();
248: for (Iterator tuples = e.getAllTuples(); tuples.hasNext();) {
249: EnvelopeTuple tuple = (EnvelopeTuple) tuples.next();
250: Object o = tuple.getObject();
251: if (isPersistable(o)) {
252: copy.addTuple(tuple);
253: } else {
254: if (logger.isDebugEnabled()) {
255: logger.debug("Removing not persistable " + o);
256: }
257: }
258: }
259: result.add(copy);
260: }
261: return result;
262: }
263:
264: /**
265: * End a persistence epoch by generating a persistence delta.
266: * @param epochEnvelopes All envelopes from this epoch that have
267: * been distributed. The effect of these envelopes has already been
268: * captured in the subscriber inboxes or in the consequential
269: * outboxes which are in undistributedEnvelopes.
270: * @param undistributedEnvelopes Envelopes that have not yet been distributed
271: * @param subscriberStates The subscriber states to record
272: */
273: public PersistenceObject persist(List epochEnvelopes,
274: List undistributedEnvelopes, List subscriberStates,
275: boolean returnBytes, boolean full,
276: MessageManager messageManager, Object quiescenceMonitorState) {
277: MyMetaData meta = new MyMetaData();
278: meta.undistributedEnvelopes = copyAndRemoveNotPersistable(undistributedEnvelopes);
279: for (Iterator iter = subscriberStates.iterator(); iter
280: .hasNext();) {
281: PersistenceSubscriberState ss = (PersistenceSubscriberState) iter
282: .next();
283: ss.pendingEnvelopes = copyAndRemoveNotPersistable(ss.pendingEnvelopes);
284: ss.transactionEnvelopes = copyAndRemoveNotPersistable(ss.transactionEnvelopes);
285: }
286: meta.subscriberStates = subscriberStates;
287: meta.messageManager = messageManager;
288: meta.quiescenceMonitorState = quiescenceMonitorState;
289: clientData = new ArrayList(2);
290: clientData.addAll(copyAndRemoveNotPersistable(epochEnvelopes));
291: for (Iterator iter = subscriberStates.iterator(); iter
292: .hasNext();) {
293: PersistenceSubscriberState ss = (PersistenceSubscriberState) iter
294: .next();
295: if (ss.pendingEnvelopes != null) {
296: clientData.addAll(ss.pendingEnvelopes);
297: }
298: if (ss.transactionEnvelopes != null) {
299: clientData.addAll(ss.transactionEnvelopes);
300: }
301: }
302: clientData.add(meta);
303: epochEnvelopes.clear(); // Allow gc
304: PersistenceObject result = persistenceService.persist(
305: returnBytes, full);
306: clientData.clear();
307: return result;
308: }
309:
310: public List getPersistenceData() {
311: List result = clientData;
312: clientData = null;
313: return result;
314: }
315:
316: public java.sql.Connection getDatabaseConnection(Object locker) {
317: return persistenceService.getDatabaseConnection(locker);
318: }
319:
320: public void releaseDatabaseConnection(Object locker) {
321: persistenceService.releaseDatabaseConnection(locker);
322: }
323:
324: // More Persistence implementation
325: public void registerServices(ServiceBroker sb) {
326: }
327:
328: public void unregisterServices(ServiceBroker sb) {
329: }
330:
331: public String toString() {
332: return "Persist(" + agentId + ")";
333: }
334: }
|