0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2007 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.persist;
0028:
0029: import java.io.ByteArrayInputStream;
0030: import java.io.IOException;
0031: import java.io.InputStream;
0032: import java.io.ObjectInputStream;
0033: import java.io.ObjectOutputStream;
0034: import java.io.OutputStream;
0035: import java.io.Serializable;
0036: import java.security.PrivilegedAction;
0037: import java.security.AccessController;
0038: import java.sql.Connection;
0039: import java.text.DateFormat;
0040: import java.text.DecimalFormat;
0041: import java.text.SimpleDateFormat;
0042: import java.util.ArrayList;
0043: import java.util.Collection;
0044: import java.util.Collections;
0045: import java.util.Comparator;
0046: import java.util.Date;
0047: import java.util.HashMap;
0048: import java.util.Iterator;
0049: import java.util.List;
0050: import java.util.Map;
0051: import java.util.Set;
0052: import org.cougaar.bootstrap.SystemProperties;
0053: import org.cougaar.core.adaptivity.OMCRangeList;
0054: import org.cougaar.core.agent.ClusterContextTable;
0055: import org.cougaar.core.blackboard.Envelope;
0056: import org.cougaar.core.blackboard.EnvelopeTuple;
0057: import org.cougaar.core.blackboard.PersistenceEnvelope;
0058: import org.cougaar.core.component.Component;
0059: import org.cougaar.core.component.ServiceBroker;
0060: import org.cougaar.core.component.ServiceProvider;
0061: import org.cougaar.core.component.ServiceRevokedListener;
0062: import org.cougaar.core.logging.LoggingServiceWithPrefix;
0063: import org.cougaar.core.mts.MessageAddress;
0064: import org.cougaar.core.service.AgentIdentificationService;
0065: import org.cougaar.core.service.DataProtectionKey;
0066: import org.cougaar.core.service.DataProtectionKeyEnvelope;
0067: import org.cougaar.core.service.DataProtectionService;
0068: import org.cougaar.core.service.DataProtectionServiceClient;
0069: import org.cougaar.core.service.PersistenceControlService;
0070: import org.cougaar.core.service.PersistenceMetricsService;
0071: import org.cougaar.util.CSVUtility;
0072: import org.cougaar.util.GC;
0073: import org.cougaar.util.GenericStateModelAdapter;
0074: import org.cougaar.util.LinkedByteOutputStream;
0075: import org.cougaar.util.log.Logger;
0076: import org.cougaar.util.log.Logging;
0077:
0078: /**
0079: * This component advertises the {@link PersistenceService} and
0080: * manages all persistence activities except for the actual storage
0081: * of persistence deltas, which is done by {@link PersistencePlugin}s.
0082: * <p>
* As the distributor is about to distribute the objects in a
0084: * set of envelopes, those envelopes are passed to an instance of this
0085: * class. The contents of those envelopes are serialized into a
0086: * storage medium. These objects may refer to other plan objects that
0087: * have not changed. These objects are not in any of the envelopes,
0088: * but they must have been stored in earlier deltas. Instead of
0089: * rewriting those objects to the new delta, references to the earlier
0090: * objects are stored instead.
0091: * <p>
0092: * Restoring the state from this series is a bit problematic in that a
0093: * given object may have been written to several deltas; only the
0094: * latest copy is valid and all references from other objects must be
0095: * made to this latest copy and all others should be ignored. This is
0096: * handled by overwriting the value of the earlier objects with newer
0097: * values from later versions of the objects.
0098: *
0099: * @property org.cougaar.core.persistence.class
0100: * Specify the persistence classes to be used (if persistence is enabled).
0101: * The value consists of one or more elements separated by commas,
* where each element specifies a persistence class name and zero
0103: * or more semi-colon-separated parameters for that class.
0104: * For example, the default persistence class is:<br>
0105: * -Dorg.cougaar.core.persist.class=org.cougaar.core.persist.FilePersistence\;P<br>
0106: * Here is another example:<br>
0107: * -Dorg.cougaar.core.persist.class=org.cougaar.core.persist.DummyPersistence\;dummy<br>
0108: * Multiple classes can be specified with the "," separator, e.g.:<br>
0109: * -Dorg.cougaar.core.persist.class=Alpha\;a1\;a2\;a3,Beta\;b1\;b2<br>
0110: * where class Alpha is passed [a1, a2, a3] and Beta is passed [b1, b2].<br>
0111: * The interpretation of the parameters depends on the persistence
0112: * class, so see the documentation of the individual plugin classes
0113: * for details.
0114: *
0115: * @property org.cougaar.core.persistence.archivingDisabled
0116: * Set true
0117: * to discard archival deltas. Overridden by
0118: * org.cougaar.core.persistence.archiveCount.
0119: *
0120: * @property org.cougaar.core.persistence.archiveCount
0121: * An integer
* specifying how many persistence archive snapshots to keep. In the
0123: * absence of a value for this property, the archivingDisabled
0124: * property is used to set this value to 0 for disabled and
0125: * Integer.MAX_VALUE for enabled.
0126: *
0127: * @property org.cougaar.core.persistence.clear
0128: * Set true to discard all deltas on startup
0129: *
0130: * @property org.cougaar.core.persistence.consolidationPeriod
0131: * The number of incremental deltas between full deltas (default = 10)
0132: *
0133: * @property org.cougaar.core.persistence.lazyInterval
0134: * specifies the
0135: * interval in milliseconds between the generation of persistence
0136: * deltas. Default is 300000 (5 minutes). This will be overridden if
0137: * the persistence control and adaptivity engines are running.
0138: *
0139: * @property org.cougaar.core.persistence.DataProtectionServiceStubEnabled
0140: * set to true to enable
0141: * a debugging implementation of DataProtectionService if no real one is found.
0142: */
0143: public class PersistenceServiceComponent extends
0144: GenericStateModelAdapter implements Component,
0145: PersistencePluginSupport, PersistenceNames {
/** Lower bound (ms) of the per-media persistence interval range reported by the control service. */
private static final long MIN_PERSISTENCE_INTERVAL = 5000L;
/** Upper bound (ms) of that range; also the interval given to the dummy plugin spec. */
private static final long MAX_PERSISTENCE_INTERVAL = 1200000L; // 20 minutes max
/** Plugin (media) name used in the dummy persistence plugin spec. */
private static final String DUMMY_MEDIA_NAME = "dummy";
/** Plugin (media) name used in the default FilePersistence plugin spec. */
private static final String FILE_MEDIA_NAME = "P";
/** Separator between the class name, plugin name and parameters inside one plugin spec. */
private static final char PARAM_SEP = ';';

/** Default interval (ms) between persistence deltas: 300000 ms = 5 minutes. */
private static final long PERSISTENCE_INTERVAL_DFLT = 300000L;
/** Initial value of writeDisabled, read once from the system property at class initialization. */
private static final boolean WRITE_DISABLED_DFLT = SystemProperties
        .getBoolean(PERSISTENCE_DISABLE_WRITE_PROP);

/** Full system property name for the consolidation period (prefix + simple name). */
private static final String PERSISTENCE_CONSOLIDATION_PERIOD_PROP = PERSISTENCE_PROP_PREFIX
        + PERSISTENCE_CONSOLIDATION_PERIOD_NAME;
/** Default number of incremental deltas between full deltas. */
private static final int PERSISTENCE_CONSOLIDATION_PERIOD_DFLT = 10;

/** Default plugin specs, computed once at class initialization by getPersistenceClassesDflt(). */
private static final String[] PERSISTENCE_CLASSES_DFLT = getPersistenceClassesDflt();
0161:
0162: private static String[] getPersistenceClassesDflt() {
0163: String prop = SystemProperties
0164: .getProperty(PERSISTENCE_CLASS_PROP);
0165: if (prop != null) {
0166: try {
0167: return CSVUtility.parse(prop);
0168: } catch (Exception e) {
0169: Logging.getLogger(PersistenceServiceComponent.class)
0170: .error("Failed to parse " + prop, e);
0171: }
0172: }
0173: boolean disabled = SystemProperties.getProperty(
0174: PERSISTENCE_ENABLE_PROP, "false").equals("false");
0175: if (disabled)
0176: return new String[0];
0177: return new String[] { FilePersistence.class.getName()
0178: + PARAM_SEP + FILE_MEDIA_NAME };
0179: }
0180:
0181: private static String getDummyPersistenceClass() {
0182: return DummyPersistence.class.getName() + PARAM_SEP
0183: + DUMMY_MEDIA_NAME + PARAM_SEP
0184: + PERSISTENCE_INTERVAL_PREFIX
0185: + MAX_PERSISTENCE_INTERVAL;
0186: }
0187:
// Agent-wide defaults. Despite the constant-style names these are
// mutable instance fields: load() overwrites them from agent
// parameters, and PersistencePluginInfo applies them to plugins that
// report no interval / consolidation period of their own.
private long PERSISTENCE_INTERVAL = SystemProperties.getLong(
        PERSISTENCE_INTERVAL_PROP, PERSISTENCE_INTERVAL_DFLT);
private int PERSISTENCE_CONSOLIDATION_PERIOD = SystemProperties
        .getInt(PERSISTENCE_CONSOLIDATION_PERIOD_PROP,
                PERSISTENCE_CONSOLIDATION_PERIOD_DFLT);
0193:
0194: private static class RehydrationSet {
0195: PersistencePlugin ppi;
0196: SequenceNumbers sequenceNumbers;
0197:
0198: RehydrationSet(PersistencePlugin ppi, SequenceNumbers sn) {
0199: this .ppi = ppi;
0200: this .sequenceNumbers = sn;
0201: }
0202: }
0203:
0204: private static class ClientStuff extends RehydrationData implements
0205: Serializable {
0206: public void addAssociation(PersistenceAssociation pAssoc) {
0207: envelope.addObject(pAssoc.getObject());
0208: }
0209:
0210: public void setObjects(List l) {
0211: objects = l;
0212: }
0213: }
0214:
0215: private class PersistencePluginInfo {
0216: PersistencePlugin ppi;
0217: long nextPersistenceTime;
0218: int deltaCount = 0;
0219: SequenceNumbers cleanupSequenceNumbers = null;
0220:
0221: PersistencePluginInfo(PersistencePlugin ppi) {
0222: this .ppi = ppi;
0223: if (ppi.getPersistenceInterval() <= 0L) {
0224: setInterval(PERSISTENCE_INTERVAL);
0225: }
0226: if (ppi.getConsolidationPeriod() <= 0) {
0227: setConsolidationPeriod(PERSISTENCE_CONSOLIDATION_PERIOD);
0228: }
0229: }
0230:
0231: long getBehind(long now) {
0232: return now - nextPersistenceTime;
0233: }
0234:
0235: /**
0236: * This is complicated because we want this plugin to be ahead or
0237: * behind to the same degree it was ahead or behind before the
0238: * change. To do this, we get how much we are currently behind and
0239: * multiply that by the ratio of the new and old intervals. The
0240: * new base time and delta count are set so that we appear to be
0241: * behind by the adjusted amount.
0242: */
0243: void setInterval(long newInterval) {
0244: long now = System.currentTimeMillis();
0245: long persistenceInterval = ppi.getPersistenceInterval();
0246: if (persistenceInterval == 0L) {
0247: nextPersistenceTime = newInterval + now;
0248: } else {
0249: long behind = getBehind(now);
0250: long newBehind = behind * newInterval
0251: / persistenceInterval;
0252: nextPersistenceTime = now - newBehind;
0253: }
0254: ppi.setPersistenceInterval(newInterval);
0255: if (logger.isDebugEnabled()) {
0256: logger.debug(ppi.getName() + " persistenceInterval = "
0257: + newInterval);
0258: logger.debug(ppi.getName() + " nextPersistenceTime = "
0259: + nextPersistenceTime);
0260: }
0261: }
0262:
0263: void setConsolidationPeriod(int newPeriod) {
0264: ppi.setConsolidationPeriod(newPeriod);
0265: if (logger.isDebugEnabled()) {
0266: logger.debug(ppi.getName() + " consolidationPeriod = "
0267: + newPeriod);
0268: }
0269: }
0270: }
0271:
0272: private DataProtectionServiceClient dataProtectionServiceClient = new DataProtectionServiceClient() {
0273: public Iterator iterator() {
0274: return getDataProtectionKeyIterator();
0275: }
0276:
0277: public MessageAddress getAgentIdentifier() {
0278: return PersistenceServiceComponent.this .getMessageAddress();
0279: }
0280: };
0281:
0282: private static class PersistenceKeyEnvelope implements
0283: DataProtectionKeyEnvelope {
0284: PersistencePlugin ppi;
0285: int deltaNumber;
0286: DataProtectionKey theKey = null;
0287:
0288: public PersistenceKeyEnvelope(PersistencePlugin ppi,
0289: int deltaNumber) {
0290: this .deltaNumber = deltaNumber;
0291: this .ppi = ppi;
0292: }
0293:
0294: public void setDataProtectionKey(DataProtectionKey aKey)
0295: throws IOException {
0296: theKey = aKey;
0297: ppi.storeDataProtectionKey(deltaNumber, aKey);
0298: }
0299:
0300: public DataProtectionKey getDataProtectionKey()
0301: throws IOException {
0302: if (theKey == null) {
0303: theKey = ppi.retrieveDataProtectionKey(deltaNumber);
0304: }
0305: return theKey;
0306: }
0307: }
0308:
0309: private Iterator getDataProtectionKeyIterator() {
0310: List envelopes = new ArrayList();
0311: RehydrationSet[] rehydrationSets = getRehydrationSets("");
0312: for (int i = 0; i < rehydrationSets.length; i++) {
0313: SequenceNumbers sequenceNumbers = rehydrationSets[i].sequenceNumbers;
0314: final PersistencePlugin ppi = rehydrationSets[i].ppi;
0315: for (int seq = sequenceNumbers.first; seq < sequenceNumbers.current; seq++) {
0316: envelopes.add(new PersistenceKeyEnvelope(ppi, seq));
0317: }
0318: }
0319: return envelopes.iterator();
0320: }
0321:
0322: private PersistencePluginInfo getPluginInfo(String pluginName) {
0323: PersistencePluginInfo ppio = (PersistencePluginInfo) plugins
0324: .get(pluginName);
0325: if (ppio != null)
0326: return ppio;
0327: throw new IllegalArgumentException(
0328: "No such persistence medium: " + pluginName);
0329: }
0330:
0331: private String getAgentName() {
0332: MessageAddress addr = getMessageAddress();
0333: return (addr == null ? "null" : addr.toString());
0334: }
0335:
0336: public class PersistenceControlServiceImpl implements
0337: PersistenceControlService {
0338: public String[] getControlNames() {
0339: return new String[0];
0340: }
0341:
0342: public OMCRangeList getControlValues(String controlName) {
0343: throw new IllegalArgumentException("No such control: "
0344: + controlName);
0345: }
0346:
0347: public void setControlValue(String controlName,
0348: Comparable newValue) {
0349: throw new IllegalArgumentException("No such control: "
0350: + controlName);
0351: }
0352:
0353: public String[] getMediaNames() {
0354: return (String[]) plugins.keySet().toArray(
0355: new String[plugins.size()]);
0356: }
0357:
0358: public String[] getMediaControlNames(String mediaName) {
0359: getPluginInfo(mediaName); // Test existence
0360: return new String[] { PERSISTENCE_INTERVAL_NAME,
0361: PERSISTENCE_CONSOLIDATION_PERIOD_NAME, };
0362: }
0363:
0364: public OMCRangeList getMediaControlValues(String mediaName,
0365: String controlName) {
0366: getPluginInfo(mediaName);// Test existence
0367: if (controlName.equals(PERSISTENCE_INTERVAL_NAME)) {
0368: return new OMCRangeList(new Long(
0369: MIN_PERSISTENCE_INTERVAL), new Long(
0370: MAX_PERSISTENCE_INTERVAL));
0371: }
0372: if (controlName
0373: .equals(PERSISTENCE_CONSOLIDATION_PERIOD_NAME)) {
0374: return new OMCRangeList(new Integer(1), new Integer(20));
0375: }
0376: throw new IllegalArgumentException(mediaName
0377: + " has no control named: " + controlName);
0378: }
0379:
0380: public void setMediaControlValue(String mediaName,
0381: String controlName, Comparable newValue) {
0382: PersistencePluginInfo ppio = getPluginInfo(mediaName);
0383: if (controlName.equals(PERSISTENCE_INTERVAL_NAME)) {
0384: ppio.setInterval(((Number) newValue).longValue());
0385: recomputeNextPersistenceTime = true;
0386: return;
0387: }
0388: if (controlName
0389: .equals(PERSISTENCE_CONSOLIDATION_PERIOD_NAME)) {
0390: ppio.setConsolidationPeriod(((Number) newValue)
0391: .intValue());
0392: return;
0393: }
0394: throw new IllegalArgumentException(mediaName
0395: + " has no control named: " + controlName);
0396: }
0397: }
0398:
// Unless disabled through the verify-patch property (default true),
// run PersistenceInputStream's superclass check when this class is
// initialized.
static {
    if (SystemProperties.getBoolean(PERSISTENCE_VERIFY_JAVA_IO_PATCH_PROP, true)) {
        PersistenceInputStream.checkSuperclass();
    }
}
0406:
0407: // Component implementation
0408:
/**
 * Records the service broker used by load() and unload() to obtain
 * and release services.
 *
 * @param sb the service broker for this component
 */
public void setServiceBroker(ServiceBroker sb) {
    this.sb = sb;
}
0412:
0413: /**
0414: * Ideally, our agent would give us our parameters, but limitations
0415: * in ACME or CSMART may preclude this possibility. So, this method
0416: * is an alternate using system properties. The property named
0417: * org.cougaar.core.persistence.<agent> may have a value that is a
0418: * comma separated list of values of parameters. Each item of the
0419: * list is of the form <name>=<value> where <name> is the simple
0420: * name for the parameter (without the org.cougaar.persistence.
0421: * prefix). Many of the values in this list will contain commas and
0422: * this must be quoted with backslash or quotes.
0423: */
0424: private static List getParametersFromProperties(
0425: MessageAddress agentId) {
0426: List ret = new ArrayList();
0427: String pname = PERSISTENCE_PARAMETERS_PROP + "." + agentId;
0428: String pvalue = SystemProperties.getProperty(pname);
0429: if (pvalue != null) {
0430: String[] ps = CSVUtility.parse(pvalue);
0431: for (int i = 0; i < ps.length; i++) {
0432: ret.add(PERSISTENCE_PROP_PREFIX + ps[i]);
0433: }
0434: }
0435: return ret;
0436: }
0437:
0438: /**
0439: * Optional persistence parameters for this agent
0440: */
0441: public void setParameter(Object o) {
0442: if (o instanceof List) {
0443: params.addAll((List) o);
0444: } else {
0445: throw new IllegalArgumentException("Illegal parameter " + o);
0446: }
0447: }
0448:
0449: /**
0450: * Load
0451: */
0452: public void load() {
0453: super .load();
0454:
0455: // Get our local agent's address
0456: AgentIdentificationService ais = (AgentIdentificationService) sb
0457: .getService(this , AgentIdentificationService.class,
0458: null);
0459: if (ais != null) {
0460: agentId = ais.getMessageAddress();
0461: sb.releaseService(this , AgentIdentificationService.class,
0462: ais);
0463: }
0464:
0465: // Create our logger
0466: logger = Logging.getLogger(this .getClass());
0467: logger = new LoggingServiceWithPrefix(logger, getAgentName()
0468: + ": ");
0469:
0470: // Add persistence parameters defined by system properties
0471: params.addAll(getParametersFromProperties(agentId));
0472:
0473: // Set agent-wide persistence defaults
0474: List overridePluginClasses = new ArrayList();
0475: for (int i = 0, n = params.size(); i < n; i++) {
0476: String fullParam = (String) params.get(i);
0477: if (!fullParam.startsWith(PERSISTENCE_PROP_PREFIX))
0478: continue; // Not mine
0479: String param = fullParam.substring(PERSISTENCE_PROP_PREFIX
0480: .length());
0481: try {
0482: if (param.startsWith(PERSISTENCE_INTERVAL_PREFIX)) {
0483: PERSISTENCE_INTERVAL = Long.parseLong(param
0484: .substring(PERSISTENCE_INTERVAL_PREFIX
0485: .length()));
0486: continue;
0487: }
0488: if (param
0489: .startsWith(PERSISTENCE_CONSOLIDATION_PERIOD_PREFIX)) {
0490: PERSISTENCE_CONSOLIDATION_PERIOD = Integer
0491: .parseInt(param
0492: .substring(PERSISTENCE_CONSOLIDATION_PERIOD_PREFIX
0493: .length()));
0494: continue;
0495: }
0496: if (param.startsWith(PERSISTENCE_DISABLE_WRITE_PREFIX)) {
0497: writeDisabled = "true".equals(param
0498: .substring(PERSISTENCE_DISABLE_WRITE_PREFIX
0499: .length()));
0500: continue;
0501: }
0502: if (param.startsWith(PERSISTENCE_ARCHIVE_NUMBER_PREFIX)) {
0503: archiveNumber = param
0504: .substring(PERSISTENCE_ARCHIVE_NUMBER_PREFIX
0505: .length());
0506: writeDisabled = true; // Setting the archive number only makes sense if not writing
0507: continue;
0508: }
0509: if (param.startsWith(PERSISTENCE_CLASS_PREFIX)) {
0510: String plugin = param
0511: .substring(PERSISTENCE_CLASS_PREFIX
0512: .length());
0513: overridePluginClasses.add(plugin);
0514: continue;
0515: }
0516: } catch (Exception e) {
0517: logger.error("Error parsing parameter: " + fullParam);
0518: }
0519: }
0520: String[] pluginClasses = PERSISTENCE_CLASSES_DFLT;
0521: if (overridePluginClasses.size() > 0) {
0522: // Replace default with specific classes
0523: pluginClasses = new String[overridePluginClasses.size()];
0524: pluginClasses = (String[]) overridePluginClasses
0525: .toArray(pluginClasses);
0526: }
0527:
0528: identityTable = new IdentityTable(logger);
0529: registerServices(sb);
0530: try {
0531: for (int i = 0; i < pluginClasses.length; i++) {
0532: addPlugin(pluginClasses[i]);
0533: }
0534: // There must be at least one writable plugin
0535: boolean haveWritablePlugin = false;
0536: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0537: PersistencePluginInfo ppio = (PersistencePluginInfo) i
0538: .next();
0539: if (ppio.ppi.isWritable()) {
0540: haveWritablePlugin = true;
0541: break;
0542: }
0543: }
0544: if (!haveWritablePlugin) {
0545: // Add a dummy persistence plugin
0546: addPlugin(getDummyPersistenceClass());
0547: isDummy = true;
0548: }
0549: } catch (Exception e) {
0550: throw new RuntimeException("Exception in load()", e);
0551: }
0552: }
0553:
0554: private void addPlugin(String pluginSpec)
0555: throws PersistenceException {
0556: String[] paramTokens = CSVUtility.parse(pluginSpec, PARAM_SEP);
0557: if (paramTokens.length < 1) {
0558: throw new PersistenceException(
0559: "No plugin class specified: " + pluginSpec);
0560: }
0561: if (paramTokens.length < 2) {
0562: throw new PersistenceException("No plugin name: "
0563: + pluginSpec);
0564: }
0565: try {
0566: Class pluginClass = Class.forName((String) paramTokens[0]);
0567: String pluginName = (String) paramTokens[1];
0568: String[] pluginParams = new String[paramTokens.length - 2];
0569: System.arraycopy(paramTokens, 2, pluginParams, 0,
0570: pluginParams.length);
0571: PersistencePlugin ppi = (PersistencePlugin) pluginClass
0572: .newInstance();
0573: if (writeDisabled)
0574: ppi.setWritable(false); // Force write off
0575: addPlugin(ppi, pluginName, pluginParams);
0576: } catch (ClassNotFoundException cnfe) {
0577: throw new PersistenceException("Bad plugin class", cnfe);
0578: } catch (InstantiationException ie) {
0579: throw new PersistenceException(
0580: "Plugin instantiation failed", ie);
0581: } catch (IllegalAccessException iae) {
0582: throw new PersistenceException(
0583: "Plugin constructor inaccessible", iae);
0584: }
0585: }
0586:
0587: public void unload() {
0588: unregisterServices(sb);
0589: if (dataProtectionService != null) {
0590: sb.releaseService(dataProtectionServiceClient,
0591: DataProtectionService.class, dataProtectionService);
0592: }
0593: }
0594:
0595: private DataProtectionService getDataProtectionService() {
0596: if (dataProtectionService == null) {
0597:
0598: // For running with security, getting the service, which may be
0599: // the security service, is a privileged action
0600: // See RFE 3771
0601: dataProtectionService = (DataProtectionService) AccessController
0602: .doPrivileged(new PrivilegedAction() {
0603: public Object run() {
0604: return sb.getService(
0605: dataProtectionServiceClient,
0606: DataProtectionService.class, null);
0607: }
0608: });
0609:
0610: if (dataProtectionService == null) {
0611: if (logger.isInfoEnabled())
0612: logger.info("No DataProtectionService Available.");
0613: if (SystemProperties
0614: .getBoolean("org.cougaar.core.persistence.DataProtectionServiceStubEnabled")) {
0615: dataProtectionService = new DataProtectionServiceStub();
0616: }
0617: } else {
0618: if (logger.isInfoEnabled())
0619: logger.info("DataProtectionService is "
0620: + dataProtectionService.getClass()
0621: .getName());
0622: }
0623: }
0624: return dataProtectionService;
0625: }
0626:
/**
 * Keeps all associations of objects that have been persisted.
 * Created in load(); rehydrate() synchronizes on it.
 */
