0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Contact: sequoia@continuent.org
0007: *
0008: * Licensed under the Apache License, Version 2.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.apache.org/licenses/LICENSE-2.0
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019: *
0020: * Initial developer(s): Emmanuel Cecchet.
0021: * Contributor(s): Julie Marguerite, Sara Bouchenak, Nicolas Modrzyk.
0022: */package org.continuent.sequoia.controller.cache.result;
0023:
0024: import java.util.ArrayList;
0025: import java.util.HashMap;
0026: import java.util.HashSet;
0027: import java.util.Iterator;
0028:
0029: import org.continuent.sequoia.common.i18n.Translate;
0030: import org.continuent.sequoia.common.protocol.Field;
0031: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0032: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0033: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0034: import org.continuent.sequoia.controller.cache.CacheException;
0035: import org.continuent.sequoia.controller.cache.CacheStatistics;
0036: import org.continuent.sequoia.controller.cache.result.entries.AbstractResultCacheEntry;
0037: import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryEager;
0038: import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryNoCache;
0039: import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryRelaxed;
0040: import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema;
0041: import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseTable;
0042: import org.continuent.sequoia.controller.cache.result.threads.EagerCacheThread;
0043: import org.continuent.sequoia.controller.cache.result.threads.RelaxedCacheThread;
0044: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0045: import org.continuent.sequoia.controller.requests.CreateRequest;
0046: import org.continuent.sequoia.controller.requests.ParsingGranularities;
0047: import org.continuent.sequoia.controller.requests.RequestType;
0048: import org.continuent.sequoia.controller.requests.SelectRequest;
0049: import org.continuent.sequoia.controller.requests.UpdateRequest;
0050:
0051: /**
0052: * This is a query cache implementation with tunable granularity. <br>
0053: * Cache invalidation granularity can take one of the following values:
0054: * <ul>
0055: * <li><code>NO_INVALIDATE</code>: no invalidation, the cache is
0056: * inconsistent and this should just be used to determine hit ratio upper bound.
0057: * </li>
0058: * <li><code>DATABASE</code>: the cache is flushed each time the database is
0059: * updated (every INSERT, UPDATE, DELETE, ... statement).</li>
0060: * <li><code>TABLE</code>: table granularity, entries in the cache are
0061: * invalidated based on table dependencies.</li>
0062: * <li><code>COLUMN</code>: column granularity, entries in the cache are
0063: * invalidated based on column dependencies</li>
0064: * <li><code>COLUMN_UNIQUE</code>: same as <code>COLUMN</code> except that
0065: * <code>UNIQUE</code> queries that select a single row based on a key are
0066: * invalidated only when needed.</li>
0067: * </ul>
0068: *
0069: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0070: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
0071: * @author <a href="mailto:Sara.Bouchenak@epfl.ch">Sara Bouchenak </a>
0072: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
0073: * @version 1.0
0074: */
0075: public abstract class ResultCache extends AbstractResultCache {
0076: //
0077: // How the code is organized?
0078: //
0079: // 1. Member variables
0080: // 2. Constructor
0081: // 3. Cache management
0082: // 4. Transaction management
0083: // 5. Debug/Monitoring
0084: //
0085:
0086: // Max number of cache entries
0087: private int maxEntries;
0088: /** Pending query timeout in ms. Default is: 0 (wait forever). */
0089: private long pendingQueryTimeout = 0;
0090: // queries: SQL -> AbstractResultCacheEntry
0091: private HashMap queries;
0092: // Pending SQL requests (String)
0093: private HashSet pendingQueries;
0094: // The rules to apply for this cache
0095: private HashSet cachingRules;
0096: private ResultCacheRule defaultRule;
0097: private ArrayList relaxedCache;
0098:
0099: // LRU (head) of cache entries for replacement
0100: private AbstractResultCacheEntry lruHead;
0101: // LRU (tail) of cache entries for replacement
0102: private AbstractResultCacheEntry lruTail;
0103:
0104: // Database schema
0105: protected CacheDatabaseSchema cdbs;
0106:
0107: private CacheStatistics stats;
0108:
0109: private RelaxedCacheThread relaxedThread;
0110: private static final boolean[] TRUE_TRUE = new boolean[] { true,
0111: true };
0112: private boolean flushingCache;
0113: private EagerCacheThread eagerThread;
0114: private ArrayList eagerCache;
0115:
0116: /*
0117: * Constructor
0118: */
0119:
0120: /**
0121: * Creates a new <code>Cache</code> instance.
0122: *
0123: * @param maxEntries maximum number of cache entries
0124: * @param pendingTimeout pending queries timeout
0125: */
0126: public ResultCache(int maxEntries, int pendingTimeout) {
0127: this .maxEntries = maxEntries;
0128: this .pendingQueryTimeout = pendingTimeout;
0129: cdbs = null;
0130: stats = new CacheStatistics();
0131: queries = new HashMap(1000, (float) 0.75);
0132: pendingQueries = new HashSet();
0133: cachingRules = new HashSet();
0134: relaxedCache = new ArrayList();
0135: eagerCache = new ArrayList();
0136: lruHead = null;
0137: lruTail = null;
0138: defaultRule = null;
0139: relaxedThread = new RelaxedCacheThread(this );
0140: relaxedThread.setPriority(9);
0141: relaxedThread.start();
0142: eagerThread = new EagerCacheThread(this );
0143: eagerThread.setPriority(9);
0144: eagerThread.start();
0145: }
0146:
0147: /**
0148: * Shutdown the result cache and all its threads.
0149: */
0150: public synchronized void shutdown() {
0151: relaxedThread.shutdown();
0152: eagerThread.shutdown();
0153: }
0154:
0155: /**
0156: * Returns the pending query timeout in seconds.
0157: *
0158: * @return the pending query timeout.
0159: * @see #setPendingQueryTimeout
0160: */
0161: public int getPendingQueryTimeout() {
0162: return (int) (pendingQueryTimeout / 1000);
0163: }
0164:
0165: /**
0166: * Sets the pending query timeout in seconds.
0167: *
0168: * @param pendingQueryTimeout the pending query timeout to set.
0169: * @see #getPendingQueryTimeout
0170: */
0171: public void setPendingQueryTimeout(int pendingQueryTimeout) {
0172: this .pendingQueryTimeout = pendingQueryTimeout * 1000L;
0173: }
0174:
0175: /**
0176: * Possibly we want to access the queries in the cache for timing purposes
0177: *
0178: * @return the <code>HashMap</code> of queries (not synchronized)
0179: */
0180: public HashMap getQueries() {
0181: return this .queries;
0182: }
0183:
0184: /**
0185: * Sets the <code>DatabaseSchema</code> of the current virtual database.
0186: *
0187: * @param dbs a <code>DatabaseSchema</code> value
0188: * @see org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema
0189: */
0190: public void setDatabaseSchema(DatabaseSchema dbs) {
0191: if (cdbs == null) {
0192: logger.info(Translate
0193: .get("resultcache.setting.database.schema"));
0194: cdbs = new CacheDatabaseSchema(dbs);
0195: } else { // Schema is updated, compute the diff !
0196: CacheDatabaseSchema newSchema = new CacheDatabaseSchema(dbs);
0197: ArrayList tables = cdbs.getTables();
0198: ArrayList newTables = newSchema.getTables();
0199: if (newTables == null) { // New schema is empty (no backend is active anymore)
0200: logger.info(Translate
0201: .get("resultcache.flusing.whole.cache"));
0202: flushCache();
0203: cdbs = null;
0204: return;
0205: }
0206:
0207: // Remove extra-tables
0208: for (int i = 0; i < tables.size(); i++) {
0209: CacheDatabaseTable t = (CacheDatabaseTable) tables
0210: .get(i);
0211: if (!newSchema.hasTable(t.getName())) {
0212: t.invalidateAll();
0213: cdbs.removeTable(t);
0214: if (logger.isInfoEnabled())
0215: logger.info(Translate.get(
0216: "resultcache.removing.table", t
0217: .getName()));
0218: }
0219: }
0220:
0221: // Add missing tables
0222: int size = newTables.size();
0223: for (int i = 0; i < size; i++) {
0224: CacheDatabaseTable t = (CacheDatabaseTable) newTables
0225: .get(i);
0226: if (!cdbs.hasTable(t.getName())) {
0227: cdbs.addTable(t);
0228: if (logger.isInfoEnabled())
0229: logger.info(Translate
0230: .get("resultcache.adding.table", t
0231: .getName()));
0232: }
0233: }
0234: }
0235: }
0236:
0237: /**
0238: * Merge the given <code>DatabaseSchema</code> with the current one.
0239: *
0240: * @param dbs a <code>DatabaseSchema</code> value
0241: * @see org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema
0242: */
0243: public void mergeDatabaseSchema(DatabaseSchema dbs) {
0244: try {
0245: logger.info(Translate
0246: .get("resultcache.merging.new.database.schema"));
0247: cdbs.mergeSchema(new CacheDatabaseSchema(dbs));
0248: } catch (Exception e) {
0249: logger.error(Translate.get(
0250: "resultcache.error.while.merging", e));
0251: }
0252: }
0253:
0254: /**
0255: * Add a rule for this <code>ResultCache</code>
0256: *
0257: * @param rule that contains information on the action to perform for a
0258: * specific query
0259: */
0260: public void addCachingRule(ResultCacheRule rule) {
0261: cachingRules.add(rule);
0262: }
0263:
0264: /**
0265: * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getDefaultRule()
0266: */
0267: public ResultCacheRule getDefaultRule() {
0268: return defaultRule;
0269: }
0270:
0271: /**
0272: * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#setDefaultRule(ResultCacheRule)
0273: */
0274: public void setDefaultRule(ResultCacheRule defaultRule) {
0275: this .defaultRule = defaultRule;
0276: }
0277:
0278: /**
0279: * Finds the behavior of the cache with the given query skeleton. If the query
0280: * match a pattern of a rule then we get the associated action for this,
0281: * otherwise we look for the default behavior.
0282: *
0283: * @param request to get action for
0284: * @return the <code>CacheBehavior</code> associated for this query.
0285: */
0286: private CacheBehavior getCacheBehavior(SelectRequest request) {
0287: CacheBehavior behavior = null;
0288: for (Iterator iter = cachingRules.iterator(); iter.hasNext();) {
0289: behavior = ((ResultCacheRule) iter.next()).matches(request);
0290: if (behavior != null) {
0291: break;
0292: }
0293: }
0294: if (behavior == null)
0295: behavior = defaultRule.getCacheBehavior();
0296: if (logger.isDebugEnabled())
0297: logger.debug(Translate.get(
0298: "resultcache.behavior.for.request",
0299: new String[] { request.getUniqueKey(),
0300: behavior.getType() }));
0301: return behavior;
0302: }
0303:
0304: /*
0305: * Cache Management
0306: */
0307:
0308: /**
0309: * Creates a unique cache entry key from the given request. The key is
0310: * currently composed of the login name and the request SQL statement.
0311: *
0312: * @param request the request to generate the key from
0313: * @return a unique cache key for this request
0314: */
0315: private String getCacheKeyFromRequest(SelectRequest request) {
0316: return request.getLogin() + "," + request.getUniqueKey();
0317: }
0318:
0319: /**
0320: * Do we need invalidation after an update request, given a
0321: * ControllerResultSet. Note that this method is meant to be used with unique
0322: * queries where the ControllerResultSet is the result of a pk selection (like
0323: * an Entity Bean).
0324: *
0325: * @param result that could be in the cache
0326: * @param request the update we want to get updated values from
0327: * @return boolean[] {needInvalidate,needToSendQuery}
0328: */
0329: public boolean[] needInvalidate(ControllerResultSet result,
0330: UpdateRequest request) {
0331: HashMap updatedValues = request.getUpdatedValues();
0332: boolean needInvalidate = false;
0333: boolean needToSendQuery = false;
0334: String value;
0335: String columnName;
0336: try {
0337: // If we don't have exactly one row, we don't handle the optimization
0338: if ((result == null) || (result.getData() == null)
0339: || (result.getData().size() != 1))
0340: return TRUE_TRUE;
0341: } catch (Exception e) {
0342: return TRUE_TRUE;
0343: }
0344: Field[] fields = result.getFields();
0345: ArrayList data = result.getData();
0346: int size = fields.length;
0347: for (Iterator iter = updatedValues.keySet().iterator(); iter
0348: .hasNext();) {
0349: columnName = (String) iter.next();
0350: value = (String) updatedValues.get(columnName);
0351: for (int i = 0; i < size; i++) { // Find the corresponding column in the ResultSet by comparing column
0352: // names
0353:
0354: // We can have something like:
0355: // FIRSTNAME and ADDRESS.FIRSTNAME
0356: if (columnName.equals(fields[i].getFieldName())) {
0357: Object o = ((Object[]) data.get(0))[i];
0358: if (!value.equals(o)) {
0359: // The value from the cache entry is different we need to update
0360: // the
0361: // cache and the database
0362: return TRUE_TRUE;
0363: } else
0364: break;
0365: }
0366: }
0367: // We don't need to invalidate the cache because the columns affected are
0368: // different but we need to send the query to the database.
0369: needToSendQuery = true;
0370: // We don't stop here though because other columns could be updated and
0371: // we
0372: // could need invalidation
0373: }
0374: return new boolean[] { needInvalidate, needToSendQuery };
0375: }
0376:
0377: /**
0378: * Adds an entry request/reply to the cache. Note that if the request was
0379: * already in the cache, only the result is updated.
0380: *
0381: * @param request the request
0382: * @param result the result corresponding to the request
0383: * @exception CacheException if an error occurs
0384: */
0385: public void addToCache(SelectRequest request,
0386: ControllerResultSet result) throws CacheException {
0387: boolean notifyThread = false;
0388:
0389: // Sanity checks
0390: if (request.getCacheAbility() == RequestType.UNCACHEABLE)
0391: throw new CacheException(Translate.get(
0392: "resultcache.uncacheable.request", request
0393: .getUniqueKey()));
0394:
0395: try {
0396: synchronized (pendingQueries) {
0397: // Remove the pending query from the list and wake up
0398: // all waiting queries
0399: removeFromPendingQueries(request);
0400:
0401: String sqlQuery = getCacheKeyFromRequest(request);
0402:
0403: if (result == null)
0404: throw new CacheException(Translate.get(
0405: "resultcache.null.result", sqlQuery));
0406:
0407: // Check against streamable ResultSets
0408: if (result.hasMoreData()) {
0409: logger.info(Translate.get(
0410: "resultcache.streamed.resultset", request
0411: .getSqlShortForm(20)));
0412: return;
0413: }
0414:
0415: if (logger.isDebugEnabled())
0416: logger.debug(Translate.get(
0417: "resultcache.adding.query", sqlQuery));
0418:
0419: AbstractResultCacheEntry ce;
0420: synchronized (queries) {
0421: // Check first that the query is not already in the cache
0422: ce = (AbstractResultCacheEntry) queries
0423: .get(sqlQuery);
0424: if (ce == null) {
0425: // Not in cache, add this entry
0426: // check the rule
0427: CacheBehavior behavior = getCacheBehavior(request);
0428: ce = behavior.getCacheEntry(request, result,
0429: this );
0430: if (ce instanceof ResultCacheEntryNoCache)
0431: return;
0432:
0433: // Test size of cache
0434: if (maxEntries > 0) {
0435: int size = queries.size();
0436: if (size >= maxEntries)
0437: // LRU replacement policy: Remove the oldest cache entry
0438: removeOldest();
0439: }
0440: // Add to the cache
0441: queries.put(sqlQuery, ce);
0442:
0443: notifyThread = true;
0444: } else { // Oh, oh, already in cache ...
0445: if (ce.isValid())
0446: logger
0447: .warn(Translate
0448: .get(
0449: "resultcache.modifying.result.valid.entry",
0450: sqlQuery));
0451: ce.setResult(result);
0452: }
0453:
0454: // Update LRU
0455: if (lruHead != null) {
0456: lruHead.setPrev(ce);
0457: ce.setNext(lruHead);
0458: ce.setPrev(null);
0459: }
0460: if (lruTail == null)
0461: lruTail = ce;
0462: lruHead = ce; // This is also fine if LRUHead == null
0463: }
0464: processAddToCache(ce);
0465:
0466: // process thread notification out of the synchronized block on
0467: // pending queries to avoid deadlock, while adding/removing
0468: // on cache
0469: if (notifyThread) {
0470: // relaxed entry
0471: if (ce instanceof ResultCacheEntryRelaxed) {
0472: ResultCacheEntryRelaxed qcer = (ResultCacheEntryRelaxed) ce;
0473: synchronized (relaxedThread) {
0474: relaxedCache.add(qcer);
0475: if (qcer.getDeadline() < relaxedThread
0476: .getThreadWakeUpTime()
0477: || relaxedThread
0478: .getThreadWakeUpTime() == 0) {
0479: relaxedThread.notify();
0480: }
0481: }
0482: } else if (ce instanceof ResultCacheEntryEager) {
0483: // eager entry
0484: ResultCacheEntryEager qcee = (ResultCacheEntryEager) ce;
0485: if (qcee.getDeadline() != AbstractResultCacheEntry.NO_DEADLINE) { // Only deal with entries that specify a timeout
0486: synchronized (eagerThread) {
0487: eagerCache.add(qcee);
0488: if (qcee.getDeadline() < eagerThread
0489: .getThreadWakeUpTime()
0490: || eagerThread
0491: .getThreadWakeUpTime() == 0) {
0492: eagerThread.notify();
0493: }
0494: }
0495: }
0496: }
0497: }
0498: }
0499: } catch (OutOfMemoryError oome) {
0500: flushCache();
0501: System.gc();
0502: logger.warn(Translate
0503: .get("cache.memory.error.cache.flushed", this
0504: .getClass()));
0505: }
0506: }
0507:
0508: /**
0509: * Process the add to cache to update implementation specific data structures.
0510: *
0511: * @param qe to add to the cache.
0512: */
0513: protected abstract void processAddToCache(
0514: AbstractResultCacheEntry qe);
0515:
0516: /**
0517: * Gets the result to the given request from the cache. The returned
0518: * <code>AbstractResultCacheEntry</code> is <code>null</code> if the
0519: * request is not present in the cache.
0520: * <p>
0521: * An invalid <code>AbstractResultCacheEntry</code> may be returned (it
0522: * means that the result is <code>null</code>) but the already parsed query
0523: * can be retrieved from the cache entry.
0524: *
0525: * @param request an SQL select request
0526: * @param addToPendingQueries <code>true</code> if the request must be added
0527: * to the pending query list on a cache miss
0528: * @return the <code>AbstractResultCacheEntry</code> if found, else
0529: * <code>null</code>
0530: */
0531: public AbstractResultCacheEntry getFromCache(SelectRequest request,
0532: boolean addToPendingQueries) {
0533: stats.addSelect();
0534:
0535: if (request.getCacheAbility() == RequestType.UNCACHEABLE) {
0536: stats.addUncacheable();
0537: return null;
0538: }
0539:
0540: String sqlQuery = getCacheKeyFromRequest(request);
0541:
0542: // Check if we have the same query pending
0543: synchronized (pendingQueries) {
0544: if (addToPendingQueries) {
0545: long timeout = pendingQueryTimeout;
0546: // Yes, wait for the result
0547: // As we use a single lock for all pending queries, we use a
0548: // while to re-check that this wake-up was for us!
0549: while (pendingQueries.contains(sqlQuery)) {
0550: try {
0551: if (logger.isDebugEnabled())
0552: logger
0553: .debug(Translate
0554: .get(
0555: "resultcache.waiting.pending.query",
0556: sqlQuery));
0557:
0558: if (timeout > 0) {
0559: long start = System.currentTimeMillis();
0560: pendingQueries.wait(pendingQueryTimeout);
0561: long end = System.currentTimeMillis();
0562: timeout = timeout - (end - start);
0563: if (timeout <= 0) {
0564: logger
0565: .warn(Translate
0566: .get("resultcache.pending.query.timeout"));
0567: break;
0568: }
0569: } else
0570: pendingQueries.wait();
0571: } catch (InterruptedException e) {
0572: logger
0573: .warn(Translate
0574: .get("resultcache.pending.query.timeout"));
0575: break;
0576: }
0577: }
0578: }
0579:
0580: // Check the cache
0581: AbstractResultCacheEntry ce;
0582: synchronized (queries) {
0583: ce = (AbstractResultCacheEntry) queries.get(sqlQuery);
0584: if (ce == null)
0585: // if ((ce == null) || !ce.isValid())
0586: { // Cache miss or dirty entry
0587: if (addToPendingQueries) {
0588: pendingQueries.add(sqlQuery);
0589: // Add this query to the pending queries
0590: if (logger.isDebugEnabled()) {
0591: logger.debug(Translate
0592: .get("resultcache.cache.miss"));
0593: logger
0594: .debug(Translate
0595: .get(
0596: "resultcache.adding.to.pending.queries",
0597: sqlQuery));
0598: }
0599: }
0600: return null;
0601: } else { // Cache hit (must update LRU)
0602: // Move cache entry to head of LRU
0603: AbstractResultCacheEntry before = ce.getPrev();
0604: if (before != null) {
0605: AbstractResultCacheEntry after = ce.getNext();
0606: before.setNext(after);
0607: if (after != null)
0608: after.setPrev(before);
0609: else
0610: // We were the tail, update the tail
0611: lruTail = before;
0612: ce.setNext(lruHead);
0613: ce.setPrev(null);
0614: if (lruHead != ce)
0615: lruHead.setPrev(ce);
0616: lruHead = ce;
0617: }
0618: // else it was already the LRU head
0619: }
0620: }
0621:
0622: if (ce.getResult() == null) {
0623: if (addToPendingQueries) {
0624: pendingQueries.add(sqlQuery);
0625: // Add this query to the pending queries
0626: if (logger.isDebugEnabled()) {
0627: logger.debug(Translate
0628: .get("resultcache.cache.miss"));
0629: logger
0630: .debug(Translate
0631: .get(
0632: "resultcache.adding.to.pending.queries",
0633: sqlQuery));
0634: }
0635: }
0636: if (ce.isValid() && logger.isInfoEnabled())
0637: logger.info(Translate.get(
0638: "resultcache.valid.entry.without.result",
0639: ce.getRequest().getUniqueKey()));
0640: } else {
0641: if (logger.isDebugEnabled())
0642: logger.debug(Translate.get("resultcache.cache.hit",
0643: sqlQuery));
0644: stats.addHits();
0645: }
0646:
0647: return ce;
0648: }
0649: }
0650:
0651: /**
0652: * Removes an entry from the cache (both request and reply are dropped). The
0653: * request is NOT removed from the pending query list, but it shouldn't be in
0654: * this list.
0655: *
0656: * @param request a <code>SelectRequest</code>
0657: */
0658: public void removeFromCache(SelectRequest request) {
0659: String sqlQuery = request.getUniqueKey();
0660:
0661: if (logger.isDebugEnabled())
0662: logger.debug("Removing from cache: " + sqlQuery);
0663:
0664: synchronized (queries) {
0665: // Remove from the cache
0666: AbstractResultCacheEntry ce = (AbstractResultCacheEntry) queries
0667: .remove(sqlQuery);
0668: if (ce == null)
0669: return; // Was not in the cache!
0670: else {
0671: // Update result set
0672: ce.setResult(null);
0673: // Update LRU
0674: AbstractResultCacheEntry before = ce.getPrev();
0675: AbstractResultCacheEntry after = ce.getNext();
0676: if (before != null) {
0677: before.setNext(after);
0678: if (after != null)
0679: after.setPrev(before);
0680: else
0681: // We were the tail, update the tail
0682: lruTail = before;
0683: } else { // We are the LRUHead
0684: lruHead = ce.getNext();
0685: if (after != null)
0686: after.setPrev(null);
0687: else
0688: // We were the tail, update the tail
0689: lruTail = before;
0690: }
0691: // Remove links to other cache entries for GC
0692: ce.setNext(null);
0693: ce.setPrev(null);
0694: }
0695: }
0696: }
0697:
0698: /**
0699: * Removes an entry from the pending query list.
0700: *
0701: * @param request a <code>SelectRequest</code>
0702: */
0703: public void removeFromPendingQueries(SelectRequest request) {
0704: String sqlQuery = getCacheKeyFromRequest(request);
0705:
0706: synchronized (pendingQueries) {
0707: // Remove the pending query from the list and wake up
0708: // all waiting queries
0709: if (pendingQueries.remove(sqlQuery)) {
0710: if (logger.isDebugEnabled())
0711: logger.debug(Translate.get(
0712: "resultcache.removing.pending.query",
0713: sqlQuery));
0714: pendingQueries.notifyAll();
0715: } else
0716: logger.warn(Translate.get(
0717: "resultcache.removing.pending.query.failed",
0718: sqlQuery));
0719: }
0720: }
0721:
0722: /**
0723: * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#isUpdateNecessary(org.continuent.sequoia.controller.requests.UpdateRequest)
0724: */
0725: public abstract boolean isUpdateNecessary(UpdateRequest request)
0726: throws CacheException;
0727:
0728: /**
0729: * Notifies the cache that this write request has been issued, so that cache
0730: * coherency can be maintained. If the cache is distributed, this method is
0731: * reponsible for broadcasting this information to other caches.
0732: *
0733: * @param request an <code>AbstractRequest</code> value
0734: * @exception CacheException if an error occurs
0735: */
0736: public void writeNotify(AbstractWriteRequest request)
0737: throws CacheException {
0738: // Update the stats
0739: if (request.isInsert())
0740: stats.addInsert();
0741: else if (request.isUpdate())
0742: stats.addUpdate();
0743: else if (request.isDelete())
0744: stats.addDelete();
0745: else if (request.isCreate()) {
0746: stats.addCreate();
0747: // Create: we only need to update the schema
0748: if (parsingGranularity != ParsingGranularities.NO_PARSING) {
0749: CreateRequest createRequest = (CreateRequest) request;
0750: if (createRequest.altersDatabaseSchema()
0751: && (createRequest.getDatabaseTable() != null))
0752: cdbs.addTable(new CacheDatabaseTable(createRequest
0753: .getDatabaseTable()));
0754: }
0755: return;
0756: } else if (request.isDrop()) {
0757: stats.addDrop();
0758: // Drop: we need to update the schema
0759: if (parsingGranularity != ParsingGranularities.NO_PARSING) {
0760: // Invalidate the cache entries associated with this table
0761: CacheDatabaseTable cdt = cdbs.getTable(request
0762: .getTableName());
0763: if (cdt != null) {
0764: cdt.invalidateAll();
0765: cdbs.removeTable(cdt);
0766: return;
0767: }
0768: // else: the table was not previously cached
0769: // (no previous 'select' requests on the table).
0770: }
0771: } else {
0772: stats.addUnknown();
0773: }
0774: if (logger.isDebugEnabled())
0775: logger.debug("Notifying write " + request.getUniqueKey());
0776:
0777: processWriteNotify(request);
0778: }
0779:
0780: /**
0781: * Implementation specific invalidation of the cache.
0782: *
0783: * @param request Write request that invalidates the cache.
0784: */
0785: protected abstract void processWriteNotify(
0786: AbstractWriteRequest request);
0787:
0788: /**
0789: * Removes all entries from the cache.
0790: */
0791: public void flushCache() {
0792: // Check if we are already flushing the cache
0793: synchronized (this ) {
0794: if (flushingCache)
0795: return;
0796: flushingCache = true;
0797: }
0798:
0799: try {
0800: synchronized (queries) { // Invalidate the whole cache until it is empty
0801: while (!queries.isEmpty()) {
0802: Iterator iter = queries.values().iterator();
0803: ((AbstractResultCacheEntry) iter.next())
0804: .invalidate();
0805: }
0806: }
0807:
0808: synchronized (pendingQueries) { // Clean pending queries to unblock everyone if some queries/backends
0809: // remained in an unstable state.
0810: pendingQueries.clear();
0811: pendingQueries.notifyAll();
0812: }
0813: } finally {
0814: synchronized (this ) {
0815: flushingCache = false;
0816: }
0817: if (logger.isDebugEnabled())
0818: logger
0819: .debug(Translate
0820: .get("resultcache.cache.flushed"));
0821: }
0822: }
0823:
0824: /**
0825: * Get Cache size
0826: *
0827: * @return the approximate size of the cache in bytes
0828: */
0829: public long getCacheSize() {
0830: // No need to synchronize, the implementation returns an int
0831: return queries.size();
0832: }
0833:
0834: /**
0835: * Removes the oldest entry from the cache.
0836: * <p>
0837: * <b>!Warning! </b> This method is not synchronized and should be called in
0838: * the scope of a synchronized(queries)
0839: */
0840: private void removeOldest() {
0841: if (lruTail == null)
0842: return;
0843: // Update the LRU
0844: AbstractResultCacheEntry oldce = lruTail;
0845: lruTail = lruTail.getPrev();
0846: if (lruTail != null)
0847: lruTail.setNext(null);
0848:
0849: if (logger.isDebugEnabled())
0850: logger.debug(Translate.get(
0851: "resultcache.removing.oldest.cache.entry", oldce
0852: .getRequest().getUniqueKey()));
0853:
0854: /*
0855: * We remove the query from the hashtable so that the garbage collector can
0856: * do its job. We need to remove the query from the queries HashTable first
0857: * in case we invalidate an eager cache entry that will call removeFromCache
0858: * (and will try to update the LRU is the entry is still in the queries
0859: * HashTable). So, to be compatible with all type of cache entries: 1.
0860: * queries.remove(ce) 2. ce.invalidate
0861: */
0862: queries.remove(oldce.getRequest().getUniqueKey());
0863:
0864: if (oldce.isValid()) {
0865: oldce.setResult(null);
0866: oldce.invalidate();
0867: }
0868:
0869: stats.addRemove();
0870: }
0871:
0872: /**
0873: * Gets the needed query parsing granularity.
0874: *
0875: * @return needed query parsing granularity
0876: */
0877: public int getParsingGranularity() {
0878: return this .parsingGranularity;
0879: }
0880:
0881: /**
0882: * Retrieve the name of this cache
0883: *
0884: * @return name
0885: */
0886: public abstract String getName();
0887:
0888: //
0889: // Transaction management
0890: //
0891:
0892: /**
0893: * Commit a transaction given its id.
0894: *
0895: * @param transactionId the transaction id
0896: * @throws CacheException if an error occurs
0897: */
0898: public void commit(long transactionId) throws CacheException {
0899: // Ok, the transaction has commited, nothing to do
0900: }
0901:
0902: /**
0903: * Rollback a transaction given its id.
0904: *
0905: * @param transactionId the transaction id
0906: * @throws CacheException if an error occurs
0907: */
0908: public void rollback(long transactionId) throws CacheException {
0909: logger.info(Translate.get(
0910: "resultcache.flushing.cache.cause.rollback",
0911: transactionId));
0912: flushCache();
0913: }
0914:
0915: /*
0916: * Debug/Monitoring
0917: */
0918:
0919: /**
0920: * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getCacheData
0921: */
0922: public String[][] getCacheData() throws CacheException {
0923: try {
0924: synchronized (queries) {
0925: String[][] data = new String[queries.size()][];
0926: int count = 0;
0927: for (Iterator iter = queries.values().iterator(); iter
0928: .hasNext(); count++) {
0929: AbstractResultCacheEntry qe = (AbstractResultCacheEntry) iter
0930: .next();
0931: if (qe != null) {
0932: data[count] = qe.toStringTable();
0933: }
0934: }
0935: return data;
0936: }
0937: } catch (Exception e) {
0938: logger.error(Translate.get(
0939: "resultcache.error.retrieving.cache.data", e));
0940: throw new CacheException(e.getMessage());
0941: }
0942: }
0943:
0944: /**
0945: * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getCacheStatsData()
0946: */
0947: public String[][] getCacheStatsData() throws CacheException {
0948: String[][] data = new String[1][];
0949: String[] stat = stats.getCacheStatsData();
0950: data[0] = new String[stat.length + 1];
0951: for (int i = 0; i < stat.length; i++)
0952: data[0][i] = stat[i];
0953: data[0][data[0].length - 1] = "" + queries.size();
0954: return data;
0955: }
0956:
0957: /**
0958: * @return Returns the stats.
0959: */
0960: public CacheStatistics getCacheStatistics() {
0961: return stats;
0962: }
0963:
0964: /**
0965: * Returns the eagerCache value.
0966: *
0967: * @return Returns the eagerCache.
0968: */
0969: public ArrayList getEagerCache() {
0970: return eagerCache;
0971: }
0972:
0973: /**
0974: * Returns the relaxedCache value.
0975: *
0976: * @return Returns the relaxedCache.
0977: */
0978: public ArrayList getRelaxedCache() {
0979: return relaxedCache;
0980: }
0981:
0982: /**
0983: * Gets information about the request cache
0984: *
0985: * @return <code>String</code> containing information
0986: */
0987: protected String getXmlImpl() {
0988: StringBuffer info = new StringBuffer();
0989: info.append("<" + DatabasesXmlTags.ELT_ResultCache + " "
0990: + DatabasesXmlTags.ATT_pendingTimeout + "=\""
0991: + pendingQueryTimeout + "\" "
0992: + DatabasesXmlTags.ATT_maxNbOfEntries + "=\""
0993: + maxEntries + "\" " + DatabasesXmlTags.ATT_granularity
0994: + "=\"" + getName() + "\">");
0995: info.append("<" + DatabasesXmlTags.ELT_DefaultResultCacheRule
0996: + " " + DatabasesXmlTags.ATT_timestampResolution
0997: + "=\"" + defaultRule.getTimestampResolution() / 1000
0998: + "\">");
0999: info.append(defaultRule.getCacheBehavior().getXml());
1000: info.append("</" + DatabasesXmlTags.ELT_DefaultResultCacheRule
1001: + ">");
1002: for (Iterator iter = cachingRules.iterator(); iter.hasNext();)
1003: info.append(((ResultCacheRule) iter.next()).getXml());
1004: info.append("</" + DatabasesXmlTags.ELT_ResultCache + ">");
1005: return info.toString();
1006: }
1007:
1008: }
|