/**
 * Copyright (C) 2006 NetMind Consulting Bt.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package hu.netmind.persistence;

import hu.netmind.persistence.parser.QueryStatement;
import org.apache.log4j.Logger;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.util.ResourceBundle;

/**
 * This is an implementation of an intelligent, configurationless
 * read-only cache with change detection.<br>
 * The main design point is that it does not require any configuration
 * from the user. Its task is to cache result sets up to a previously
 * given deadline, and to clear them from the cache when that deadline is
 * reached. When the same result is referenced again, the deadline may be
 * moved further into the future.
 * Memory management is dynamic. When a resultset arrives into the cache,
 * it is <strong>always</strong> cached, but if the cache detects that
 * there is "not enough memory" left (see below), it may clear some entries
 * before their deadline is reached.<br>
 * So basically one does not have to configure the size of the cache, because
 * the cache assumes that if a resultset was not recalled within a given timeframe,
 * the overhead of selecting from the database again is acceptable (rather than
 * always using a predetermined cache size, hoping to achieve more
 * cache hits). Memory usage also adapts to load: when the load is low,
 * it is likely that only a few resultsets are in the cache, because
 * they expire and are not likely to be hit anyway. But as the load rises,
 * more and more results get into the cache, and the likelihood of a hit
 * rises together with the memory allocation.<br>
 * The cache determines whether there is enough memory by checking the
 * raw bytes free, and by computing the ratio of allocated vs. free memory.
 * If this ratio is below a given threshold, then there is enough memory. The
 * theory is that the Java VM will allocate more heap when this ratio
 * is sufficiently small (usually around 60-70%). This leaves two cases:<br>
 * <ul>
 * <li>If the cache's ratio is less than the VM's, then the cache will
 * not force the VM to allocate more space, which in turn means that
 * the cache will not grow, although the VM could allocate more memory.</li>
 * <li>If the cache's ratio is more than the VM's, then the cache will potentially
 * force the VM to allocate new memory, potentially eating memory
 * away from more important tasks.</li>
 * </ul>
 * The cache uses the first, non-aggressive approach. The cache itself will not
 * cause the VM to allocate more heap, but if the application uses more memory,
 * the cache will use proportionally more memory for its own purposes. Note:
 * the VM tries to maintain a free/used ratio of approximately 30-70%.<br>
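 * The memory check itself is a simple heuristic; the following sketch mirrors
 * the computation performed in addEntry (the constant names refer to fields
 * of this class):
 * <pre>
 *    long freeMem = Runtime.getRuntime().freeMemory();
 *    long totalMem = Runtime.getRuntime().totalMemory();
 *    boolean lowMemory = (freeMem &lt; MIN_FREE_BYTES)
 *        || (100.0 * freeMem / totalMem &gt; MIN_FREE_RATE);
 * </pre>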
 * This cache specializes in storing only current searches' results (rather than
 * current <strong>and</strong> historical results). This is because in the
 * case of current results, the cache can effectively compute the interval
 * in which the result is valid. When determining whether a statement's result is in
 * the cache, the cache searches all entries (which are all current), and if
 * the statement's serial is greater than or equal to the result's start serial
 * (the serial of the first query which caused the entry to be created),
 * then the result is valid for that query. If a query is received whose
 * result may depend on changes inside the current transaction, which are not
 * yet visible to other transactions, then this query is not handled. This
 * is mainly because handling transaction-dependent result sets would be
 * a large overhead for the cache, with little benefit, if any.<br>
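 * In code, the validity test reduces to a single comparison (this mirrors
 * the check done in getEntry):
 * <pre>
 *    boolean valid = stmt.getTimeControl().getSerial().longValue() &gt;= entry.startSerial;
 * </pre>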
 * In effect, cache hits will mostly occur when the same non-historical
 * query, on a common (not frequently changed) table, is run multiple times
 * within a short period of time.<br>
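 * A typical, purely illustrative usage from within the store is sketched below;
 * the statement, limits, serial and executeStatement() call are hypothetical
 * placeholders, only the cache calls themselves belong to this class:
 * <pre>
 *    ResultsCache cache = new ResultsCache(context);
 *    cache.init(); // remember the current serial
 *    SearchResult result = cache.getEntry(stmt, limits);
 *    if (result == null) {
 *        result = executeStatement(stmt, limits); // hypothetical database call
 *        cache.addEntry(stmt, limits, result);
 *    }
 *    // When a table is modified, invalidate the entries that depend on it
 *    cache.updateEntries("book", modifySerial);
 * </pre>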
 * @author Brautigam Robert
 * @version Revision: $Revision$
 */
public class ResultsCache {
    private static Logger logger = Logger.getLogger(ResultsCache.class);

    private static int MIN_FREE_BYTES = 512 * 1024; // Min free memory in bytes
    private static int MIN_FREE_RATE = 60; // Min free memory in percentage of total allocated
    private static int FREE_RATE = 2; // How many entries to free for a single new entry, if needed
    private static long EXPIRATION_INTERVAL = 1 * 60 * 1000; // Expiration in millis

    private SortedSet entriesByExpiration; // The cache entries sorted by expiration
    private Map entriesByRepresentation; // Entries by statement representation
    private Map entriesByTables; // Entries by table names
    private Object cacheMutex = new Object(); // Mutex for cache
    private Long startSerial; // The serial at which the cache started
    private Map serialsByTables; // Last modification serials of tables
    private StoreContext context;

    public ResultsCache(StoreContext context) {
        clear();
        this.context = context;
        this.startSerial = null;
    }

    private String getRepresentation(QueryStatement stmt, Limits limits) {
        if (limits != null)
            return stmt.getStaticRepresentation() + limits.toString();
        else
            return stmt.getStaticRepresentation();
    }

    /**
     * Get an entry from the cache.
     * @param stmt The statement to look for.
     * @param limits The limits of the query.
     * @return A SearchResult object if the query was cached, null otherwise.
     */
    public SearchResult getEntry(QueryStatement stmt, Limits limits) {
        // Check whether the result may depend on the current transaction.
        // Only global (non-transaction-dependent) results are cached.
        if (stmt.getTimeControl().isApplyTransaction())
            return null;
        // Get entry
        String rep = getRepresentation(stmt, limits);
        if (logger.isDebugEnabled())
            logger.debug("searching in cache for: " + rep
                    + ", entries: " + entriesByExpiration.size());
        if ((rep == null) || ("".equals(rep)))
            return null;
        CacheEntry entry = null;
        synchronized (cacheMutex) {
            entry = (CacheEntry) entriesByRepresentation.get(rep);
        }
        if (entry == null)
            return null;
        // Check whether the query came after the result became active
        if (entry.startSerial > stmt.getTimeControl().getSerial().longValue())
            return null;
        // All OK, the result is valid; update statistics
        synchronized (cacheMutex) {
            entriesByExpiration.remove(entry); // Remove, because it will be re-ordered
            entry.accessCount++;
            entry.lastAccess = System.currentTimeMillis();
            entry.expiration += EXPIRATION_INTERVAL;
            entriesByExpiration.add(entry);
        }
        // Return
        logger.debug("cache HIT.");
        return entry.result;
    }

    /**
     * Remove an entry from cache.
     */
    private void removeEntry(CacheEntry entry) {
        synchronized (cacheMutex) {
            entriesByExpiration.remove(entry);
            entriesByRepresentation.remove(entry.representation);
            Iterator tableIterator = entry.tables.iterator();
            while (tableIterator.hasNext()) {
                String tableName = (String) tableIterator.next();
                Set tableEntries = (Set) entriesByTables.get(tableName); // This shouldn't be null
                tableEntries.remove(entry);
                if (tableEntries.size() == 0)
                    entriesByTables.remove(tableName);
            }
        }
    }

    /**
     * Add an entry to the cache.
     * @param stmt The statement which is the source of the result.
     * @param limits The limits of the result.
     * @param result The SearchResult object.
     */
    public void addEntry(QueryStatement stmt, Limits limits,
            SearchResult result) {
        // Check whether the result may depend on the current transaction.
        // Only global (non-transaction-dependent) results are cached.
        if (stmt.getTimeControl().isApplyTransaction())
            return;
        // Compute the representation key
        String rep = getRepresentation(stmt, limits);
        if (logger.isDebugEnabled())
            logger.debug("adding to cache: " + rep + ", entries: "
                    + entriesByExpiration.size());
        if ((rep == null) || ("".equals(rep)))
            return;
        // First, determine how many entries to free. By default, all expired
        // entries are freed, but if there is not enough memory, additional
        // entries can be forced out.
        int forceFreeResultsCount = 0; // By default none are forced
        long freeMem = Runtime.getRuntime().freeMemory();
        long totalMem = Runtime.getRuntime().totalMemory();
        if ((freeMem < MIN_FREE_BYTES)
                || (100.0 * freeMem / totalMem > MIN_FREE_RATE)) {
            if (logger.isDebugEnabled())
                logger.debug("not enough memory to cache, free: "
                        + freeMem + ", total: " + totalMem);
            // Not enough memory, set force free count
            forceFreeResultsCount = result.getResult().size() * FREE_RATE + 1;
        }
        // Free all expired entries, and keep forcing entries out while the
        // force free count is not yet satisfied
        long currentTime = System.currentTimeMillis();
        while (entriesByExpiration.size() > 0) {
            // Get the entry with the earliest expiration
            CacheEntry entry = null;
            synchronized (cacheMutex) {
                entry = (CacheEntry) entriesByExpiration.first();
            }
            // Stop when no forced removals are left and the oldest entry
            // has not expired yet
            if ((forceFreeResultsCount <= 0) && (entry.expiration >= currentTime))
                break;
            // Adjust the force count and free the entry
            forceFreeResultsCount -= entry.result.getResult().size();
            removeEntry(entry);
        }
        if (logger.isDebugEnabled())
            logger.debug("cache entries after free: "
                    + entriesByExpiration.size());
        // Create new entry
        CacheEntry entry = new CacheEntry();
        entry.representation = rep;
        entry.result = result;
        entry.accessCount = 0;
        entry.firstAccess = currentTime;
        entry.lastAccess = currentTime;
        entry.expiration = currentTime + EXPIRATION_INTERVAL;
        entry.tables = stmt.getTables();
        if (entry.tables == null)
            entry.tables = stmt.computeTables();
        entry.startSerial = stmt.getTimeControl().getSerial().longValue();
        // Add new entry to cache
        synchronized (cacheMutex) {
            // Determine whether entry is current (all table
            // modifications are previous to entry)
            Iterator tableIterator = entry.tables.iterator();
            while (tableIterator.hasNext()) {
                String tableName = (String) tableIterator.next();
                Long lastModificationSerial = (Long) serialsByTables.get(tableName);
                if (lastModificationSerial == null)
                    lastModificationSerial = startSerial;
                if (lastModificationSerial.longValue() > entry.startSerial)
                    return; // Table is newer than query, so query is historical
            }
            // Add to maps
            entriesByExpiration.add(entry);
            entriesByRepresentation.put(entry.representation, entry);
            // Add to table indexed map
            tableIterator = entry.tables.iterator();
            while (tableIterator.hasNext()) {
                String tableName = (String) tableIterator.next();
                Set tableEntries = (Set) entriesByTables.get(tableName);
                if (tableEntries == null) {
                    tableEntries = new HashSet();
                    entriesByTables.put(tableName, tableEntries);
                }
                tableEntries.add(entry);
            }
        }
    }

    /**
     * Initialize the cache with the current serial.
     */
    public void init() {
        synchronized (cacheMutex) {
            clear();
            startSerial = context.getNodeManager().getNextSerial();
        }
    }

    /**
     * Clear the cache.
     */
    public void clear() {
        synchronized (cacheMutex) {
            entriesByExpiration = new TreeSet();
            entriesByRepresentation = new HashMap();
            entriesByTables = new HashMap();
            serialsByTables = new HashMap();
        }
    }

    /**
     * Tell the cache that a table was updated. If an object is updated,
     * the old resultsets could theoretically be kept with another time
     * control, but empirically that does not add to cache hits, because
     * most often only current resultsets are selected.
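     * A purely illustrative call (the table name and serial are hypothetical,
     * and would normally come from the store's modification tracking):
     * <pre>
     *    cache.updateEntries("book", new Long(42));
     * </pre>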
     * @param tableName The table that was updated.
     * @param modifySerial The modification serial of the table.
     */
    public void updateEntries(String tableName, Long modifySerial) {
        // Update table
        synchronized (cacheMutex) {
            serialsByTables.put(tableName, modifySerial);
        }
        // Get entries
        Set entries = null;
        synchronized (cacheMutex) {
            entries = (Set) entriesByTables.get(tableName);
        }
        if (entries != null) {
            // Remove all entries
            synchronized (cacheMutex) {
                Iterator entryIterator = entries.iterator();
                while (entryIterator.hasNext()) {
                    CacheEntry entry = (CacheEntry) entryIterator.next();
                    entriesByExpiration.remove(entry);
                    entriesByRepresentation.remove(entry.representation);
                }
                entriesByTables.remove(tableName);
            }
            if (logger.isDebugEnabled())
                logger.debug("updated cache table '" + tableName
                        + "', entry count: " + entriesByExpiration.size());
        }
    }

    /**
     * This is a single cache entry.
     */
    private class CacheEntry implements Comparable {
        // Statistics
        public int accessCount;
        public long firstAccess;
        public long lastAccess;
        public long expiration;

        // Data
        public String representation;
        public Set tables;
        public SearchResult result;

        // Valid markers
        public long startSerial; // Maximum of touched tables' last changed serials

        public int compareTo(Object obj) {
            // Compare by expiration, avoiding the int overflow a plain cast could cause
            long diff = expiration - ((CacheEntry) obj).expiration;
            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
        }
    }

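    /*
     * A minimal, hypothetical "beankeeper.properties" illustrating the keys the
     * static initializer below recognizes (the values shown equal the hardcoded
     * defaults above):
     *
     *    cache.min_free_bytes=524288
     *    cache.min_free_rate=60
     *    cache.force_free_rate=2
     *    cache.expiration=60000
     */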
    static {
        try {
            ResourceBundle config = ResourceBundle.getBundle("beankeeper");
            MIN_FREE_BYTES = Integer.valueOf(
                    config.getString("cache.min_free_bytes")).intValue();
            MIN_FREE_RATE = Integer.valueOf(
                    config.getString("cache.min_free_rate")).intValue();
            FREE_RATE = Integer.valueOf(
                    config.getString("cache.force_free_rate")).intValue();
            EXPIRATION_INTERVAL = Integer.valueOf(
                    config.getString("cache.expiration")).intValue();
        } catch (Exception e) {
            logger.error("could not read configuration file, using hardcoded defaults.", e);
        }
    }
}