private IdentityTable identityTable;
private SequenceNumbers sequenceNumbers = null;
private ObjectOutputStream currentOutput;
/** Raw parameters from setParameter() and system properties; parsed in load(). */
private List params = new ArrayList();
/** This agent's address, obtained from the AgentIdentificationService in load(). */
private MessageAddress agentId;
private List associationsToPersist = new ArrayList();
private boolean previousPersistFailed = false;
/** Agent-prefixed logger, created in load(). */
private Logger logger;
/** Resolved on first use by getDataProtectionService(); released in unload(). */
private DataProtectionService dataProtectionService;
/** When true, every plugin added from a spec is forced non-writable. */
private boolean writeDisabled = WRITE_DISABLED_DFLT;
/** Suffix handed to getRehydrationSets() when rehydrating; set from the archive-number parameter. */
private String archiveNumber = "";
/** Registered plugins: plugin name -> PersistencePluginInfo. */
private Map plugins = new HashMap();
/** True when load() had to add the dummy plugin because no plugin was writable. */
private boolean isDummy = false;
/** Result of rehydration: PersistenceIdentity -> ClientStuff; null if nothing was rehydrated. */
private Map rehydrationResult = null;
// NOTE(review): not used in the visible part of the file; presumably keyed by client identity -- confirm.
private Map clients = new HashMap();
/** Service broker supplied through setServiceBroker(). */
private ServiceBroker sb;
private boolean full; // Private to persist method and methods it calls

