Source Code Cross Referenced for PersistenceServiceComponent.java in  » Science » Cougaar12_4 » org » cougaar » core » persist » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.persist 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 1997-2007 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.persist;
0028:
0029:        import java.io.ByteArrayInputStream;
0030:        import java.io.IOException;
0031:        import java.io.InputStream;
0032:        import java.io.ObjectInputStream;
0033:        import java.io.ObjectOutputStream;
0034:        import java.io.OutputStream;
0035:        import java.io.Serializable;
0036:        import java.security.PrivilegedAction;
0037:        import java.security.AccessController;
0038:        import java.sql.Connection;
0039:        import java.text.DateFormat;
0040:        import java.text.DecimalFormat;
0041:        import java.text.SimpleDateFormat;
0042:        import java.util.ArrayList;
0043:        import java.util.Collection;
0044:        import java.util.Collections;
0045:        import java.util.Comparator;
0046:        import java.util.Date;
0047:        import java.util.HashMap;
0048:        import java.util.Iterator;
0049:        import java.util.List;
0050:        import java.util.Map;
0051:        import java.util.Set;
0052:        import org.cougaar.bootstrap.SystemProperties;
0053:        import org.cougaar.core.adaptivity.OMCRangeList;
0054:        import org.cougaar.core.agent.ClusterContextTable;
0055:        import org.cougaar.core.blackboard.Envelope;
0056:        import org.cougaar.core.blackboard.EnvelopeTuple;
0057:        import org.cougaar.core.blackboard.PersistenceEnvelope;
0058:        import org.cougaar.core.component.Component;
0059:        import org.cougaar.core.component.ServiceBroker;
0060:        import org.cougaar.core.component.ServiceProvider;
0061:        import org.cougaar.core.component.ServiceRevokedListener;
0062:        import org.cougaar.core.logging.LoggingServiceWithPrefix;
0063:        import org.cougaar.core.mts.MessageAddress;
0064:        import org.cougaar.core.service.AgentIdentificationService;
0065:        import org.cougaar.core.service.DataProtectionKey;
0066:        import org.cougaar.core.service.DataProtectionKeyEnvelope;
0067:        import org.cougaar.core.service.DataProtectionService;
0068:        import org.cougaar.core.service.DataProtectionServiceClient;
0069:        import org.cougaar.core.service.PersistenceControlService;
0070:        import org.cougaar.core.service.PersistenceMetricsService;
0071:        import org.cougaar.util.CSVUtility;
0072:        import org.cougaar.util.GC;
0073:        import org.cougaar.util.GenericStateModelAdapter;
0074:        import org.cougaar.util.LinkedByteOutputStream;
0075:        import org.cougaar.util.log.Logger;
0076:        import org.cougaar.util.log.Logging;
0077:
0078:        /**
0079:         * This component advertises the {@link PersistenceService} and
0080:         * manages all persistence activities except for the actual storage
0081:         * of persistence deltas, which is done by {@link PersistencePlugin}s.
0082:         * <p>
0083:         * As the distributor is about to distribute the objects in a
0084:         * set of envelopes, those envelopes are passed to an instance of this
0085:         * class.  The contents of those envelopes are serialized into a
0086:         * storage medium. These objects may refer to other plan objects that
0087:         * have not changed.  These objects are not in any of the envelopes,
0088:         * but they must have been stored in earlier deltas.  Instead of
0089:         * rewriting those objects to the new delta, references to the earlier
0090:         * objects are stored instead.
0091:         * <p>
0092:         * Restoring the state from this series is a bit problematic in that a
0093:         * given object may have been written to several deltas; only the
0094:         * latest copy is valid and all references from other objects must be
0095:         * made to this latest copy and all others should be ignored.  This is
0096:         * handled by overwriting the value of the earlier objects with newer
0097:         * values from later versions of the objects.  
0098:         *
0099:         * @property org.cougaar.core.persistence.class
0100:         * Specify the persistence classes to be used (if persistence is enabled).
0101:         * The value consists of one or more elements separated by commas,
0102:         * where each element specifies a persistence class name and zero
0103:         * or more semi-colon-separated parameters for that class.
0104:         * For example, the default persistence class is:<br>
0105:         * &nbsp;&nbsp;-Dorg.cougaar.core.persist.class=org.cougaar.core.persist.FilePersistence\;P<br>
0106:         * Here is another example:<br>
0107:         * &nbsp;&nbsp;-Dorg.cougaar.core.persist.class=org.cougaar.core.persist.DummyPersistence\;dummy<br>
0108:         * Multiple classes can be specified with the "," separator, e.g.:<br>
0109:         * &nbsp;&nbsp;-Dorg.cougaar.core.persist.class=Alpha\;a1\;a2\;a3,Beta\;b1\;b2<br>
0110:         * where class Alpha is passed [a1, a2, a3] and Beta is passed [b1, b2].<br>
0111:         * The interpretation of the parameters depends on the persistence
0112:         * class, so see the documentation of the individual plugin classes
0113:         * for details.
0114:         *
0115:         * @property org.cougaar.core.persistence.archivingDisabled
0116:         * Set true
0117:         * to discard archival deltas. Overridden by
0118:         * org.cougaar.core.persistence.archiveCount.
0119:         *
0120:         * @property org.cougaar.core.persistence.archiveCount
0121:         * An integer
0122:         * specifying how many persistence archive snapshots to keep. In the
0123:         * absence of a value for this property, the archivingDisabled
0124:         * property is used to set this value to 0 for disabled and
0125:         * Integer.MAX_VALUE for enabled.
0126:         *
0127:         * @property org.cougaar.core.persistence.clear
0128:         * Set true to discard all deltas on startup
0129:         *
0130:         * @property org.cougaar.core.persistence.consolidationPeriod
0131:         * The number of incremental deltas between full deltas (default = 10)
0132:         *
0133:         * @property org.cougaar.core.persistence.lazyInterval
0134:         * specifies the
0135:         * interval in milliseconds between the generation of persistence
0136:         * deltas. Default is 300000 (5 minutes). This will be overridden if
0137:         * the persistence control and adaptivity engines are running.
0138:         *
0139:         * @property org.cougaar.core.persistence.DataProtectionServiceStubEnabled
0140:         * set to true to enable 
0141:         * a debugging implementation of DataProtectionService if no real one is found.
0142:         */
0143:        public class PersistenceServiceComponent extends
0144:                GenericStateModelAdapter implements  Component,
0145:                PersistencePluginSupport, PersistenceNames {
0146:            private static final long MIN_PERSISTENCE_INTERVAL = 5000L; // lower bound (ms) reported for the per-media interval control
0147:            private static final long MAX_PERSISTENCE_INTERVAL = 1200000L; // 20 minutes max
0148:            private static final String DUMMY_MEDIA_NAME = "dummy"; // media name given to the fallback DummyPersistence plugin
0149:            private static final String FILE_MEDIA_NAME = "P"; // media name given to the default FilePersistence plugin
0150:            private static final char PARAM_SEP = ';'; // separates class name, media name and parameters inside one plugin spec
0151:
0152:            private static final long PERSISTENCE_INTERVAL_DFLT = 300000L; // 5 minutes, in milliseconds
                 // NOTE(review): WRITE_DISABLED_DFLT is not referenced in the visible part of
                 // the file; presumably it initializes the writeDisabled field -- confirm.
0153:            private static final boolean WRITE_DISABLED_DFLT = SystemProperties
0154:                    .getBoolean(PERSISTENCE_DISABLE_WRITE_PROP);
0155:
                 // Full system property name for the consolidation period (prefix + simple name).
0156:            private static final String PERSISTENCE_CONSOLIDATION_PERIOD_PROP = PERSISTENCE_PROP_PREFIX
0157:                    + PERSISTENCE_CONSOLIDATION_PERIOD_NAME;
0158:            private static final int PERSISTENCE_CONSOLIDATION_PERIOD_DFLT = 10; // incremental deltas between full deltas
0159:
                 // Plugin specs used by load() when the agent parameters name no plugin class.
0160:            private static final String[] PERSISTENCE_CLASSES_DFLT = getPersistenceClassesDflt();
0161:
0162:            private static String[] getPersistenceClassesDflt() {
                     // Computes the default list of persistence plugin specs:
                     //  1. if PERSISTENCE_CLASS_PROP is set and parses as CSV, use it verbatim;
                     //  2. otherwise, if PERSISTENCE_ENABLE_PROP is absent or exactly "false",
                     //     return an empty array (no plugins);
                     //  3. otherwise return a single FilePersistence spec named FILE_MEDIA_NAME.
0163:                String prop = SystemProperties
0164:                        .getProperty(PERSISTENCE_CLASS_PROP);
0165:                if (prop != null) {
0166:                    try {
0167:                        return CSVUtility.parse(prop);
0168:                    } catch (Exception e) {
                             // A malformed value is logged and then ignored: fall through to
                             // the enable-flag based default below.
0169:                        Logging.getLogger(PersistenceServiceComponent.class)
0170:                                .error("Failed to parse " + prop, e);
0171:                    }
0172:                }
                     // Exact, case-sensitive comparison: any value other than "false"
                     // counts as enabled.
0173:                boolean disabled = SystemProperties.getProperty(
0174:                        PERSISTENCE_ENABLE_PROP, "false").equals("false");
0175:                if (disabled)
0176:                    return new String[0];
0177:                return new String[] { FilePersistence.class.getName()
0178:                        + PARAM_SEP + FILE_MEDIA_NAME };
0179:            }
0180:
0181:            private static String getDummyPersistenceClass() {
                     // Builds the plugin spec for the fallback DummyPersistence plugin:
                     // "<DummyPersistence class>;dummy;<interval prefix><MAX_PERSISTENCE_INTERVAL>".
                     // load() adds this spec when no configured plugin is writable.
0182:                return DummyPersistence.class.getName() + PARAM_SEP
0183:                        + DUMMY_MEDIA_NAME + PARAM_SEP
0184:                        + PERSISTENCE_INTERVAL_PREFIX
0185:                        + MAX_PERSISTENCE_INTERVAL;
0186:            }
0187:
0188:            private long PERSISTENCE_INTERVAL = SystemProperties.getLong( // agent-wide default interval; load() may override it from agent parameters
0189:                    PERSISTENCE_INTERVAL_PROP, PERSISTENCE_INTERVAL_DFLT);
                 // Agent-wide default consolidation period; load() may override it from
                 // agent parameters. Both fields are instance state despite the constant-style names.
0190:            private int PERSISTENCE_CONSOLIDATION_PERIOD = SystemProperties
0191:                    .getInt(PERSISTENCE_CONSOLIDATION_PERIOD_PROP,
0192:                            PERSISTENCE_CONSOLIDATION_PERIOD_DFLT);
0193:
0194:            private static class RehydrationSet {
                     // Simple holder pairing a persistence plugin with a range of delta
                     // sequence numbers belonging to that plugin.
0195:                PersistencePlugin ppi;
0196:                SequenceNumbers sequenceNumbers;
0197:
0198:                RehydrationSet(PersistencePlugin ppi, SequenceNumbers sn) {
0199:                    this .ppi = ppi;
0200:                    this .sequenceNumbers = sn;
0201:                }
0202:            }
0203:
0204:            private static class ClientStuff extends RehydrationData implements 
0205:                    Serializable {
                     // Serializable RehydrationData with two mutators.
                     // NOTE(review): 'envelope' and 'objects' are not declared here;
                     // presumably they are fields inherited from RehydrationData -- confirm.

                     // Adds the object carried by the given association to the envelope.
0206:                public void addAssociation(PersistenceAssociation pAssoc) {
0207:                    envelope.addObject(pAssoc.getObject());
0208:                }
0209:
                     // Replaces the object list with the given list (stored by reference, not copied).
0210:                public void setObjects(List l) {
0211:                    objects = l;
0212:                }
0213:            }
0214:
0215:            private class PersistencePluginInfo {
                     // Per-plugin scheduling state kept by this component: the plugin
                     // itself, the time its next delta is due, a delta counter and
                     // sequence numbers pending cleanup.
0216:                PersistencePlugin ppi;
0217:                long nextPersistenceTime;
0218:                int deltaCount = 0;
0219:                SequenceNumbers cleanupSequenceNumbers = null;
0220:
                     // Wraps the plugin and applies the agent-wide defaults to any
                     // setting the plugin reports as unset (a value <= 0).
0221:                PersistencePluginInfo(PersistencePlugin ppi) {
0222:                    this .ppi = ppi;
0223:                    if (ppi.getPersistenceInterval() <= 0L) {
0224:                        setInterval(PERSISTENCE_INTERVAL);
0225:                    }
0226:                    if (ppi.getConsolidationPeriod() <= 0) {
0227:                        setConsolidationPeriod(PERSISTENCE_CONSOLIDATION_PERIOD);
0228:                    }
0229:                }
0230:
                     // Returns how far (ms) 'now' is past the scheduled persistence
                     // time; negative when the plugin is ahead of schedule.
0231:                long getBehind(long now) {
0232:                    return now - nextPersistenceTime;
0233:                }
0234:
0235:                /**
0236:                 * This is complicated because we want this plugin to be ahead or
0237:                 * behind to the same degree it was ahead or behind before the
0238:                 * change. To do this, we get how much we are currently behind and
0239:                 * multiply that by the ratio of the new and old intervals. The
0240:                 * new base time and delta count are set so that we appear to be
0241:                 * behind by the adjusted amount.
                      * <p>
                      * If the plugin has no usable current interval (zero or negative,
                      * the same "unset" test the constructor applies), there is nothing
                      * to scale, so the next persistence time is simply one new
                      * interval from now.
                      *
                      * @param newInterval the new persistence interval in milliseconds
0242:                 */
0243:                void setInterval(long newInterval) {
0244:                    long now = System.currentTimeMillis();
0245:                    long persistenceInterval = ppi.getPersistenceInterval();
                         // Treat any non-positive interval as unset. Dividing by a negative
                         // interval would flip the sign of the scaled lag and push the
                         // next persistence time arbitrarily far into the future.
0246:                    if (persistenceInterval <= 0L) {
0247:                        nextPersistenceTime = newInterval + now;
0248:                    } else {
0249:                        long behind = getBehind(now);
0250:                        long newBehind = behind * newInterval
0251:                                / persistenceInterval;
0252:                        nextPersistenceTime = now - newBehind;
0253:                    }
0254:                    ppi.setPersistenceInterval(newInterval);
0255:                    if (logger.isDebugEnabled()) {
0256:                        logger.debug(ppi.getName() + " persistenceInterval = "
0257:                                + newInterval);
0258:                        logger.debug(ppi.getName() + " nextPersistenceTime = "
0259:                                + nextPersistenceTime);
0260:                    }
0261:                }
0262:
                     // Forwards the new consolidation period to the plugin and logs it at debug level.
0263:                void setConsolidationPeriod(int newPeriod) {
0264:                    ppi.setConsolidationPeriod(newPeriod);
0265:                    if (logger.isDebugEnabled()) {
0266:                        logger.debug(ppi.getName() + " consolidationPeriod = "
0267:                                + newPeriod);
0268:                    }
0269:                }
0270:            }
0271:
0272:            private DataProtectionServiceClient dataProtectionServiceClient = new DataProtectionServiceClient() {
                     // Client identity used with the DataProtectionService (see unload(),
                     // which releases the service on behalf of this object).

                     // Iterates over key envelopes for the currently known deltas.
0273:                public Iterator iterator() {
0274:                    return getDataProtectionKeyIterator();
0275:                }
0276:
                     // Reports the enclosing component's agent address.
0277:                public MessageAddress getAgentIdentifier() {
0278:                    return PersistenceServiceComponent.this .getMessageAddress();
0279:                }
0280:            };
0281:
0282:            private static class PersistenceKeyEnvelope implements 
0283:                    DataProtectionKeyEnvelope {
                     // Key envelope for one delta of one plugin: the data protection
                     // key is stored in, and lazily read back from, the plugin under
                     // the delta number, and cached in 'theKey'.
0284:                PersistencePlugin ppi;
0285:                int deltaNumber;
0286:                DataProtectionKey theKey = null;
0287:
0288:                public PersistenceKeyEnvelope(PersistencePlugin ppi,
0289:                        int deltaNumber) {
0290:                    this .deltaNumber = deltaNumber;
0291:                    this .ppi = ppi;
0292:                }
0293:
                     // Caches the key and writes it through to the plugin.
                     // NOTE(review): the cache is updated before the store call, so if
                     // storeDataProtectionKey throws, this envelope keeps returning a
                     // key that was never stored -- confirm whether that is intended.
0294:                public void setDataProtectionKey(DataProtectionKey aKey)
0295:                        throws IOException {
0296:                    theKey = aKey;
0297:                    ppi.storeDataProtectionKey(deltaNumber, aKey);
0298:                }
0299:
                     // Returns the cached key, fetching it from the plugin on first use.
                     // A null result from the plugin is not cached, so it is re-fetched
                     // on the next call.
0300:                public DataProtectionKey getDataProtectionKey()
0301:                        throws IOException {
0302:                    if (theKey == null) {
0303:                        theKey = ppi.retrieveDataProtectionKey(deltaNumber);
0304:                    }
0305:                    return theKey;
0306:                }
0307:            }
0308:
0309:            private Iterator getDataProtectionKeyIterator() {
                     // Builds one PersistenceKeyEnvelope per delta sequence number in
                     // [first, current) for every rehydration set returned by
                     // getRehydrationSets(""), and returns an iterator over them.
0310:                List envelopes = new ArrayList();
0311:                RehydrationSet[] rehydrationSets = getRehydrationSets("");
0312:                for (int i = 0; i < rehydrationSets.length; i++) {
0313:                    SequenceNumbers sequenceNumbers = rehydrationSets[i].sequenceNumbers;
0314:                    final PersistencePlugin ppi = rehydrationSets[i].ppi;
                         // 'current' is exclusive: no envelope is created for it.
0315:                    for (int seq = sequenceNumbers.first; seq < sequenceNumbers.current; seq++) {
0316:                        envelopes.add(new PersistenceKeyEnvelope(ppi, seq));
0317:                    }
0318:                }
0319:                return envelopes.iterator();
0320:            }
0321:
0322:            private PersistencePluginInfo getPluginInfo(String pluginName) {
                     // Looks up the bookkeeping record for the named persistence medium.
                     // Never returns null: an unknown name raises IllegalArgumentException,
                     // which callers also use as an existence check.
0323:                PersistencePluginInfo ppio = (PersistencePluginInfo) plugins
0324:                        .get(pluginName);
0325:                if (ppio != null)
0326:                    return ppio;
0327:                throw new IllegalArgumentException(
0328:                        "No such persistence medium: " + pluginName);
0329:            }
0330:
0331:            private String getAgentName() {
                     // Returns the agent address as a string, or the literal "null"
                     // when no address is available; load() uses it as the log prefix.
0332:                MessageAddress addr = getMessageAddress();
0333:                return (addr == null ? "null" : addr.toString());
0334:            }
0335:
0336:            public class PersistenceControlServiceImpl implements 
0337:                    PersistenceControlService {
                     // PersistenceControlService implementation backed by the enclosing
                     // component's plugin table. There are no agent-level controls; each
                     // medium (plugin) exposes two: the persistence interval and the
                     // consolidation period.

                     // No agent-level controls exist.
0338:                public String[] getControlNames() {
0339:                    return new String[0];
0340:                }
0341:
                     // Always throws: there is no agent-level control to query.
0342:                public OMCRangeList getControlValues(String controlName) {
0343:                    throw new IllegalArgumentException("No such control: "
0344:                            + controlName);
0345:                }
0346:
                     // Always throws: there is no agent-level control to set.
0347:                public void setControlValue(String controlName,
0348:                        Comparable newValue) {
0349:                    throw new IllegalArgumentException("No such control: "
0350:                            + controlName);
0351:                }
0352:
                     // Returns the names under which plugins are registered.
0353:                public String[] getMediaNames() {
0354:                    return (String[]) plugins.keySet().toArray(
0355:                            new String[plugins.size()]);
0356:                }
0357:
                     // Returns the two control names every medium supports; throws
                     // IllegalArgumentException for an unknown medium.
0358:                public String[] getMediaControlNames(String mediaName) {
0359:                    getPluginInfo(mediaName); // Test existence
0360:                    return new String[] { PERSISTENCE_INTERVAL_NAME,
0361:                            PERSISTENCE_CONSOLIDATION_PERIOD_NAME, };
0362:                }
0363:
                     // Returns the allowed range for a control: the interval ranges over
                     // [MIN_PERSISTENCE_INTERVAL, MAX_PERSISTENCE_INTERVAL], the
                     // consolidation period over [1, 20]. Throws IllegalArgumentException
                     // for an unknown medium or control name.
0364:                public OMCRangeList getMediaControlValues(String mediaName,
0365:                        String controlName) {
0366:                    getPluginInfo(mediaName);// Test existence
0367:                    if (controlName.equals(PERSISTENCE_INTERVAL_NAME)) {
0368:                        return new OMCRangeList(new Long(
0369:                                MIN_PERSISTENCE_INTERVAL), new Long(
0370:                                MAX_PERSISTENCE_INTERVAL));
0371:                    }
0372:                    if (controlName
0373:                            .equals(PERSISTENCE_CONSOLIDATION_PERIOD_NAME)) {
0374:                        return new OMCRangeList(new Integer(1), new Integer(20));
0375:                    }
0376:                    throw new IllegalArgumentException(mediaName
0377:                            + " has no control named: " + controlName);
0378:                }
0379:
                     // Applies a new value to one control of one medium. newValue must be
                     // a Number (it is cast unconditionally). The value is not checked
                     // against the range reported by getMediaControlValues.
0380:                public void setMediaControlValue(String mediaName,
0381:                        String controlName, Comparable newValue) {
0382:                    PersistencePluginInfo ppio = getPluginInfo(mediaName);
0383:                    if (controlName.equals(PERSISTENCE_INTERVAL_NAME)) {
0384:                        ppio.setInterval(((Number) newValue).longValue());
                             // Flag read elsewhere in the component (outside this view).
0385:                        recomputeNextPersistenceTime = true;
0386:                        return;
0387:                    }
0388:                    if (controlName
0389:                            .equals(PERSISTENCE_CONSOLIDATION_PERIOD_NAME)) {
0390:                        ppio.setConsolidationPeriod(((Number) newValue)
0391:                                .intValue());
0392:                        return;
0393:                    }
0394:                    throw new IllegalArgumentException(mediaName
0395:                            + " has no control named: " + controlName);
0396:                }
0397:            }
0398:
0399:            static {
                     // At class-load time, run PersistenceInputStream.checkSuperclass()
                     // unless PERSISTENCE_VERIFY_JAVA_IO_PATCH_PROP is set to false
                     // (the check is on by default).
0400:                boolean checkPatch = SystemProperties.getBoolean(
0401:                        PERSISTENCE_VERIFY_JAVA_IO_PATCH_PROP, true);
0402:                if (checkPatch) {
0403:                    PersistenceInputStream.checkSuperclass();
0404:                }
0405:            }
0406:
0407:            // Component implementation
0408:
0409:            public void setServiceBroker(ServiceBroker sb) {
                     // Stores the broker that load() and unload() use to obtain and release services.
0410:                this .sb = sb;
0411:            }
0412:
0413:            /**
0414:             * Ideally, our agent would give us our parameters, but limitations
0415:             * in ACME or CSMART may preclude this possibility. So, this method
0416:             * is an alternative using system properties. The property named
0417:             * PERSISTENCE_PARAMETERS_PROP + "." + <agent> may have a value that is a
0418:             * comma separated list of values of parameters. Each item of the
0419:             * list is of the form <name>=<value> where <name> is the simple
0420:             * name for the parameter (without the PERSISTENCE_PROP_PREFIX
0421:             * prefix, which this method prepends). Many of the values in this
0422:             * list will contain commas and these must be quoted with backslash or quotes.
                  *
                  * @param agentId the agent whose property is consulted; its string
                  *        form is appended to the property name (a null agentId
                  *        therefore yields a name ending in ".null")
                  * @return a new list of fully prefixed parameter strings; empty when
                  *         the property is not set
0423:             */
0424:            private static List getParametersFromProperties(
0425:                    MessageAddress agentId) {
0426:                List ret = new ArrayList();
0427:                String pname = PERSISTENCE_PARAMETERS_PROP + "." + agentId;
0428:                String pvalue = SystemProperties.getProperty(pname);
0429:                if (pvalue != null) {
0430:                    String[] ps = CSVUtility.parse(pvalue);
0431:                    for (int i = 0; i < ps.length; i++) {
0432:                        ret.add(PERSISTENCE_PROP_PREFIX + ps[i]);
0433:                    }
0434:                }
0435:                return ret;
0436:            }
0437:
0438:            /**
0439:             * Optional persistence parameters for this agent. The elements of
                  * the given list are appended to the accumulated parameter list,
                  * which load() later reads as Strings.
                  *
                  * @param o a List of parameter strings
                  * @throws IllegalArgumentException if o is not a List (including null)
0440:             */
0441:            public void setParameter(Object o) {
0442:                if (o instanceof  List) {
0443:                    params.addAll((List) o);
0444:                } else {
0445:                    throw new IllegalArgumentException("Illegal parameter " + o);
0446:                }
0447:            }
0448:
0449:            /**
0450:             * Load
0451:             */
0452:            public void load() {
0453:                super .load();
0454:
0455:                // Get our local agent's address
0456:                AgentIdentificationService ais = (AgentIdentificationService) sb
0457:                        .getService(this , AgentIdentificationService.class,
0458:                                null);
0459:                if (ais != null) {
0460:                    agentId = ais.getMessageAddress();
0461:                    sb.releaseService(this , AgentIdentificationService.class,
0462:                            ais);
0463:                }
0464:
0465:                // Create our logger
0466:                logger = Logging.getLogger(this .getClass());
0467:                logger = new LoggingServiceWithPrefix(logger, getAgentName()
0468:                        + ": ");
0469:
0470:                // Add persistence parameters defined by system properties
0471:                params.addAll(getParametersFromProperties(agentId));
0472:
0473:                // Set agent-wide persistence defaults
0474:                List overridePluginClasses = new ArrayList();
0475:                for (int i = 0, n = params.size(); i < n; i++) {
0476:                    String fullParam = (String) params.get(i);
0477:                    if (!fullParam.startsWith(PERSISTENCE_PROP_PREFIX))
0478:                        continue; // Not mine
0479:                    String param = fullParam.substring(PERSISTENCE_PROP_PREFIX
0480:                            .length());
0481:                    try {
0482:                        if (param.startsWith(PERSISTENCE_INTERVAL_PREFIX)) {
0483:                            PERSISTENCE_INTERVAL = Long.parseLong(param
0484:                                    .substring(PERSISTENCE_INTERVAL_PREFIX
0485:                                            .length()));
0486:                            continue;
0487:                        }
0488:                        if (param
0489:                                .startsWith(PERSISTENCE_CONSOLIDATION_PERIOD_PREFIX)) {
0490:                            PERSISTENCE_CONSOLIDATION_PERIOD = Integer
0491:                                    .parseInt(param
0492:                                            .substring(PERSISTENCE_CONSOLIDATION_PERIOD_PREFIX
0493:                                                    .length()));
0494:                            continue;
0495:                        }
0496:                        if (param.startsWith(PERSISTENCE_DISABLE_WRITE_PREFIX)) {
0497:                            writeDisabled = "true".equals(param
0498:                                    .substring(PERSISTENCE_DISABLE_WRITE_PREFIX
0499:                                            .length()));
0500:                            continue;
0501:                        }
0502:                        if (param.startsWith(PERSISTENCE_ARCHIVE_NUMBER_PREFIX)) {
0503:                            archiveNumber = param
0504:                                    .substring(PERSISTENCE_ARCHIVE_NUMBER_PREFIX
0505:                                            .length());
0506:                            writeDisabled = true; // Setting the archive number only makes sense if not writing
0507:                            continue;
0508:                        }
0509:                        if (param.startsWith(PERSISTENCE_CLASS_PREFIX)) {
0510:                            String plugin = param
0511:                                    .substring(PERSISTENCE_CLASS_PREFIX
0512:                                            .length());
0513:                            overridePluginClasses.add(plugin);
0514:                            continue;
0515:                        }
0516:                    } catch (Exception e) {
0517:                        logger.error("Error parsing parameter: " + fullParam);
0518:                    }
0519:                }
0520:                String[] pluginClasses = PERSISTENCE_CLASSES_DFLT;
0521:                if (overridePluginClasses.size() > 0) {
0522:                    // Replace default with specific classes
0523:                    pluginClasses = new String[overridePluginClasses.size()];
0524:                    pluginClasses = (String[]) overridePluginClasses
0525:                            .toArray(pluginClasses);
0526:                }
0527:
0528:                identityTable = new IdentityTable(logger);
0529:                registerServices(sb);
0530:                try {
0531:                    for (int i = 0; i < pluginClasses.length; i++) {
0532:                        addPlugin(pluginClasses[i]);
0533:                    }
0534:                    // There must be at least one writable plugin
0535:                    boolean haveWritablePlugin = false;
0536:                    for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0537:                        PersistencePluginInfo ppio = (PersistencePluginInfo) i
0538:                                .next();
0539:                        if (ppio.ppi.isWritable()) {
0540:                            haveWritablePlugin = true;
0541:                            break;
0542:                        }
0543:                    }
0544:                    if (!haveWritablePlugin) {
0545:                        // Add a dummy persistence plugin
0546:                        addPlugin(getDummyPersistenceClass());
0547:                        isDummy = true;
0548:                    }
0549:                } catch (Exception e) {
0550:                    throw new RuntimeException("Exception in load()", e);
0551:                }
0552:            }
0553:
0554:            private void addPlugin(String pluginSpec)
0555:                    throws PersistenceException {
                     // Instantiates and registers a plugin from a spec of the form
                     // "<class name>;<plugin name>[;<param>...]" (tokens separated by
                     // PARAM_SEP). Throws PersistenceException when the class or name
                     // token is missing, or the class cannot be found, instantiated or
                     // accessed.
                     // NOTE(review): a class that is not a PersistencePlugin fails the
                     // cast below with a ClassCastException, which is not wrapped here.
0556:                String[] paramTokens = CSVUtility.parse(pluginSpec, PARAM_SEP);
0557:                if (paramTokens.length < 1) {
0558:                    throw new PersistenceException(
0559:                            "No plugin class specified: " + pluginSpec);
0560:                }
0561:                if (paramTokens.length < 2) {
0562:                    throw new PersistenceException("No plugin name: "
0563:                            + pluginSpec);
0564:                }
0565:                try {
0566:                    Class pluginClass = Class.forName((String) paramTokens[0]);
0567:                    String pluginName = (String) paramTokens[1];
                         // Everything after the class and name tokens is passed to the
                         // plugin as its own parameters.
0568:                    String[] pluginParams = new String[paramTokens.length - 2];
0569:                    System.arraycopy(paramTokens, 2, pluginParams, 0,
0570:                            pluginParams.length);
                         // Requires an accessible no-argument constructor.
0571:                    PersistencePlugin ppi = (PersistencePlugin) pluginClass
0572:                            .newInstance();
0573:                    if (writeDisabled)
0574:                        ppi.setWritable(false); // Force write off
0575:                    addPlugin(ppi, pluginName, pluginParams);
0576:                } catch (ClassNotFoundException cnfe) {
0577:                    throw new PersistenceException("Bad plugin class", cnfe);
0578:                } catch (InstantiationException ie) {
0579:                    throw new PersistenceException(
0580:                            "Plugin instantiation failed", ie);
0581:                } catch (IllegalAccessException iae) {
0582:                    throw new PersistenceException(
0583:                            "Plugin constructor inaccessible", iae);
0584:                }
0585:            }
0586:
0587:            public void unload() {
0588:                unregisterServices(sb);
0589:                if (dataProtectionService != null) {
0590:                    sb.releaseService(dataProtectionServiceClient,
0591:                            DataProtectionService.class, dataProtectionService);
0592:                }
0593:            }
0594:
0595:            private DataProtectionService getDataProtectionService() {
0596:                if (dataProtectionService == null) {
0597:
0598:                    // For running with security, getting the service, which may be
0599:                    // the security service, is a privileged action
0600:                    // See RFE 3771
0601:                    dataProtectionService = (DataProtectionService) AccessController
0602:                            .doPrivileged(new PrivilegedAction() {
0603:                                public Object run() {
0604:                                    return sb.getService(
0605:                                            dataProtectionServiceClient,
0606:                                            DataProtectionService.class, null);
0607:                                }
0608:                            });
0609:
0610:                    if (dataProtectionService == null) {
0611:                        if (logger.isInfoEnabled())
0612:                            logger.info("No DataProtectionService Available.");
0613:                        if (SystemProperties
0614:                                .getBoolean("org.cougaar.core.persistence.DataProtectionServiceStubEnabled")) {
0615:                            dataProtectionService = new DataProtectionServiceStub();
0616:                        }
0617:                    } else {
0618:                        if (logger.isInfoEnabled())
0619:                            logger.info("DataProtectionService is "
0620:                                    + dataProtectionService.getClass()
0621:                                            .getName());
0622:                    }
0623:                }
0624:                return dataProtectionService;
0625:            }
0626:
            /**
             * Keeps all associations of objects that have been persisted.
             * Replaced by a fresh table when a rehydration attempt fails (see
             * resetRehydration).
             */
            private IdentityTable identityTable;
            // Initialized by initSequenceNumbers() from the highest sequence
            // number reported by any plugin
            private SequenceNumbers sequenceNumbers = null;
            private ObjectOutputStream currentOutput;
            private List params = new ArrayList();
            // Returned by getMessageAddress()
            private MessageAddress agentId;
            // PersistenceAssociations queued by addMarkedAssociation()
            private List associationsToPersist = new ArrayList();
            private boolean previousPersistFailed = false;
            private Logger logger;
            // Obtained lazily by getDataProtectionService(); released in unload()
            private DataProtectionService dataProtectionService;
            // When true, addPlugin(String) forces every plugin to be non-writable
            private boolean writeDisabled = WRITE_DISABLED_DFLT;
            // Passed as the suffix to getRehydrationSets() during rehydration
            private String archiveNumber = "";
            // Maps plugin name (ppi.getName()) to its PersistencePluginInfo
            private Map plugins = new HashMap();
            // Set when a dummy plugin was added because no writable plugin exists
            private boolean isDummy = false;
            // Maps PersistenceIdentity (client id) to ClientStuff; stays null
            // until a rehydration has produced data
            private Map rehydrationResult = null;
            private Map clients = new HashMap();
            // Broker used to obtain and release services
            private ServiceBroker sb;
            private boolean full; // Private to persist method and methods it calls

            /**
             * The current PersistencePlugin being used to generate persistence
             * deltas. This is changed just prior to generating a full delta if
             * there is a different plugin having priority.
             */
            private PersistencePluginInfo currentPersistPluginInfo;

            // Base time to which the computed interval is added in
            // getPersistenceTime()
            private long previousPersistenceTime = System.currentTimeMillis();

            // Cached result of getPersistenceTime()
            private long nextPersistenceTime;

            // When true, getPersistenceTime() recomputes nextPersistenceTime
            private boolean recomputeNextPersistenceTime = true;

            private PersistenceMetricsServiceImpl metricsService = new PersistenceMetricsServiceImpl();