/**
 * The current PersistencePlugin being used to generate persistence
 * deltas. This is changed just prior to generating a full delta if
 * there is a different plugin having priority.
 */
private PersistencePluginInfo currentPersistPluginInfo;

/** Base time from which getPersistenceTime() schedules the next delta. */
private long previousPersistenceTime = System.currentTimeMillis();

/** Cached result of getPersistenceTime(). */
private long nextPersistenceTime;

/** Set when a plugin interval changes so getPersistenceTime() recomputes its cached value. */
private boolean recomputeNextPersistenceTime = true;

private PersistenceMetricsServiceImpl metricsService = new PersistenceMetricsServiceImpl();
0662:
0663: private void addPlugin(PersistencePlugin ppi, String pluginName,
0664: String[] pluginParams) throws PersistenceException {
0665: boolean deleteOldPersistence = SystemProperties
0666: .getBoolean("org.cougaar.core.persistence.clear");
0667: if (deleteOldPersistence && logger.isInfoEnabled()) {
0668: logger.info("Clearing old persistence data");
0669: }
0670: ppi.init(this , pluginName, pluginParams, deleteOldPersistence);
0671: plugins.put(ppi.getName(), new PersistencePluginInfo(ppi));
0672: }
0673:
/**
 * @return this agent's address as obtained in load(); null if the
 * AgentIdentificationService was not available
 */
public MessageAddress getMessageAddress() {
    return agentId;
}
0677:
/**
 * @return true if load() found no writable plugin and added the
 * dummy persistence plugin
 */
public boolean isDummyPersistence() {
    return isDummy;
}
0681:
0682: /**
0683: * Gets the system time when persistence should be performed. We do
0684: * persistence periodically with a period such that all the plugins
0685: * will, on the average create persistence deltas with their
0686: * individual periods. The average frequency of persistence is the
0687: * sum of the individual media frequencies. Frequency is the
0688: * reciprocal of period. The computation is:<p>
0689: *
0690: * T = 1/(1/T1 + 1/T2 + ... + 1/Tn)
0691: * <p>
0692: * @return the time of the next persistence delta
0693: */
0694: public long getPersistenceTime() {
0695: if (recomputeNextPersistenceTime) {
0696: double sum = 0.0;
0697: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0698: PersistencePluginInfo ppio = (PersistencePluginInfo) i
0699: .next();
0700: if (ppio.ppi.isWritable()) {
0701: sum += 1.0 / ppio.ppi.getPersistenceInterval();
0702: }
0703: }
0704: long interval = (long) (1.0 / sum);
0705: nextPersistenceTime = previousPersistenceTime + interval;
0706: recomputeNextPersistenceTime = false;
0707: if (logger.isDebugEnabled())
0708: logger.debug("persistence interval=" + interval);
0709: }
0710: return nextPersistenceTime;
0711: }
0712:
0713: private ClientStuff getClientStuff(PersistenceIdentity clientId) {
0714: if (rehydrationResult == null)
0715: return null; // No rehydration
0716: ClientStuff clientStuff = (ClientStuff) rehydrationResult
0717: .get(clientId);
0718: if (clientStuff == null) {
0719: clientStuff = new ClientStuff();
0720: rehydrationResult.put(clientId, clientStuff);
0721: }
0722: return clientStuff;
0723: }
0724:
/**
 * Returns the client's rehydration record typed as RehydrationData;
 * delegates to getClientStuff(), so it is null when there is no
 * rehydration result.
 *
 * @param clientId the client's persistence identity
 */
private RehydrationData getRehydrationData(
        PersistenceIdentity clientId) {
    return getClientStuff(clientId);
}
0729:
/**
 * Rehydrate a persisted agent. Reads all the deltas in order
 * keeping the latest (last encountered) values from every object.
 * The rehydrated state is saved in rehydrationResult.
 * <p>
 * Does nothing when running with dummy persistence and no state
 * object is supplied. The whole operation holds the identityTable
 * lock. Errors after the read phase are logged, not thrown.
 *
 * @param pObject a PersistenceObject if rehydrating from a saved
 * state object. If null, rehydrate from media plugins
 */