0662:
0663:            private void addPlugin(PersistencePlugin ppi, String pluginName,
0664:                    String[] pluginParams) throws PersistenceException {
0665:                boolean deleteOldPersistence = SystemProperties
0666:                        .getBoolean("org.cougaar.core.persistence.clear");
0667:                if (deleteOldPersistence && logger.isInfoEnabled()) {
0668:                    logger.info("Clearing old persistence data");
0669:                }
0670:                ppi.init(this , pluginName, pluginParams, deleteOldPersistence);
0671:                plugins.put(ppi.getName(), new PersistencePluginInfo(ppi));
0672:            }
0673:
0674:            public MessageAddress getMessageAddress() {
0675:                return agentId;
0676:            }
0677:
0678:            public boolean isDummyPersistence() {
0679:                return isDummy;
0680:            }
0681:
0682:            /**
0683:             * Gets the system time when persistence should be performed. We do
0684:             * persistence periodically with a period such that all the plugins
0685:             * will, on the average create persistence deltas with their
0686:             * individual periods. The average frequency of persistence is the
0687:             * sum of the individual media frequencies. Frequency is the
0688:             * reciprocal of period. The computation is:<p>
0689:             *
0690:             * &nbsp;&nbsp;T = 1/(1/T1 + 1/T2 + ... + 1/Tn)
0691:             * <p>
0692:             * @return the time of the next persistence delta
0693:             */
0694:            public long getPersistenceTime() {
0695:                if (recomputeNextPersistenceTime) {
0696:                    double sum = 0.0;
0697:                    for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0698:                        PersistencePluginInfo ppio = (PersistencePluginInfo) i
0699:                                .next();
0700:                        if (ppio.ppi.isWritable()) {
0701:                            sum += 1.0 / ppio.ppi.getPersistenceInterval();
0702:                        }
0703:                    }
0704:                    long interval = (long) (1.0 / sum);
0705:                    nextPersistenceTime = previousPersistenceTime + interval;
0706:                    recomputeNextPersistenceTime = false;
0707:                    if (logger.isDebugEnabled())
0708:                        logger.debug("persistence interval=" + interval);
0709:                }
0710:                return nextPersistenceTime;
0711:            }
0712:
0713:            private ClientStuff getClientStuff(PersistenceIdentity clientId) {
0714:                if (rehydrationResult == null)
0715:                    return null; // No rehydration
0716:                ClientStuff clientStuff = (ClientStuff) rehydrationResult
0717:                        .get(clientId);
0718:                if (clientStuff == null) {
0719:                    clientStuff = new ClientStuff();
0720:                    rehydrationResult.put(clientId, clientStuff);
0721:                }
0722:                return clientStuff;
0723:            }
0724:
0725:            private RehydrationData getRehydrationData(
0726:                    PersistenceIdentity clientId) {
0727:                return getClientStuff(clientId);
0728:            }
0729:
            /**
             * Rehydrate a persisted agent. Reads all the deltas in order
             * keeping the latest (last encountered) values from every object.
             * The rehydrated state is saved in rehydrationResult.
             * <p>
             * Failures are logged rather than propagated: if every rehydration
             * set fails, or an exception occurs while post-processing, the
             * method returns normally after logging an error.
             * @param pObject a PersistenceObject if rehydrating from a saved
             * state object. If null, rehydrate from media plugins
             */
            private void rehydrate(final PersistenceObject pObject) {
                if (isDummy && pObject == null) {
                    return; // Nothing to rehydrate
                }
                // NOTE(review): the lock is taken on the table referenced at
                // this moment; resetRehydration() may replace the
                // identityTable field while this block is still executing.
                synchronized (identityTable) {
                    final List rehydrationCollection = new ArrayList();
                    identityTable
                            .setRehydrationCollection(rehydrationCollection);
                    try {
                        final RehydrationSet[] rehydrationSets = getRehydrationSets(archiveNumber);
                        if (pObject != null || rehydrationSets.length > 0) { // Deltas exist
                            try {
                                // One-element array so the anonymous Runnable
                                // can hand its result back to this method
                                final Map[] resultPtr = new Map[1];
                                Runnable thunk = new Runnable() {
                                    public void run() {
                                        try {
                                            if (pObject != null) {
                                                // Rehydrate from the supplied state object
                                                if (logger.isInfoEnabled()) {
                                                    logger
                                                            .info("Rehydrating "
                                                                    + getMessageAddress()
                                                                    + " from "
                                                                    + pObject);
                                                }
                                                resultPtr[0] = rehydrateFromBytes(pObject
                                                        .getBytes());
                                            } else {
                                                // Loop through the available RehydrationSets and
                                                // attempt to rehydrate from each one until no errors
                                                // occur. This will normally happen on the very first
                                                // set, but might fail if the data has been corrupted
                                                // in some way.
                                                boolean success = false;
                                                for (int i = 0; i < rehydrationSets.length; i++) {
                                                    SequenceNumbers rehydrateNumbers = rehydrationSets[i].sequenceNumbers;
                                                    PersistencePlugin ppi = rehydrationSets[i].ppi;
                                                    if (logger.isInfoEnabled()) {
                                                        logger
                                                                .info("Rehydrating "
                                                                        + getMessageAddress()
                                                                        + " "
                                                                        + rehydrateNumbers
                                                                                .toString());
                                                    }
                                                    try {
                                                        // Read every delta except the last one;
                                                        // note that first is advanced in place
                                                        while (rehydrateNumbers.first < rehydrateNumbers.current - 1) {
                                                            rehydrateOneDelta(
                                                                    ppi,
                                                                    rehydrateNumbers.first++,
                                                                    false);
                                                        }
                                                        // Only the last delta yields the
                                                        // client data map
                                                        resultPtr[0] = rehydrateOneDelta(
                                                                ppi,
                                                                rehydrateNumbers.first++,
                                                                true);
                                                        success = true;
                                                        break; // Successful rehydration
                                                    } catch (Exception e) { // Rehydration failed
                                                        logger
                                                                .error(
                                                                        "Rehydration from "
                                                                                + rehydrationSets[i]
                                                                                + " failed: ",
                                                                        e);
                                                        // Discard whatever the failed attempt
                                                        // put into the identity table
                                                        resetRehydration(rehydrationCollection);
                                                        continue; // Try next RehydrationSet
                                                    }
                                                }
                                                if (!success) {
                                                    logger
                                                            .error("Rehydration failed. Starting over from scratch");
                                                }
                                            }
                                        } catch (Exception ioe) {
                                            throw new RuntimeException(
                                                    "withClusterContext", ioe);
                                        }
                                    }
                                };

                                // Run the thunk with this agent's address as
                                // the cluster context
                                ClusterContextTable.withClusterContext(
                                        getMessageAddress(), thunk);
                                Map clientData = resultPtr[0];
                                if (clientData == null)
                                    return; // Didn't rehydrate
                                rehydrationResult = new HashMap();
                                // First pass: hand every active association
                                // to the ClientStuff of its owning client
                                for (Iterator iter = identityTable.iterator(); iter
                                        .hasNext();) {
                                    PersistenceAssociation pAssoc = (PersistenceAssociation) iter
                                            .next();
                                    Object obj = pAssoc.getObject();
                                    PersistenceIdentity clientId = pAssoc
                                            .getClientId();
                                    if (pAssoc.isActive()) {
                                        if (logger.isDetailEnabled())
                                            logger.detail(clientId
                                                    + ": addAssociation "
                                                    + pAssoc);
                                        getClientStuff(clientId)
                                                .addAssociation(pAssoc);

                                        // Bug 3588: Do postRehydration work here, only for objects on
                                        // BBoard, instead of for all.
                                        // This is wrong if the Composition on an InActive MPTask is
                                        // wanted  and not on another MPTask but could be later.
                                        // Also if an inactive object can become active.
                                        if (obj instanceof  ActivePersistenceObject) {
                                            ((ActivePersistenceObject) obj)
                                                    .postRehydration(logger);
                                        }
                                    } else {
                                        if (logger.isDetailEnabled())
                                            logger.detail(clientId
                                                    + ": inactive " + pAssoc);
                                    }
                                }
                                if (logger.isDetailEnabled()) {
                                    printIdentityTable("");
                                }
                                // Second pass: attach each client's object
                                // list, keyed by client id in clientData
                                for (Iterator i = clientData.entrySet()
                                        .iterator(); i.hasNext();) {
                                    Map.Entry entry = (Map.Entry) i.next();
                                    PersistenceIdentity clientId = (PersistenceIdentity) entry
                                            .getKey();
                                    List clientObjects = (List) entry
                                            .getValue();
                                    ClientStuff clientStuff = getClientStuff(clientId);
                                    clientStuff.setObjects(clientObjects);
                                    if (logger.isDetailEnabled()) {
                                        logger.detail("PersistenceEnvelope of "
                                                + clientId);
                                        logEnvelopeContents(clientStuff
                                                .getPersistenceEnvelope());
                                        logger.detail("Other objects of "
                                                + clientId + ": "
                                                + clientStuff.getObjects());
                                    }
                                }
                                clearMarks(identityTable.iterator());
                            } catch (Exception e) {
                                // Includes the RuntimeException thrown by the thunk
                                logger.error("Error during rehydration", e);
                            }

                        }
                    } finally {
                        rehydrationCollection.clear(); // Allow garbage collection
                        identityTable.setRehydrationCollection(null); // Perform garbage collection
                    }
                }
            }