private void rehydrate(final PersistenceObject pObject) {
    if (isDummy && pObject == null) {
        return; // Nothing to rehydrate
    }
    synchronized (identityTable) {
        final List rehydrationCollection = new ArrayList();
        identityTable.setRehydrationCollection(rehydrationCollection);
        try {
            final RehydrationSet[] rehydrationSets = getRehydrationSets(archiveNumber);
            if (pObject != null || rehydrationSets.length > 0) { // Deltas exist
                try {
                    // One-element array so the anonymous Runnable can hand back its result
                    final Map[] resultPtr = new Map[1];
                    Runnable thunk = new Runnable() {
                        public void run() {
                            try {
                                if (pObject != null) {
                                    if (logger.isInfoEnabled()) {
                                        logger.info("Rehydrating "
                                                + getMessageAddress()
                                                + " from "
                                                + pObject);
                                    }
                                    resultPtr[0] = rehydrateFromBytes(pObject.getBytes());
                                } else {
                                    // Loop through the available RehydrationSets and
                                    // attempt to rehydrate from each one until no errors
                                    // occur. This will normally happen on the very first
                                    // set, but might fail if the data has been corrupted
                                    // in some way.
                                    boolean success = false;
                                    for (int i = 0; i < rehydrationSets.length; i++) {
                                        SequenceNumbers rehydrateNumbers = rehydrationSets[i].sequenceNumbers;
                                        PersistencePlugin ppi = rehydrationSets[i].ppi;
                                        if (logger.isInfoEnabled()) {
                                            logger.info("Rehydrating "
                                                    + getMessageAddress()
                                                    + " "
                                                    + rehydrateNumbers.toString());
                                        }
                                        try {
                                            // All deltas but the last are read with the flag false;
                                            // only the last call's result is kept.
                                            while (rehydrateNumbers.first < rehydrateNumbers.current - 1) {
                                                rehydrateOneDelta(ppi,
                                                        rehydrateNumbers.first++,
                                                        false);
                                            }
                                            resultPtr[0] = rehydrateOneDelta(ppi,
                                                    rehydrateNumbers.first++,
                                                    true);
                                            success = true;
                                            break; // Successful rehydration
                                        } catch (Exception e) { // Rehydration failed
                                            // NOTE(review): the RehydrationSet is concatenated directly;
                                            // unless it overrides toString() this logs the default
                                            // Object form.
                                            logger.error("Rehydration from "
                                                    + rehydrationSets[i]
                                                    + " failed: ",
                                                    e);
                                            resetRehydration(rehydrationCollection);
                                            continue; // Try next RehydrationSet
                                        }
                                    }
                                    if (!success) {
                                        logger.error("Rehydration failed. Starting over from scratch");
                                    }
                                }
                            } catch (Exception ioe) {
                                throw new RuntimeException(
                                        "withClusterContext", ioe);
                            }
                        }
                    };

                    // Run the read phase with this agent's address as the cluster context
                    ClusterContextTable.withClusterContext(
                            getMessageAddress(), thunk);
                    Map clientData = resultPtr[0];
                    if (clientData == null)
                        return; // Didn't rehydrate
                    rehydrationResult = new HashMap();
                    // Distribute the active associations to their owning clients
                    for (Iterator iter = identityTable.iterator(); iter.hasNext();) {
                        PersistenceAssociation pAssoc = (PersistenceAssociation) iter.next();
                        Object obj = pAssoc.getObject();
                        PersistenceIdentity clientId = pAssoc.getClientId();
                        if (pAssoc.isActive()) {
                            if (logger.isDetailEnabled())
                                logger.detail(clientId
                                        + ": addAssociation "
                                        + pAssoc);
                            getClientStuff(clientId).addAssociation(pAssoc);

                            // Bug 3588: Do postRehydration work here, only for objects on
                            // BBoard, instead of for all.
                            // This is wrong if the Composition on an InActive MPTask is
                            // wanted and not on another MPTask but could be later.
                            // Also if an inactive object can become active.
                            if (obj instanceof ActivePersistenceObject) {
                                ((ActivePersistenceObject) obj).postRehydration(logger);
                            }
                        } else {
                            if (logger.isDetailEnabled())
                                logger.detail(clientId
                                        + ": inactive " + pAssoc);
                        }
                    }
                    if (logger.isDetailEnabled()) {
                        printIdentityTable("");
                    }
                    // Attach each client's object list from the final delta
                    for (Iterator i = clientData.entrySet().iterator(); i.hasNext();) {
                        Map.Entry entry = (Map.Entry) i.next();
                        PersistenceIdentity clientId = (PersistenceIdentity) entry.getKey();
                        List clientObjects = (List) entry.getValue();
                        ClientStuff clientStuff = getClientStuff(clientId);
                        clientStuff.setObjects(clientObjects);
                        if (logger.isDetailEnabled()) {
                            logger.detail("PersistenceEnvelope of "
                                    + clientId);
                            logEnvelopeContents(clientStuff.getPersistenceEnvelope());
                            logger.detail("Other objects of "
                                    + clientId + ": "
                                    + clientStuff.getObjects());
                        }
                    }
                    clearMarks(identityTable.iterator());
                } catch (Exception e) {
                    logger.error("Error during rehydration", e);
                }

            }
        } finally {
            rehydrationCollection.clear(); // Allow garbage collection
            identityTable.setRehydrationCollection(null); // Perform garbage collection
        }
    }
}
0887:
0888: /**
0889: * Loop through all plugins and get their available sequence number
0890: * sets. Create RehydrationSets from the sequence numbers sets and
0891: * sort them by timestamp.
0892: * @return the sorted RehydrationSets
0893: */
0894: private RehydrationSet[] getRehydrationSets(String suffix) {
0895: List result = new ArrayList();
0896: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0897: PersistencePluginInfo ppio = (PersistencePluginInfo) i
0898: .next();
0899: SequenceNumbers[] pluginNumbers = ppio.ppi
0900: .readSequenceNumbers(suffix);
0901: for (int j = 0; j < pluginNumbers.length; j++) {
0902: result.add(new RehydrationSet(ppio.ppi,
0903: pluginNumbers[j]));
0904: }
0905: }
0906: Collections.sort(result, new Comparator() {
0907: public int compare(Object o1, Object o2) {
0908: RehydrationSet rs1 = (RehydrationSet) o1;
0909: RehydrationSet rs2 = (RehydrationSet) o2;
0910: int diff = rs1.sequenceNumbers
0911: .compareTo(rs1.sequenceNumbers);
0912: if (diff != 0)
0913: return -diff;
0914: return rs1.ppi.getName().compareTo(rs2.ppi.getName());
0915: }
0916: });
0917: return (RehydrationSet[]) result
0918: .toArray(new RehydrationSet[result.size()]);
0919: }
0920:
0921: private void logEnvelopeContents(Envelope env) {
0922: for (Iterator cc = env.getAllTuples(); cc.hasNext();) {
0923: EnvelopeTuple t = (EnvelopeTuple) cc.next();
0924: String action = "";
0925: switch (t.getAction()) {
0926: case Envelope.ADD:
0927: action = "ADD";
0928: break;
0929: case Envelope.REMOVE:
0930: action = "REMOVE";
0931: break;
0932: case Envelope.CHANGE:
0933: action = "CHANGE";
0934: break;
0935: case Envelope.BULK:
0936: action = "BULK";
0937: break;
0938: }
0939: logger.detail(action + " " + t.getObject());
0940: }
0941: }
0942:
0943: public static String hc(Object o) {
0944: return (Integer.toHexString(System.identityHashCode(o)) + " " + (o == null ? "<null>"
0945: : o.toString()));
0946: }
0947:
0948: /**
0949: * Erase all the effects of a failed rehydration attempt.
0950: */
0951: private void resetRehydration(Collection rehydrationCollection) {
0952: identityTable = new IdentityTable(logger);
0953: rehydrationCollection.clear();
0954: identityTable.setRehydrationCollection(rehydrationCollection);
0955: }
0956:
0957: private Map rehydrateOneDelta(PersistencePlugin ppi,
0958: int deltaNumber, boolean lastDelta) throws IOException,
0959: ClassNotFoundException {
0960: InputStream is = ppi.openInputStream(deltaNumber);
0961: DataProtectionService dataProtectionService = getDataProtectionService();
0962: if (dataProtectionService != null) {
0963: PersistenceKeyEnvelope keyEnvelope = new PersistenceKeyEnvelope(
0964: ppi, deltaNumber);
0965: is = dataProtectionService.getInputStream(keyEnvelope, is);
0966: }
0967: ObjectInputStream ois = new ObjectInputStream(is);
0968: try {
0969: return rehydrateFromStream(ois, deltaNumber, lastDelta);
0970: } finally {
0971: ois.close();
0972: ppi.finishInputStream(deltaNumber);
0973: }
0974: }
0975:
0976: private Map rehydrateFromBytes(byte[] bytes) throws IOException,
0977: ClassNotFoundException {
0978: ByteArrayInputStream bs = new ByteArrayInputStream(bytes);
0979: return rehydrateFromStream(new ObjectInputStream(bs), 0, true);
0980: }
0981:
0982: private Map rehydrateFromStream(ObjectInputStream currentInput,
0983: int deltaNumber, boolean lastDelta) throws IOException,
0984: ClassNotFoundException {
0985: try {
0986: identityTable.setNextId(currentInput.readInt());
0987: int length = currentInput.readInt();
0988: if (logger.isDebugEnabled()) {
0989: logger.debug("Reading " + length + " objects");
0990: }
0991: PersistenceReference[][] referenceArrays = new PersistenceReference[length][];
0992: for (int i = 0; i < length; i++) {
0993: referenceArrays[i] = (PersistenceReference[]) currentInput
0994: .readObject();
0995: }
0996: // byte[] bytes = (byte[]) currentInput.readObject();
0997: // PersistenceInputStream stream = new PersistenceInputStream(bytes);
0998: PersistenceInputStream stream = new PersistenceInputStream(
0999: currentInput, logger);
1000: if (logger.isDetailEnabled()) {
1001: writeHistoryHeader();
1002: }
1003: stream.setIdentityTable(identityTable);
1004: try {
1005: for (int i = 0; i < referenceArrays.length; i++) {
1006: // Side effect: updates identityTable
1007: stream.readAssociation(referenceArrays[i]);
1008: }
1009: if (lastDelta) {
1010: return (Map) stream.readObject();
1011: } else {
1012: return null;
1013: }
1014: } finally {
1015: stream.close();
1016: }
1017: } catch (IOException e) {
1018: logger.error("IOException reading "
1019: + (lastDelta ? "last " : " ") + "delta "
1020: + deltaNumber);
1021: throw e;
1022: }
1023: }
1024:
1025: /**
1026: * Get highest sequence numbers recorded on any medium
1027: */
1028: private int getHighestSequenceNumber() {
1029: int best = 0;
1030: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1031: PersistencePluginInfo ppio = (PersistencePluginInfo) i
1032: .next();
1033: SequenceNumbers[] availableNumbers = ppio.ppi
1034: .readSequenceNumbers("");
1035: for (int j = 0; j < availableNumbers.length; j++) {
1036: SequenceNumbers t = availableNumbers[j];
1037: best = Math.max(best, t.current);
1038: }
1039: }
1040: return best;
1041: }
1042:
1043: private void initSequenceNumbers() {
1044: int highest = getHighestSequenceNumber();
1045: sequenceNumbers = new SequenceNumbers(highest, highest, System
1046: .currentTimeMillis());
1047: }
1048:
1049: /**
1050: * Persist a List of Envelopes. First the objects in the envelope
1051: * are entered into the persistence identityTable. Then the objects
1052: * are serialized to an ObjectOutputStream preceded with their
1053: * reference id. Other references to objects in the identityTable
1054: * are replaced with reference objects.
1055: */
1056: // private boolean nonEmpty(List subscriberStates) {
1057: // for (Iterator iter = subscriberStates.iterator(); iter.hasNext(); ) {
1058: // PersistenceSubscriberState subscriberState = (PersistenceSubscriberState) iter.next();
1059: // if (subscriberState.pendingEnvelopes.size() > 0) return true;
1060: // if (subscriberState.transactionEnvelopes.size() > 0) return true;
1061: // }
1062: // return false;
1063: // }
1064: private boolean isPersistable(Object o) {
1065: if (o instanceof NotPersistable)
1066: return false;
1067: if (o instanceof Persistable) {
1068: Persistable pbl = (Persistable) o;
1069: return pbl.isPersistable();
1070: }
1071: return true;
1072: }
1073:
1074: private void addEnvelope(Envelope e, PersistenceIdentity clientId)
1075: throws PersistenceException {
1076: if (logger.isDetailEnabled())
1077: logger.detail(clientId + ": addEnvelope " + e);
1078: for (Iterator envelope = e.getAllTuples(); envelope.hasNext();) {
1079: addEnvelopeTuple((EnvelopeTuple) envelope.next(), clientId);
1080: }
1081: }
1082:
1083: private void addEnvelopeTuple(EnvelopeTuple tuple,
1084: PersistenceIdentity clientId) throws PersistenceException {
1085: if (logger.isDetailEnabled())
1086: logger.detail(clientId + ": addEnvelopeTuple " + tuple);
1087: switch (tuple.getAction()) {
1088: case Envelope.BULK:
1089: Collection collection = (Collection) tuple.getObject();
1090: for (Iterator iter2 = collection.iterator(); iter2
1091: .hasNext();) {
1092: addObjectToPersist(iter2.next(), true, clientId);
1093: }
1094: break;
1095: case Envelope.ADD:
1096: case Envelope.CHANGE:
1097: addObjectToPersist(tuple.getObject(), true, clientId);
1098: break;
1099: case Envelope.REMOVE:
1100: addObjectToPersist(tuple.getObject(), false, clientId);
1101: break;
1102: }
1103: }
1104:
1105: private void addObjectToPersist(Object object, boolean newActive,
1106: PersistenceIdentity clientId) throws PersistenceException {
1107: if (!isPersistable(object))
1108: return;
1109: PersistenceAssociation pAssoc = identityTable
1110: .findOrCreate(object);
1111: PersistenceIdentity oldClientId = pAssoc.getClientId();
1112: if (oldClientId == null) {
1113: pAssoc.setClientId(clientId);
1114: } else if (!oldClientId.equals(clientId)) {
1115: throw new PersistenceException(clientId + " not owner");
1116: }
1117: if (logger.isDetailEnabled())
1118: logger.detail(clientId + ": addObjectToPersist " + object
1119: + ", " + newActive);
1120: if (newActive) {
1121: pAssoc.setActive();
1122: } else {
1123: pAssoc.setInactive();
1124: }
1125: if (!full)
1126: addAssociationToPersist(pAssoc);
1127: }
1128:
1129: private void addAssociationToPersist(PersistenceAssociation pAssoc) {
1130: if (pAssoc.isMarked())
1131: return; // Already scheduled to be written
1132: pAssoc.setMarked(true);
1133: addMarkedAssociation(pAssoc);
1134: }
1135:
1136: private void addMarkedAssociation(PersistenceAssociation pAssoc) {
1137: objectsThatMightGoAway.add(pAssoc.getObject());
1138: associationsToPersist.add(pAssoc);
1139: }
1140:
1141: private void clearMarks(Iterator iter) {
1142: while (iter.hasNext()) {
1143: PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1144: .next();
1145: pAssoc.setMarked(false);
1146: }
1147: }
1148:
1149: private void addExistingMarkedAssociations(Iterator iter) {
1150: while (iter.hasNext()) {
1151: PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1152: .next();
1153: if (pAssoc.isMarked()) {
1154: if (logger.isInfoEnabled()) {
1155: logger.info("Previously marked: " + pAssoc);
1156: }
1157: addMarkedAssociation(pAssoc);
1158: }
1159: }
1160: }
1161:
1162: /**
1163: * Select the next plugin to use for persistence. This is only
1164: * possible when the delta that is about to be generated will have
1165: * the full state. For each plugin, we keep track of the
1166: * nextPersistenceTime based on its persistenceInterval. We select
1167: * the plugin with the earliest nextPersistenceTime. If the
1168: * nextPersistenceTime of the selected plugin differs significantly
1169: * from now, the nextPersistenceTimes of all plugins are adjusted to
1170: * eliminate that difference.
1171: *
 * Persistence snapshots are taken with a frequency that is the
 * average of frequencies of all the plugins. This means that any
 * particular plugin will persist with a frequency greater than
 * its specified frequency for a while, but then will be inactive
 * for a period while other plugins are used for an interval such
 * that its average frequency is close to its specified frequency.
1178: *
 * As an example, consider two plugins A and B with periods of 10
 * and 40 respectively. The consolidation period for both is 10. The
 * average frequency is 1/10 + 1/40 or 5/40. This means the
 * inter-snapshot interval will be 8 and the faster plugin (A) will
 * go first since its nextPersistenceTime will be 10 compared to 40.
 * So we have:
1185: * next(A) = 10
1186: * next(B) = 40
1187: * A: 8, 16, 24, ..., 80
1188: * next(A) = 10 + 10 * 10 = 110
1189: * next(B) = 40 (B goes next)
1190: * B: 88, 96, ..., 160
1191: * next(A) = 110 (A goes next)
 * next(B) = 40 + 400 = 440
1193: * A: 168, 176, ..., 240
1194: * next(A) = 110 + 10 * 10 = 210 (A continues)
1195: * next(B) = 440
1196: * A: 248, 256, ..., 320
1197: * next(A) = 210 + 10 * 10 = 310 (A continues)
1198: * next(B) = 440
1199: * A: 328, 336, ..., 400
1200: * next(A) = 310 + 10 * 10 = 410 (A continues)
1201: * next(B) = 440
1202: * A: 408, 416, ..., 480
1203: * next(A) = 410 + 10 * 10 = 510
1204: * next(B) = 440 (B goes next)
1205: *
 * This pattern will continue with A running 4 times as much as B.
 * We observe that this leads to fairly long gaps between uses of
 * the lower-rate plugin since it uses up its share of the snapshots
 * fairly quickly and then waits a long time for its turn to come up
 * again. This suggests that it might be wise to reduce the
 * consolidation period of infrequent plugins.
1212: */
1213:
1214: private void selectNextPlugin() {
1215: PersistencePluginInfo best = null;
1216: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1217: PersistencePluginInfo ppio = (PersistencePluginInfo) i
1218: .next();
1219: if (best == null
1220: || ppio.nextPersistenceTime < best.nextPersistenceTime) {
1221: best = ppio;
1222: }
1223: }
1224: long adjustment = System.currentTimeMillis()
1225: - best.nextPersistenceTime;
1226: if (Math.abs(adjustment) > 10000L) {
1227: for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1228: PersistencePluginInfo ppio = (PersistencePluginInfo) i
1229: .next();
1230: ppio.nextPersistenceTime += adjustment;
1231: }
1232: }
1233: currentPersistPluginInfo = best;
1234: }
1235:
1236: /**
1237: * Because PersistenceAssociations have WeakReference objects, the
1238: * actual object in the association may be garbage collected if
1239: * there are no other references to it. This list is used to hold a
1240: * reference to such objects to avoid an NPE when it is not known
1241: * that a reference exists. Only used during the first delta after
1242: * rehydration.
1243: */
1244: private ArrayList objectsThatMightGoAway = new ArrayList();
1245:
1246: private final static Object vmPersistLock = new Object();
1247:
1248: /**
1249: * Process the data from all clients. Envelopes and such are put
1250: * into the identityTable as PersistenceAssociations. The list of
1251: * everything left over is stored in a Map indexed by client id.
1252: */
1253: private Map getClientData() {
1254: Map data = new HashMap(clients.size());
1255: for (Iterator i = clients.entrySet().iterator(); i.hasNext();) {
1256: Map.Entry entry = (Map.Entry) i.next();
1257: PersistenceClient client = (PersistenceClient) entry
1258: .getValue();
1259: PersistenceIdentity clientId = (PersistenceIdentity) entry
1260: .getKey();
1261: try {
1262: List clientData = client.getPersistenceData();
1263: List clientObjects = new ArrayList();
1264: if (logger.isDetailEnabled())
1265: logger.detail(clientId + " clientData: "
1266: + clientData);
1267: data.put(clientId, clientObjects);
1268: for (int j = 0, m = clientData.size(); j < m; j++) {
1269: Object o = clientData.get(j);
1270: if (o instanceof Envelope) {
1271: addEnvelope((Envelope) o, clientId);
1272: } else if (o instanceof EnvelopeTuple) {
1273: addEnvelopeTuple((EnvelopeTuple) o, clientId);
1274: } else {
1275: clientObjects.add(o);
1276: }
1277: }
1278: clientData.clear(); // Allow gc
1279: } catch (Exception e) {
1280: logger.error("Exception in getPersistenceData("
1281: + client + ")", e);
1282: }
1283: }
1284: return data;
1285: }
1286:
1287: /**
1288: * End a persistence epoch by generating a persistence delta.
1289: */
1290: PersistenceObject persist(boolean returnBytes, boolean full) {
1291: if (isDummy && !returnBytes) {
1292: return null;
1293: }
1294: int deltaNumber = -1;
1295: long startCPU = 0L;
1296: //startCPU = CpuClock.cpuTimeMillis();
1297: long startTime = System.currentTimeMillis();
1298: if (logger.isInfoEnabled()) {
1299: logger.info("Persist started");
1300: }
1301: int bytesSerialized = 0;
1302: full |= returnBytes; // Must be a full snapshot to return bytes
1303: Throwable failed = null;
1304: recomputeNextPersistenceTime = true;
1305: PersistenceObject result = null; // Return value if wanted
1306: synchronized (identityTable) {
1307: try {
1308: associationsToPersist.clear();
1309: objectsThatMightGoAway.clear();
1310: addExistingMarkedAssociations(identityTable.iterator());
1311: if (sequenceNumbers == null) {
1312: initSequenceNumbers();
1313: }
1314: if (previousPersistFailed) {
1315: full = true; // Don't trust the deltas, (try to) do a full
1316: previousPersistFailed = false;
1317: }
1318: // The following fixes an edge condition. The very first delta
1319: // (seqno = 0) is always a full delta whether we want it to be
1320: // or not because there are no previous ones. Setting full =
1321: // true removes any ambiguity about its fullness.
1322: if (sequenceNumbers.current == 0)
1323: full = true;
1324: // Every so often generate a full delta to consolidate and
1325: // prevent the number of deltas from increasing without bound.
1326: if (!full && currentPersistPluginInfo != null) {
1327: // Check if its time to consolidate this plugin's snapshots
1328: int consolidationPeriod = currentPersistPluginInfo.ppi
1329: .getConsolidationPeriod();
1330: // nSnapshots is the number already generated
1331: int nSnapshots = sequenceNumbers.current
1332: - sequenceNumbers.first;
1333: if (nSnapshots + 1 >= consolidationPeriod) {
1334: // The next snapshot needs to be full
1335: full = true; // Time for a full snapshot
1336: }
1337: }
1338: if (full || currentPersistPluginInfo == null) {
1339: if (currentPersistPluginInfo != null) {
1340: // Cleanup the existing since the full replaces them.
1341: // N.B., if there are multiple plugins, the cleanup will
1342: // not actually occur until the next time this plugin is
1343: // selected as the best and creates a full snapshot. This
1344: // is because we will select a new plugin below.
1345: currentPersistPluginInfo.cleanupSequenceNumbers = new SequenceNumbers(
1346: sequenceNumbers.first + 1,
1347: sequenceNumbers.current,
1348: sequenceNumbers.timestamp);
1349: }
1350: sequenceNumbers.first = sequenceNumbers.current;
1351: selectNextPlugin();
1352: }
1353: if (!currentPersistPluginInfo.ppi.checkOwnership()) {
1354: return null; // We are dead. Don't persist
1355: }
1356: if (sequenceNumbers.current == sequenceNumbers.first)
1357: full = true;
1358: currentPersistPluginInfo.deltaCount++;
1359: this .full = full; // Global full flag for duration of persist
1360: // Now gather everything to persist from our clients. Side
1361: // effect updates identityTable and if !full, associationsToPersist.
1362: Map clientData = getClientData();
1363: if (full) {
1364: // If full dump, garbage collect unreferenced objects
1365: GC.gc();
1366: for (Iterator iter = identityTable.iterator(); iter
1367: .hasNext();) {
1368: PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1369: .next();
1370: if (!pAssoc.isMarked()) {
1371: Object object = pAssoc.getObject();
1372: // it is just barely possible that another gc might have
1373: // collected some additional objects so do a final check
1374: if (object != null) {
1375: // Prevent additional gc from scavenging the objects
1376: // we are committed to persisting
1377: addAssociationToPersist(pAssoc);
1378: }
1379: }
1380: }
1381: }
1382: deltaNumber = beginTransaction();
1383: try {
1384: if (currentOutput == null && !returnBytes) {
1385: // Only doing dummy persistence
1386: } else {
1387: PersistenceOutputStream stream = new PersistenceOutputStream(
1388: logger);
1389: PersistenceReference[][] referenceArrays;
1390: if (logger.isDetailEnabled()) {
1391: writeHistoryHeader();
1392: }
1393: stream.setIdentityTable(identityTable);
1394: // One agent at a time to avoid inter-agent deadlock due to shared objects
1395: try {
1396: if (logger.isInfoEnabled()) {
1397: logger
1398: .info("Obtaining JVM persist lock");
1399: }
1400: synchronized (vmPersistLock) {
1401: if (logger.isInfoEnabled()) {
1402: logger
1403: .info("Obtained JVM persist lock, serializing");
1404: }
1405: int nObjects = associationsToPersist
1406: .size();
1407: referenceArrays = new PersistenceReference[nObjects][];
1408: for (int i = 0; i < nObjects; i++) {
1409: PersistenceAssociation pAssoc = (PersistenceAssociation) associationsToPersist
1410: .get(i);
1411: if (logger.isDetailEnabled()) {
1412: logger.detail("Persisting "
1413: + pAssoc);
1414: }
1415: referenceArrays[i] = stream
1416: .writeAssociation(pAssoc);
1417: }
1418: stream.writeObject(clientData);
1419: bytesSerialized = stream.size();
1420: if (logger.isInfoEnabled()) {
1421: logger
1422: .info("Serialized "
1423: + bytesSerialized
1424: + " bytes to buffer, releasing lock");
1425: }
1426: } // Ok to let other agents persist while we write out our data
1427: } finally {
1428: stream.close();
1429: } // End of stream protection try-catch
1430: if (returnBytes) {
1431: int estimatedSize = (int) (1.2 * bytesSerialized);
1432: LinkedByteOutputStream returnByteStream = new LinkedByteOutputStream(
1433: estimatedSize);
1434: ObjectOutputStream returnOutput = new ObjectOutputStream(
1435: returnByteStream);
1436: writeFinalOutput(returnOutput,
1437: referenceArrays, stream);
1438: returnOutput.close();
1439: result = new PersistenceObject(
1440: "Persistence state "
1441: + sequenceNumbers.current,
1442: returnByteStream.toByteArray());
1443: if (logger.isInfoEnabled()) {
1444: logger
1445: .info("Copied persistence snapshot to memory buffer"
1446: + " for return to state-capture caller");
1447: }
1448: }
1449: if (currentOutput != null) {
1450: writeFinalOutput(currentOutput,
1451: referenceArrays, stream);
1452: currentOutput.close();
1453: if (logger.isInfoEnabled()) {
1454: logger
1455: .info("Wrote persistence snapshot to output stream");
1456: }
1457: }
1458: } // End of non-dummy persistence
1459: clearMarks(associationsToPersist.iterator());
1460: commitTransaction();
1461: logger.printDot("P");
1462: // Cleanup old deltas and archived snapshots. N.B. The
1463: // cleanup is happening to the plugin that was just used.
1464: // When there are several plugins, this is usually different
1465: // from the plugin whose cleanupSequenceNumbers were set
1466: // above. This cleanup has been pending while the various
1467: // other plugins have been in use. This is _ok_! The
1468: // snapshot we just took is invariably a full snapshot.
1469: if (currentPersistPluginInfo.cleanupSequenceNumbers != null) {
1470: if (logger.isInfoEnabled()) {
1471: logger
1472: .info("Consolidated deltas "
1473: + currentPersistPluginInfo.cleanupSequenceNumbers);
1474: }
1475: currentPersistPluginInfo.ppi
1476: .cleanupOldDeltas(currentPersistPluginInfo.cleanupSequenceNumbers);
1477: currentPersistPluginInfo.ppi.cleanupArchive();
1478: currentPersistPluginInfo.cleanupSequenceNumbers = null;
1479: }
1480: } catch (Exception e) { // Transaction protection
1481: rollbackTransaction();
1482: if (logger.isErrorEnabled()) {
1483: logger.error("Persist failed", e);
1484: }
1485: logger.printDot("X");
1486: throw e;
1487: }
1488: objectsThatMightGoAway.clear();
1489: } catch (Exception e) {
1490: failed = e;
1491: logger.error("Error writing persistence snapshot", e);
1492: } finally {
1493: if (isDummy) {
1494: identityTable.clear(); // Perform garbage collection
1495: }
1496: }
1497: // set persist time to persist completion + epsilon
1498: previousPersistenceTime = System.currentTimeMillis();
1499: // Note currentPersistPluginInfo.nextPersistenceTime is _not_
1500: // relative to the current time; it is relative to the other
1501: // persistence plugins and is occasionally adjusted when it
1502: // drifts too far from real time.
1503: currentPersistPluginInfo.nextPersistenceTime += currentPersistPluginInfo.ppi
1504: .getPersistenceInterval();
1505: }
1506: //long finishCPU = CpuClock.cpuTimeMillis();
1507: long finishCPU = 0l;
1508: long finishTime = System.currentTimeMillis();
1509: PersistenceMetricImpl metric = new PersistenceMetricImpl(
1510: formatDeltaNumber(deltaNumber), startTime, finishTime,
1511: finishCPU - startCPU, bytesSerialized, full, failed,
1512: currentPersistPluginInfo.ppi);
1513: metricsService.addMetric(metric);
1514: if (logger.isInfoEnabled()) {
1515: logger.info(metric.toString());
1516: }
1517: return result;
1518: }
1519:
1520: private void writeFinalOutput(ObjectOutputStream s,
1521: PersistenceReference[][] referenceArrays,
1522: PersistenceOutputStream stream) throws IOException {
1523: s.writeInt(identityTable.getNextId());
1524: s.writeInt(referenceArrays.length);
1525: for (int i = 0; i < referenceArrays.length; i++) {
1526: s.writeObject(referenceArrays[i]);
1527: }
1528: stream.writeBytes(s);
1529: s.flush();
1530: }
1531:
1532: /** The format of timestamps in the log */
1533: private static DateFormat logTimeFormat = new SimpleDateFormat(
1534: "yyyy/MM/dd HH:mm:ss.SSS");
1535:
1536: private static DecimalFormat deltaFormat = new DecimalFormat(
1537: "_00000");
1538:
1539: public static String formatDeltaNumber(int deltaNumber) {
1540: return deltaFormat.format(deltaNumber);
1541: }
1542:
1543: private void writeHistoryHeader() {
1544: if (logger.isDetailEnabled()) {
1545: logger.detail(logTimeFormat.format(new Date(System
1546: .currentTimeMillis())));
1547: }
1548: }
1549:
1550: void printIdentityTable(String id) {
1551: logger.detail("IdentityTable begins");
1552: for (Iterator iter = identityTable.iterator(); iter.hasNext();) {
1553: PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1554: .next();
1555: logger.detail(id + pAssoc);
1556: }
1557: logger.detail("IdentityTable ends");
1558: }
1559:
1560: public Logger getLogger() {
1561: return logger;
1562: }
1563:
1564: static String getObjectName(Object o) {
1565: return o.getClass().getName() + "@"
1566: + System.identityHashCode(o);
1567: }
1568:
1569: private int beginTransaction() throws IOException {
1570: int deltaNumber = sequenceNumbers.current;
1571: OutputStream os = currentPersistPluginInfo.ppi
1572: .openOutputStream(deltaNumber, full);
1573: if (os != null) {
1574: DataProtectionService dataProtectionService = getDataProtectionService();
1575: if (dataProtectionService != null) {
1576: PersistenceKeyEnvelope keyEnvelope = new PersistenceKeyEnvelope(
1577: currentPersistPluginInfo.ppi, deltaNumber);
1578: os = dataProtectionService.getOutputStream(keyEnvelope,
1579: os);
1580: }
1581: currentOutput = new ObjectOutputStream(os);
1582: } else {
1583: currentOutput = null;
1584: }
1585: return deltaNumber;
1586: }
1587:
1588: private void rollbackTransaction() {
1589: currentPersistPluginInfo.ppi.abortOutputStream(sequenceNumbers);
1590: currentOutput = null;
1591: }
1592:
1593: private void commitTransaction() throws PersistenceException {
1594: currentPersistPluginInfo.ppi.lockOwnership();
1595: sequenceNumbers.current += 1;
1596: currentPersistPluginInfo.ppi.finishOutputStream(
1597: sequenceNumbers, full);
1598: currentOutput = null;
1599: currentPersistPluginInfo.ppi.unlockOwnership();
1600: }
1601:
1602: public java.sql.Connection getDatabaseConnection(Object locker) {
1603: return currentPersistPluginInfo.ppi
1604: .getDatabaseConnection(locker);
1605: }
1606:
1607: public void releaseDatabaseConnection(Object locker) {
1608: currentPersistPluginInfo.ppi.releaseDatabaseConnection(locker);
1609: }
1610:
1611: private class PersistenceServiceImpl implements PersistenceService {
1612: PersistenceIdentity clientId;
1613:
1614: PersistenceServiceImpl(PersistenceIdentity clientId) {
1615: this .clientId = clientId;
1616: }
1617:
1618: public RehydrationData getRehydrationData() {
1619: return PersistenceServiceComponent.this
1620: .getRehydrationData(clientId);
1621: }
1622: }
1623:
1624: private class PersistenceServiceForAgentImpl extends
1625: PersistenceServiceImpl implements
1626: PersistenceServiceForAgent {
1627: PersistenceServiceForAgentImpl(PersistenceIdentity clientId) {
1628: super (clientId);
1629: }
1630:
1631: public void rehydrate(PersistenceObject pObject) {
1632: PersistenceServiceComponent.this .rehydrate(pObject);
1633: }
1634:
1635: public void suspend() {
1636: }
1637: }
1638:
1639: private class PersistenceServiceForBlackboardImpl extends
1640: PersistenceServiceImpl implements
1641: PersistenceServiceForBlackboard {
1642: PersistenceServiceForBlackboardImpl(PersistenceIdentity clientId) {
1643: super (clientId);
1644: }
1645:
1646: public PersistenceObject persist(boolean returnBytes,
1647: boolean full) {
1648: return PersistenceServiceComponent.this .persist(
1649: returnBytes, full);
1650: }
1651:
1652: public java.sql.Connection getDatabaseConnection(Object locker) {
1653: return PersistenceServiceComponent.this
1654: .getDatabaseConnection(locker);
1655: }
1656:
1657: public void releaseDatabaseConnection(Object locker) {
1658: PersistenceServiceComponent.this
1659: .releaseDatabaseConnection(locker);
1660: }
1661:
1662: public boolean isDummyPersistence() {
1663: return PersistenceServiceComponent.this
1664: .isDummyPersistence();
1665: }
1666:
1667: public long getPersistenceTime() {
1668: return PersistenceServiceComponent.this
1669: .getPersistenceTime();
1670: }
1671: }
1672:
1673: ServiceProvider serviceProvider = new ServiceProvider() {
1674: public Object getService(ServiceBroker sb, Object requestor,
1675: Class cls) {
1676: if (cls == PersistenceControlService.class) {
1677: return new PersistenceControlServiceImpl();
1678: }
1679: if (cls == PersistenceMetricsService.class) {
1680: return metricsService;
1681: }
1682: if (cls == PersistenceService.class
1683: || cls == PersistenceServiceForBlackboard.class
1684: || cls == PersistenceServiceForAgent.class) {
1685: if (requestor instanceof PersistenceClient) {
1686: PersistenceClient client = (PersistenceClient) requestor;
1687: PersistenceIdentity clientId = client
1688: .getPersistenceIdentity();
1689: clients.put(clientId, client);
1690: if (cls == PersistenceService.class) {
1691: return new PersistenceServiceImpl(clientId);
1692: } else if (cls == PersistenceServiceForBlackboard.class) {
1693: return new PersistenceServiceForBlackboardImpl(
1694: clientId);
1695: } else {
1696: return new PersistenceServiceForAgentImpl(
1697: clientId);
1698: }
1699: } else {
1700: throw new IllegalArgumentException(
1701: "PersistenceService requestor must be a PersistenceClient");
1702: }
1703: }
1704: throw new IllegalArgumentException("Unknown service class");
1705: }
1706:
1707: public void releaseService(ServiceBroker sb, Object requestor,
1708: Class cls, Object svc) {
1709: if (cls == PersistenceControlService.class) {
1710: return;
1711: }
1712: if (cls == PersistenceMetricsService.class) {
1713: return;
1714: }
1715: if (cls == PersistenceServiceForBlackboard.class
1716: || cls == PersistenceService.class
1717: || cls == PersistenceServiceForAgent.class) {
1718: if (svc instanceof PersistenceServiceImpl) {
1719: PersistenceServiceImpl impl = (PersistenceServiceImpl) svc;
1720: clients.remove(impl.clientId);
1721: }
1722: return;
1723: }
1724: throw new IllegalArgumentException("Unknown service class");
1725: }
1726: };
1727:
1728: // More Persistence implementation
1729: private void registerServices(ServiceBroker sb) {
1730: sb.addService(PersistenceMetricsService.class, serviceProvider);
1731: sb.addService(PersistenceControlService.class, serviceProvider);
1732: sb.addService(PersistenceService.class, serviceProvider);
1733: sb.addService(PersistenceServiceForBlackboard.class,
1734: serviceProvider);
1735: sb
1736: .addService(PersistenceServiceForAgent.class,
1737: serviceProvider);
1738: }
1739:
1740: private void unregisterServices(ServiceBroker sb) {
1741: sb.revokeService(PersistenceControlService.class,
1742: serviceProvider);
1743: sb.revokeService(PersistenceMetricsService.class,
1744: serviceProvider);
1745: sb.revokeService(PersistenceService.class, serviceProvider);
1746: sb.revokeService(PersistenceServiceForBlackboard.class,
1747: serviceProvider);
1748: sb.revokeService(PersistenceServiceForAgent.class,
1749: serviceProvider);
1750: }
1751:
1752: public String toString() {
1753: return "Persist(" + getAgentName() + ")";
1754: }
1755: }
|