0887:
0888:            /**
0889:             * Loop through all plugins and get their available sequence number
0890:             * sets. Create RehydrationSets from the sequence numbers sets and
0891:             * sort them by timestamp.
0892:             * @return the sorted RehydrationSets
0893:             */
0894:            private RehydrationSet[] getRehydrationSets(String suffix) {
0895:                List result = new ArrayList();
0896:                for (Iterator i = plugins.values().iterator(); i.hasNext();) {
0897:                    PersistencePluginInfo ppio = (PersistencePluginInfo) i
0898:                            .next();
0899:                    SequenceNumbers[] pluginNumbers = ppio.ppi
0900:                            .readSequenceNumbers(suffix);
0901:                    for (int j = 0; j < pluginNumbers.length; j++) {
0902:                        result.add(new RehydrationSet(ppio.ppi,
0903:                                pluginNumbers[j]));
0904:                    }
0905:                }
0906:                Collections.sort(result, new Comparator() {
0907:                    public int compare(Object o1, Object o2) {
0908:                        RehydrationSet rs1 = (RehydrationSet) o1;
0909:                        RehydrationSet rs2 = (RehydrationSet) o2;
0910:                        int diff = rs1.sequenceNumbers
0911:                                .compareTo(rs1.sequenceNumbers);
0912:                        if (diff != 0)
0913:                            return -diff;
0914:                        return rs1.ppi.getName().compareTo(rs2.ppi.getName());
0915:                    }
0916:                });
0917:                return (RehydrationSet[]) result
0918:                        .toArray(new RehydrationSet[result.size()]);
0919:            }
0920:
0921:            private void logEnvelopeContents(Envelope env) {
0922:                for (Iterator cc = env.getAllTuples(); cc.hasNext();) {
0923:                    EnvelopeTuple t = (EnvelopeTuple) cc.next();
0924:                    String action = "";
0925:                    switch (t.getAction()) {
0926:                    case Envelope.ADD:
0927:                        action = "ADD";
0928:                        break;
0929:                    case Envelope.REMOVE:
0930:                        action = "REMOVE";
0931:                        break;
0932:                    case Envelope.CHANGE:
0933:                        action = "CHANGE";
0934:                        break;
0935:                    case Envelope.BULK:
0936:                        action = "BULK";
0937:                        break;
0938:                    }
0939:                    logger.detail(action + " " + t.getObject());
0940:                }
0941:            }
0942:
0943:            public static String hc(Object o) {
0944:                return (Integer.toHexString(System.identityHashCode(o)) + " " + (o == null ? "<null>"
0945:                        : o.toString()));
0946:            }
0947:
0948:            /**
0949:             * Erase all the effects of a failed rehydration attempt.
0950:             */
0951:            private void resetRehydration(Collection rehydrationCollection) {
0952:                identityTable = new IdentityTable(logger);
0953:                rehydrationCollection.clear();
0954:                identityTable.setRehydrationCollection(rehydrationCollection);
0955:            }
0956:
0957:            private Map rehydrateOneDelta(PersistencePlugin ppi,
0958:                    int deltaNumber, boolean lastDelta) throws IOException,
0959:                    ClassNotFoundException {
0960:                InputStream is = ppi.openInputStream(deltaNumber);
0961:                DataProtectionService dataProtectionService = getDataProtectionService();
0962:                if (dataProtectionService != null) {
0963:                    PersistenceKeyEnvelope keyEnvelope = new PersistenceKeyEnvelope(
0964:                            ppi, deltaNumber);
0965:                    is = dataProtectionService.getInputStream(keyEnvelope, is);
0966:                }
0967:                ObjectInputStream ois = new ObjectInputStream(is);
0968:                try {
0969:                    return rehydrateFromStream(ois, deltaNumber, lastDelta);
0970:                } finally {
0971:                    ois.close();
0972:                    ppi.finishInputStream(deltaNumber);
0973:                }
0974:            }
0975:
0976:            private Map rehydrateFromBytes(byte[] bytes) throws IOException,
0977:                    ClassNotFoundException {
0978:                ByteArrayInputStream bs = new ByteArrayInputStream(bytes);
0979:                return rehydrateFromStream(new ObjectInputStream(bs), 0, true);
0980:            }
0981:
0982:            private Map rehydrateFromStream(ObjectInputStream currentInput,
0983:                    int deltaNumber, boolean lastDelta) throws IOException,
0984:                    ClassNotFoundException {
0985:                try {
0986:                    identityTable.setNextId(currentInput.readInt());
0987:                    int length = currentInput.readInt();
0988:                    if (logger.isDebugEnabled()) {
0989:                        logger.debug("Reading " + length + " objects");
0990:                    }
0991:                    PersistenceReference[][] referenceArrays = new PersistenceReference[length][];
0992:                    for (int i = 0; i < length; i++) {
0993:                        referenceArrays[i] = (PersistenceReference[]) currentInput
0994:                                .readObject();
0995:                    }
0996:                    //        byte[] bytes = (byte[]) currentInput.readObject();
0997:                    //        PersistenceInputStream stream = new PersistenceInputStream(bytes);
0998:                    PersistenceInputStream stream = new PersistenceInputStream(
0999:                            currentInput, logger);
1000:                    if (logger.isDetailEnabled()) {
1001:                        writeHistoryHeader();
1002:                    }
1003:                    stream.setIdentityTable(identityTable);
1004:                    try {
1005:                        for (int i = 0; i < referenceArrays.length; i++) {
1006:                            // Side effect: updates identityTable
1007:                            stream.readAssociation(referenceArrays[i]);
1008:                        }
1009:                        if (lastDelta) {
1010:                            return (Map) stream.readObject();
1011:                        } else {
1012:                            return null;
1013:                        }
1014:                    } finally {
1015:                        stream.close();
1016:                    }
1017:                } catch (IOException e) {
1018:                    logger.error("IOException reading "
1019:                            + (lastDelta ? "last " : " ") + "delta "
1020:                            + deltaNumber);
1021:                    throw e;
1022:                }
1023:            }
1024:
1025:            /**
1026:             * Get highest sequence numbers recorded on any medium
1027:             */
1028:            private int getHighestSequenceNumber() {
1029:                int best = 0;
1030:                for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1031:                    PersistencePluginInfo ppio = (PersistencePluginInfo) i
1032:                            .next();
1033:                    SequenceNumbers[] availableNumbers = ppio.ppi
1034:                            .readSequenceNumbers("");
1035:                    for (int j = 0; j < availableNumbers.length; j++) {
1036:                        SequenceNumbers t = availableNumbers[j];
1037:                        best = Math.max(best, t.current);
1038:                    }
1039:                }
1040:                return best;
1041:            }
1042:
1043:            private void initSequenceNumbers() {
1044:                int highest = getHighestSequenceNumber();
1045:                sequenceNumbers = new SequenceNumbers(highest, highest, System
1046:                        .currentTimeMillis());
1047:            }
1048:
1049:            /**
1050:             * Persist a List of Envelopes. First the objects in the envelope
1051:             * are entered into the persistence identityTable. Then the objects
1052:             * are serialized to an ObjectOutputStream preceded with their
1053:             * reference id.  Other references to objects in the identityTable
1054:             * are replaced with reference objects.
1055:             */
1056:            //  private boolean nonEmpty(List subscriberStates) {
1057:            //    for (Iterator iter = subscriberStates.iterator(); iter.hasNext(); ) {
1058:            //      PersistenceSubscriberState subscriberState = (PersistenceSubscriberState) iter.next();
1059:            //      if (subscriberState.pendingEnvelopes.size() > 0) return true;
1060:            //      if (subscriberState.transactionEnvelopes.size() > 0) return true;
1061:            //    }
1062:            //    return false;
1063:            //  }
1064:            private boolean isPersistable(Object o) {
1065:                if (o instanceof  NotPersistable)
1066:                    return false;
1067:                if (o instanceof  Persistable) {
1068:                    Persistable pbl = (Persistable) o;
1069:                    return pbl.isPersistable();
1070:                }
1071:                return true;
1072:            }
1073:
1074:            private void addEnvelope(Envelope e, PersistenceIdentity clientId)
1075:                    throws PersistenceException {
1076:                if (logger.isDetailEnabled())
1077:                    logger.detail(clientId + ": addEnvelope " + e);
1078:                for (Iterator envelope = e.getAllTuples(); envelope.hasNext();) {
1079:                    addEnvelopeTuple((EnvelopeTuple) envelope.next(), clientId);
1080:                }
1081:            }
1082:
1083:            private void addEnvelopeTuple(EnvelopeTuple tuple,
1084:                    PersistenceIdentity clientId) throws PersistenceException {
1085:                if (logger.isDetailEnabled())
1086:                    logger.detail(clientId + ": addEnvelopeTuple " + tuple);
1087:                switch (tuple.getAction()) {
1088:                case Envelope.BULK:
1089:                    Collection collection = (Collection) tuple.getObject();
1090:                    for (Iterator iter2 = collection.iterator(); iter2
1091:                            .hasNext();) {
1092:                        addObjectToPersist(iter2.next(), true, clientId);
1093:                    }
1094:                    break;
1095:                case Envelope.ADD:
1096:                case Envelope.CHANGE:
1097:                    addObjectToPersist(tuple.getObject(), true, clientId);
1098:                    break;
1099:                case Envelope.REMOVE:
1100:                    addObjectToPersist(tuple.getObject(), false, clientId);
1101:                    break;
1102:                }
1103:            }
1104:
1105:            private void addObjectToPersist(Object object, boolean newActive,
1106:                    PersistenceIdentity clientId) throws PersistenceException {
1107:                if (!isPersistable(object))
1108:                    return;
1109:                PersistenceAssociation pAssoc = identityTable
1110:                        .findOrCreate(object);
1111:                PersistenceIdentity oldClientId = pAssoc.getClientId();
1112:                if (oldClientId == null) {
1113:                    pAssoc.setClientId(clientId);
1114:                } else if (!oldClientId.equals(clientId)) {
1115:                    throw new PersistenceException(clientId + " not owner");
1116:                }
1117:                if (logger.isDetailEnabled())
1118:                    logger.detail(clientId + ": addObjectToPersist " + object
1119:                            + ", " + newActive);
1120:                if (newActive) {
1121:                    pAssoc.setActive();
1122:                } else {
1123:                    pAssoc.setInactive();
1124:                }
1125:                if (!full)
1126:                    addAssociationToPersist(pAssoc);
1127:            }
1128:
1129:            private void addAssociationToPersist(PersistenceAssociation pAssoc) {
1130:                if (pAssoc.isMarked())
1131:                    return; // Already scheduled to be written
1132:                pAssoc.setMarked(true);
1133:                addMarkedAssociation(pAssoc);
1134:            }
1135:
1136:            private void addMarkedAssociation(PersistenceAssociation pAssoc) {
1137:                objectsThatMightGoAway.add(pAssoc.getObject());
1138:                associationsToPersist.add(pAssoc);
1139:            }
1140:
1141:            private void clearMarks(Iterator iter) {
1142:                while (iter.hasNext()) {
1143:                    PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1144:                            .next();
1145:                    pAssoc.setMarked(false);
1146:                }
1147:            }
1148:
1149:            private void addExistingMarkedAssociations(Iterator iter) {
1150:                while (iter.hasNext()) {
1151:                    PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1152:                            .next();
1153:                    if (pAssoc.isMarked()) {
1154:                        if (logger.isInfoEnabled()) {
1155:                            logger.info("Previously marked: " + pAssoc);
1156:                        }
1157:                        addMarkedAssociation(pAssoc);
1158:                    }
1159:                }
1160:            }
1161:
1162:            /**
1163:             * Select the next plugin to use for persistence. This is only
1164:             * possible when the delta that is about to be generated will have
1165:             * the full state. For each plugin, we keep track of the
1166:             * nextPersistenceTime based on its persistenceInterval. We select
1167:             * the plugin with the earliest nextPersistenceTime. If the
1168:             * nextPersistenceTime of the selected plugin differs significantly
1169:             * from now, the nextPersistenceTimes of all plugins are adjusted to
1170:             * eliminate that difference.
1171:             *
1172:             * Persistence snapshots are taken with a frequency that is the
1173:             * average of frequencies of all the plugins. This means that any
1174:             * particular plugin will persistence with a frequency greater that
1175:             * its specified frequency for a while, but then will be inactive
1176:             * for a period while other plugins are used for an interval such
1177:             * that its average frequency is close to its spcified frequency.
1178:             *
1179:             * As an example, consider two plugins A and B with periods of 10
1180:             * and 40 respectively. The consolidation period for both is 10. The
1181:             * average frequency is 1/10 + 1/40 or 5/40. This means the
1182:             * inter-snapshot interval will be 8 and the faster plugin (A) will
1183:             * go first since it "nextPersistenceTime will be 10 compared to 40.
1184:             * So we have:
1185:             * next(A) = 10
1186:             * next(B) = 40
1187:             * A: 8, 16, 24, ..., 80
1188:             * next(A) = 10 + 10 * 10 = 110
1189:             * next(B) = 40 (B goes next)
1190:             * B: 88, 96, ..., 160
1191:             * next(A) = 110 (A goes next)
1192:             * next(B) 40 + 400 = 440
1193:             * A: 168, 176, ..., 240
1194:             * next(A) = 110 + 10 * 10 = 210 (A continues)
1195:             * next(B) = 440
1196:             * A: 248, 256, ..., 320
1197:             * next(A) = 210 + 10 * 10 = 310 (A continues)
1198:             * next(B) = 440
1199:             * A: 328, 336, ..., 400
1200:             * next(A) = 310 + 10 * 10 = 410 (A continues)
1201:             * next(B) = 440
1202:             * A: 408, 416, ..., 480
1203:             * next(A) = 410 + 10 * 10 = 510
1204:             * next(B) = 440 (B goes next)
1205:             *
1206:             * This patterm will continue with A running 4 times as much as B.
1207:             * We observe that this leads to fairly long gaps between uses of
1208:             * the lower-rate plugin since it uses up its share of the snapshots
1209:             * fairly quickly and then waits a long time for its turn to come up
1210:             * again. This suggests that it might be wise to reduce the
1211:             * consolidation period of infrequent plugins.
1212:             */
1213:
1214:            private void selectNextPlugin() {
1215:                PersistencePluginInfo best = null;
1216:                for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1217:                    PersistencePluginInfo ppio = (PersistencePluginInfo) i
1218:                            .next();
1219:                    if (best == null
1220:                            || ppio.nextPersistenceTime < best.nextPersistenceTime) {
1221:                        best = ppio;
1222:                    }
1223:                }
1224:                long adjustment = System.currentTimeMillis()
1225:                        - best.nextPersistenceTime;
1226:                if (Math.abs(adjustment) > 10000L) {
1227:                    for (Iterator i = plugins.values().iterator(); i.hasNext();) {
1228:                        PersistencePluginInfo ppio = (PersistencePluginInfo) i
1229:                                .next();
1230:                        ppio.nextPersistenceTime += adjustment;
1231:                    }
1232:                }
1233:                currentPersistPluginInfo = best;
1234:            }
1235:
1236:            /**
1237:             * Because PersistenceAssociations have WeakReference objects, the
1238:             * actual object in the association may be garbage collected if
1239:             * there are no other references to it. This list is used to hold a
1240:             * reference to such objects to avoid an NPE when it is not known
1241:             * that a reference exists. Only used during the first delta after
1242:             * rehydration.
1243:             */
1244:            private ArrayList objectsThatMightGoAway = new ArrayList();
1245:
1246:            private final static Object vmPersistLock = new Object();
1247:
1248:            /**
1249:             * Process the data from all clients. Envelopes and such are put
1250:             * into the identityTable as PersistenceAssociations. The list of
1251:             * everything left over is stored in a Map indexed by client id.
1252:             */
1253:            private Map getClientData() {
1254:                Map data = new HashMap(clients.size());
1255:                for (Iterator i = clients.entrySet().iterator(); i.hasNext();) {
1256:                    Map.Entry entry = (Map.Entry) i.next();
1257:                    PersistenceClient client = (PersistenceClient) entry
1258:                            .getValue();
1259:                    PersistenceIdentity clientId = (PersistenceIdentity) entry
1260:                            .getKey();
1261:                    try {
1262:                        List clientData = client.getPersistenceData();
1263:                        List clientObjects = new ArrayList();
1264:                        if (logger.isDetailEnabled())
1265:                            logger.detail(clientId + " clientData: "
1266:                                    + clientData);
1267:                        data.put(clientId, clientObjects);
1268:                        for (int j = 0, m = clientData.size(); j < m; j++) {
1269:                            Object o = clientData.get(j);
1270:                            if (o instanceof  Envelope) {
1271:                                addEnvelope((Envelope) o, clientId);
1272:                            } else if (o instanceof  EnvelopeTuple) {
1273:                                addEnvelopeTuple((EnvelopeTuple) o, clientId);
1274:                            } else {
1275:                                clientObjects.add(o);
1276:                            }
1277:                        }
1278:                        clientData.clear(); // Allow gc
1279:                    } catch (Exception e) {
1280:                        logger.error("Exception in getPersistenceData("
1281:                                + client + ")", e);
1282:                    }
1283:                }
1284:                return data;
1285:            }
1286:
1287:            /**
1288:             * End a persistence epoch by generating a persistence delta.
1289:             */
1290:            PersistenceObject persist(boolean returnBytes, boolean full) {
1291:                if (isDummy && !returnBytes) {
1292:                    return null;
1293:                }
1294:                int deltaNumber = -1;
1295:                long startCPU = 0L;
1296:                //startCPU = CpuClock.cpuTimeMillis();
1297:                long startTime = System.currentTimeMillis();
1298:                if (logger.isInfoEnabled()) {
1299:                    logger.info("Persist started");
1300:                }
1301:                int bytesSerialized = 0;
1302:                full |= returnBytes; // Must be a full snapshot to return bytes
1303:                Throwable failed = null;
1304:                recomputeNextPersistenceTime = true;
1305:                PersistenceObject result = null; // Return value if wanted
1306:                synchronized (identityTable) {
1307:                    try {
1308:                        associationsToPersist.clear();
1309:                        objectsThatMightGoAway.clear();
1310:                        addExistingMarkedAssociations(identityTable.iterator());
1311:                        if (sequenceNumbers == null) {
1312:                            initSequenceNumbers();
1313:                        }
1314:                        if (previousPersistFailed) {
1315:                            full = true; // Don't trust the deltas, (try to) do a full
1316:                            previousPersistFailed = false;
1317:                        }
1318:                        // The following fixes an edge condition. The very first delta
1319:                        // (seqno = 0) is always a full delta whether we want it to be
1320:                        // or not because there are no previous ones. Setting full =
1321:                        // true removes any ambiguity about its fullness.
1322:                        if (sequenceNumbers.current == 0)
1323:                            full = true;
1324:                        // Every so often generate a full delta to consolidate and
1325:                        // prevent the number of deltas from increasing without bound.
1326:                        if (!full && currentPersistPluginInfo != null) {
1327:                            // Check if its time to consolidate this plugin's snapshots
1328:                            int consolidationPeriod = currentPersistPluginInfo.ppi
1329:                                    .getConsolidationPeriod();
1330:                            // nSnapshots is the number already generated
1331:                            int nSnapshots = sequenceNumbers.current
1332:                                    - sequenceNumbers.first;
1333:                            if (nSnapshots + 1 >= consolidationPeriod) {
1334:                                // The next snapshot needs to be full
1335:                                full = true; // Time for a full snapshot
1336:                            }
1337:                        }
1338:                        if (full || currentPersistPluginInfo == null) {
1339:                            if (currentPersistPluginInfo != null) {
1340:                                // Cleanup the existing since the full replaces them.
1341:                                // N.B., if there are multiple plugins, the cleanup will
1342:                                // not actually occur until the next time this plugin is
1343:                                // selected as the best and creates a full snapshot. This
1344:                                // is because we will select a new plugin below.
1345:                                currentPersistPluginInfo.cleanupSequenceNumbers = new SequenceNumbers(
1346:                                        sequenceNumbers.first + 1,
1347:                                        sequenceNumbers.current,
1348:                                        sequenceNumbers.timestamp);
1349:                            }
1350:                            sequenceNumbers.first = sequenceNumbers.current;
1351:                            selectNextPlugin();
1352:                        }
1353:                        if (!currentPersistPluginInfo.ppi.checkOwnership()) {
1354:                            return null; // We are dead. Don't persist
1355:                        }
1356:                        if (sequenceNumbers.current == sequenceNumbers.first)
1357:                            full = true;
1358:                        currentPersistPluginInfo.deltaCount++;
1359:                        this .full = full; // Global full flag for duration of persist
1360:                        // Now gather everything to persist from our clients. Side
1361:                        // effect updates identityTable and if !full, associationsToPersist.
1362:                        Map clientData = getClientData();
1363:                        if (full) {
1364:                            // If full dump, garbage collect unreferenced objects
1365:                            GC.gc();
1366:                            for (Iterator iter = identityTable.iterator(); iter
1367:                                    .hasNext();) {
1368:                                PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1369:                                        .next();
1370:                                if (!pAssoc.isMarked()) {
1371:                                    Object object = pAssoc.getObject();
1372:                                    // it is just barely possible that another gc might have
1373:                                    // collected some additional objects so do a final check
1374:                                    if (object != null) {
1375:                                        // Prevent additional gc from scavenging the objects
1376:                                        // we are committed to persisting
1377:                                        addAssociationToPersist(pAssoc);
1378:                                    }
1379:                                }
1380:                            }
1381:                        }
1382:                        deltaNumber = beginTransaction();
1383:                        try {
1384:                            if (currentOutput == null && !returnBytes) {
1385:                                // Only doing dummy persistence
1386:                            } else {
1387:                                PersistenceOutputStream stream = new PersistenceOutputStream(
1388:                                        logger);
1389:                                PersistenceReference[][] referenceArrays;
1390:                                if (logger.isDetailEnabled()) {
1391:                                    writeHistoryHeader();
1392:                                }
1393:                                stream.setIdentityTable(identityTable);
1394:                                // One agent at a time to avoid inter-agent deadlock due to shared objects
1395:                                try {
1396:                                    if (logger.isInfoEnabled()) {
1397:                                        logger
1398:                                                .info("Obtaining JVM persist lock");
1399:                                    }
1400:                                    synchronized (vmPersistLock) {
1401:                                        if (logger.isInfoEnabled()) {
1402:                                            logger
1403:                                                    .info("Obtained JVM persist lock, serializing");
1404:                                        }
1405:                                        int nObjects = associationsToPersist
1406:                                                .size();
1407:                                        referenceArrays = new PersistenceReference[nObjects][];
1408:                                        for (int i = 0; i < nObjects; i++) {
1409:                                            PersistenceAssociation pAssoc = (PersistenceAssociation) associationsToPersist
1410:                                                    .get(i);
1411:                                            if (logger.isDetailEnabled()) {
1412:                                                logger.detail("Persisting "
1413:                                                        + pAssoc);
1414:                                            }
1415:                                            referenceArrays[i] = stream
1416:                                                    .writeAssociation(pAssoc);
1417:                                        }
1418:                                        stream.writeObject(clientData);
1419:                                        bytesSerialized = stream.size();
1420:                                        if (logger.isInfoEnabled()) {
1421:                                            logger
1422:                                                    .info("Serialized "
1423:                                                            + bytesSerialized
1424:                                                            + " bytes to buffer, releasing lock");
1425:                                        }
1426:                                    } // Ok to let other agents persist while we write out our data
1427:                                } finally {
1428:                                    stream.close();
1429:                                } // End of stream protection try-catch
1430:                                if (returnBytes) {
1431:                                    int estimatedSize = (int) (1.2 * bytesSerialized);
1432:                                    LinkedByteOutputStream returnByteStream = new LinkedByteOutputStream(
1433:                                            estimatedSize);
1434:                                    ObjectOutputStream returnOutput = new ObjectOutputStream(
1435:                                            returnByteStream);
1436:                                    writeFinalOutput(returnOutput,
1437:                                            referenceArrays, stream);
1438:                                    returnOutput.close();
1439:                                    result = new PersistenceObject(
1440:                                            "Persistence state "
1441:                                                    + sequenceNumbers.current,
1442:                                            returnByteStream.toByteArray());
1443:                                    if (logger.isInfoEnabled()) {
1444:                                        logger
1445:                                                .info("Copied persistence snapshot to memory buffer"
1446:                                                        + " for return to state-capture caller");
1447:                                    }
1448:                                }
1449:                                if (currentOutput != null) {
1450:                                    writeFinalOutput(currentOutput,
1451:                                            referenceArrays, stream);
1452:                                    currentOutput.close();
1453:                                    if (logger.isInfoEnabled()) {
1454:                                        logger
1455:                                                .info("Wrote persistence snapshot to output stream");
1456:                                    }
1457:                                }
1458:                            } // End of non-dummy persistence
1459:                            clearMarks(associationsToPersist.iterator());
1460:                            commitTransaction();
1461:                            logger.printDot("P");
1462:                            // Cleanup old deltas and archived snapshots. N.B. The
1463:                            // cleanup is happening to the plugin that was just used.
1464:                            // When there are several plugins, this is usually different
1465:                            // from the plugin whose cleanupSequenceNumbers were set
1466:                            // above. This cleanup has been pending while the various
1467:                            // other plugins have been in use. This is _ok_! The
1468:                            // snapshot we just took is invariably a full snapshot.
1469:                            if (currentPersistPluginInfo.cleanupSequenceNumbers != null) {
1470:                                if (logger.isInfoEnabled()) {
1471:                                    logger
1472:                                            .info("Consolidated deltas "
1473:                                                    + currentPersistPluginInfo.cleanupSequenceNumbers);
1474:                                }
1475:                                currentPersistPluginInfo.ppi
1476:                                        .cleanupOldDeltas(currentPersistPluginInfo.cleanupSequenceNumbers);
1477:                                currentPersistPluginInfo.ppi.cleanupArchive();
1478:                                currentPersistPluginInfo.cleanupSequenceNumbers = null;
1479:                            }
1480:                        } catch (Exception e) { // Transaction protection
1481:                            rollbackTransaction();
1482:                            if (logger.isErrorEnabled()) {
1483:                                logger.error("Persist failed", e);
1484:                            }
1485:                            logger.printDot("X");
1486:                            throw e;
1487:                        }
1488:                        objectsThatMightGoAway.clear();
1489:                    } catch (Exception e) {
1490:                        failed = e;
1491:                        logger.error("Error writing persistence snapshot", e);
1492:                    } finally {
1493:                        if (isDummy) {
1494:                            identityTable.clear(); // Perform garbage collection
1495:                        }
1496:                    }
1497:                    // set persist time to persist completion + epsilon
1498:                    previousPersistenceTime = System.currentTimeMillis();
1499:                    // Note currentPersistPluginInfo.nextPersistenceTime is _not_
1500:                    // relative to the current time; it is relative to the other
1501:                    // persistence plugins and is occasionally adjusted when it
1502:                    // drifts too far from real time.
1503:                    currentPersistPluginInfo.nextPersistenceTime += currentPersistPluginInfo.ppi
1504:                            .getPersistenceInterval();
1505:                }
1506:                //long finishCPU = CpuClock.cpuTimeMillis();
1507:                long finishCPU = 0l;
1508:                long finishTime = System.currentTimeMillis();
1509:                PersistenceMetricImpl metric = new PersistenceMetricImpl(
1510:                        formatDeltaNumber(deltaNumber), startTime, finishTime,
1511:                        finishCPU - startCPU, bytesSerialized, full, failed,
1512:                        currentPersistPluginInfo.ppi);
1513:                metricsService.addMetric(metric);
1514:                if (logger.isInfoEnabled()) {
1515:                    logger.info(metric.toString());
1516:                }
1517:                return result;
1518:            }
1519:
1520:            private void writeFinalOutput(ObjectOutputStream s,
1521:                    PersistenceReference[][] referenceArrays,
1522:                    PersistenceOutputStream stream) throws IOException {
1523:                s.writeInt(identityTable.getNextId());
1524:                s.writeInt(referenceArrays.length);
1525:                for (int i = 0; i < referenceArrays.length; i++) {
1526:                    s.writeObject(referenceArrays[i]);
1527:                }
1528:                stream.writeBytes(s);
1529:                s.flush();
1530:            }
1531:
1532:            /** The format of timestamps in the log */
1533:            private static DateFormat logTimeFormat = new SimpleDateFormat(
1534:                    "yyyy/MM/dd HH:mm:ss.SSS");
1535:
1536:            private static DecimalFormat deltaFormat = new DecimalFormat(
1537:                    "_00000");
1538:
1539:            public static String formatDeltaNumber(int deltaNumber) {
1540:                return deltaFormat.format(deltaNumber);
1541:            }
1542:
1543:            private void writeHistoryHeader() {
1544:                if (logger.isDetailEnabled()) {
1545:                    logger.detail(logTimeFormat.format(new Date(System
1546:                            .currentTimeMillis())));
1547:                }
1548:            }
1549:
1550:            void printIdentityTable(String id) {
1551:                logger.detail("IdentityTable begins");
1552:                for (Iterator iter = identityTable.iterator(); iter.hasNext();) {
1553:                    PersistenceAssociation pAssoc = (PersistenceAssociation) iter
1554:                            .next();
1555:                    logger.detail(id + pAssoc);
1556:                }
1557:                logger.detail("IdentityTable ends");
1558:            }
1559:
1560:            public Logger getLogger() {
1561:                return logger;
1562:            }
1563:
1564:            static String getObjectName(Object o) {
1565:                return o.getClass().getName() + "@"
1566:                        + System.identityHashCode(o);
1567:            }
1568:
1569:            private int beginTransaction() throws IOException {
1570:                int deltaNumber = sequenceNumbers.current;
1571:                OutputStream os = currentPersistPluginInfo.ppi
1572:                        .openOutputStream(deltaNumber, full);
1573:                if (os != null) {
1574:                    DataProtectionService dataProtectionService = getDataProtectionService();
1575:                    if (dataProtectionService != null) {
1576:                        PersistenceKeyEnvelope keyEnvelope = new PersistenceKeyEnvelope(
1577:                                currentPersistPluginInfo.ppi, deltaNumber);
1578:                        os = dataProtectionService.getOutputStream(keyEnvelope,
1579:                                os);
1580:                    }
1581:                    currentOutput = new ObjectOutputStream(os);
1582:                } else {
1583:                    currentOutput = null;
1584:                }
1585:                return deltaNumber;
1586:            }
1587:
1588:            private void rollbackTransaction() {
1589:                currentPersistPluginInfo.ppi.abortOutputStream(sequenceNumbers);
1590:                currentOutput = null;
1591:            }
1592:
1593:            private void commitTransaction() throws PersistenceException {
1594:                currentPersistPluginInfo.ppi.lockOwnership();
1595:                sequenceNumbers.current += 1;
1596:                currentPersistPluginInfo.ppi.finishOutputStream(
1597:                        sequenceNumbers, full);
1598:                currentOutput = null;
1599:                currentPersistPluginInfo.ppi.unlockOwnership();
1600:            }
1601:
1602:            public java.sql.Connection getDatabaseConnection(Object locker) {
1603:                return currentPersistPluginInfo.ppi
1604:                        .getDatabaseConnection(locker);
1605:            }
1606:
1607:            public void releaseDatabaseConnection(Object locker) {
1608:                currentPersistPluginInfo.ppi.releaseDatabaseConnection(locker);
1609:            }
1610:
1611:            private class PersistenceServiceImpl implements  PersistenceService {
1612:                PersistenceIdentity clientId;
1613:
1614:                PersistenceServiceImpl(PersistenceIdentity clientId) {
1615:                    this .clientId = clientId;
1616:                }
1617:
1618:                public RehydrationData getRehydrationData() {
1619:                    return PersistenceServiceComponent.this 
1620:                            .getRehydrationData(clientId);
1621:                }
1622:            }
1623:
1624:            private class PersistenceServiceForAgentImpl extends
1625:                    PersistenceServiceImpl implements 
1626:                    PersistenceServiceForAgent {
1627:                PersistenceServiceForAgentImpl(PersistenceIdentity clientId) {
1628:                    super (clientId);
1629:                }
1630:
1631:                public void rehydrate(PersistenceObject pObject) {
1632:                    PersistenceServiceComponent.this .rehydrate(pObject);
1633:                }
1634:
1635:                public void suspend() {
1636:                }
1637:            }
1638:
1639:            private class PersistenceServiceForBlackboardImpl extends
1640:                    PersistenceServiceImpl implements 
1641:                    PersistenceServiceForBlackboard {
1642:                PersistenceServiceForBlackboardImpl(PersistenceIdentity clientId) {
1643:                    super (clientId);
1644:                }
1645:
1646:                public PersistenceObject persist(boolean returnBytes,
1647:                        boolean full) {
1648:                    return PersistenceServiceComponent.this .persist(
1649:                            returnBytes, full);
1650:                }
1651:
1652:                public java.sql.Connection getDatabaseConnection(Object locker) {
1653:                    return PersistenceServiceComponent.this 
1654:                            .getDatabaseConnection(locker);
1655:                }
1656:
1657:                public void releaseDatabaseConnection(Object locker) {
1658:                    PersistenceServiceComponent.this 
1659:                            .releaseDatabaseConnection(locker);
1660:                }
1661:
1662:                public boolean isDummyPersistence() {
1663:                    return PersistenceServiceComponent.this 
1664:                            .isDummyPersistence();
1665:                }
1666:
1667:                public long getPersistenceTime() {
1668:                    return PersistenceServiceComponent.this 
1669:                            .getPersistenceTime();
1670:                }
1671:            }
1672:
1673:            ServiceProvider serviceProvider = new ServiceProvider() {
1674:                public Object getService(ServiceBroker sb, Object requestor,
1675:                        Class cls) {
1676:                    if (cls == PersistenceControlService.class) {
1677:                        return new PersistenceControlServiceImpl();
1678:                    }
1679:                    if (cls == PersistenceMetricsService.class) {
1680:                        return metricsService;
1681:                    }
1682:                    if (cls == PersistenceService.class
1683:                            || cls == PersistenceServiceForBlackboard.class
1684:                            || cls == PersistenceServiceForAgent.class) {
1685:                        if (requestor instanceof  PersistenceClient) {
1686:                            PersistenceClient client = (PersistenceClient) requestor;
1687:                            PersistenceIdentity clientId = client
1688:                                    .getPersistenceIdentity();
1689:                            clients.put(clientId, client);
1690:                            if (cls == PersistenceService.class) {
1691:                                return new PersistenceServiceImpl(clientId);
1692:                            } else if (cls == PersistenceServiceForBlackboard.class) {
1693:                                return new PersistenceServiceForBlackboardImpl(
1694:                                        clientId);
1695:                            } else {
1696:                                return new PersistenceServiceForAgentImpl(
1697:                                        clientId);
1698:                            }
1699:                        } else {
1700:                            throw new IllegalArgumentException(
1701:                                    "PersistenceService requestor must be a PersistenceClient");
1702:                        }
1703:                    }
1704:                    throw new IllegalArgumentException("Unknown service class");
1705:                }
1706:
1707:                public void releaseService(ServiceBroker sb, Object requestor,
1708:                        Class cls, Object svc) {
1709:                    if (cls == PersistenceControlService.class) {
1710:                        return;
1711:                    }
1712:                    if (cls == PersistenceMetricsService.class) {
1713:                        return;
1714:                    }
1715:                    if (cls == PersistenceServiceForBlackboard.class
1716:                            || cls == PersistenceService.class
1717:                            || cls == PersistenceServiceForAgent.class) {
1718:                        if (svc instanceof  PersistenceServiceImpl) {
1719:                            PersistenceServiceImpl impl = (PersistenceServiceImpl) svc;
1720:                            clients.remove(impl.clientId);
1721:                        }
1722:                        return;
1723:                    }
1724:                    throw new IllegalArgumentException("Unknown service class");
1725:                }
1726:            };
1727:
1728:            // More Persistence implementation
1729:            private void registerServices(ServiceBroker sb) {
1730:                sb.addService(PersistenceMetricsService.class, serviceProvider);
1731:                sb.addService(PersistenceControlService.class, serviceProvider);
1732:                sb.addService(PersistenceService.class, serviceProvider);
1733:                sb.addService(PersistenceServiceForBlackboard.class,
1734:                        serviceProvider);
1735:                sb
1736:                        .addService(PersistenceServiceForAgent.class,
1737:                                serviceProvider);
1738:            }
1739:
1740:            private void unregisterServices(ServiceBroker sb) {
1741:                sb.revokeService(PersistenceControlService.class,
1742:                        serviceProvider);
1743:                sb.revokeService(PersistenceMetricsService.class,
1744:                        serviceProvider);
1745:                sb.revokeService(PersistenceService.class, serviceProvider);
1746:                sb.revokeService(PersistenceServiceForBlackboard.class,
1747:                        serviceProvider);
1748:                sb.revokeService(PersistenceServiceForAgent.class,
1749:                        serviceProvider);
1750:            }
1751:
1752:            public String toString() {
1753:                return "Persist(" + getAgentName() + ")";
1754:            }
1755:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.