001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tcclient.cache;
006:
007: import com.tc.config.lock.LockLevel;
008: import com.tc.logging.TCLogger;
009: import com.tc.object.bytecode.Clearable;
010: import com.tc.object.bytecode.ManagerUtil;
011: import com.tc.object.bytecode.TCMap;
012: import com.tc.util.Assert;
013: import com.tc.util.DebugUtil;
014: import com.tc.util.Util;
015:
016: import java.io.Serializable;
017: import java.util.ArrayList;
018: import java.util.Collection;
019: import java.util.HashMap;
020: import java.util.HashSet;
021: import java.util.Iterator;
022: import java.util.List;
023: import java.util.Map;
024: import java.util.Set;
025:
026: /**
027: * The main class for the cache. It holds CacheData objects
028: */
029: public class CacheDataStore implements Serializable {
030: // Resources
031: private static final TCLogger logger = ManagerUtil
032: .getLogger("com.tc.cache.CacheDataStore");
033:
034: // Config
035: private final CacheConfig config;
036:
037: // Cache state, changes during lifetime
038: private final Map[] store; // <Object,
039: // CacheData>,
040: // values
041: // may be
042: // faulted
043: // out
044: private final Map[] dtmStore; // <Object,
045: // Timestamp>,
046: // values
047: // never
048: // faulted
049: // out
050: private final GlobalKeySet[] globalKeySet;
051:
052: // Local cache stats
053: private transient int hitCount;
054: private transient int missCountExpired;
055: private transient int missCountNotFound;
056:
057: // Local eviction thread
058: private transient CacheInvalidationTimer[] cacheInvalidationTimer;
059:
060: /**
061: * This is a shared object, so this only happens once. In other nodes, the initialize() method is called on load of
062: * this object into the node.
063: */
064: public CacheDataStore(CacheConfig config) {
065: this .config = config;
066:
067: // Set up cache state
068: this .store = new Map[config.getConcurrency()];
069: this .dtmStore = new Map[config.getConcurrency()];
070: initializeStore();
071:
072: this .globalKeySet = new GlobalKeySet[config
073: .getEvictorPoolSize()];
074: initializeGlobalKeySet();
075:
076: this .hitCount = 0;
077: }
078:
079: public CacheConfig getConfig() {
080: return this .config;
081: }
082:
083: private void initializeGlobalKeySet() {
084: for (int i = 0; i < config.getEvictorPoolSize(); i++) {
085: globalKeySet[i] = new GlobalKeySet();
086: }
087: }
088:
089: private void initializeStore() {
090: for (int i = 0; i < config.getConcurrency(); i++) {
091: this .store[i] = new HashMap();
092: this .dtmStore[i] = new HashMap();
093: ((Clearable) dtmStore[i]).setEvictionEnabled(false);
094: }
095: }
096:
097: /**
098: * Called onload to initialize transient per-node state
099: */
100: public void initialize() {
101: logDebug("Initializing CacheDataStore");
102:
103: int startEvictionIndex = 0;
104: this .cacheInvalidationTimer = new CacheInvalidationTimer[config
105: .getEvictorPoolSize()];
106: for (int i = 0; i < config.getEvictorPoolSize(); i++) {
107: int lastEvictionIndex = startEvictionIndex
108: + config.getStoresPerInvalidator();
109: cacheInvalidationTimer[i] = new CacheInvalidationTimer(
110: config.getInvalidatorSleepSeconds(), config
111: .getCacheName()
112: + " invalidation thread" + i);
113:
114: cacheInvalidationTimer[i].start(new CacheEntryInvalidator(
115: globalKeySet[i], startEvictionIndex,
116: lastEvictionIndex, config,
117: ManagerUtil.getManager(), this ));
118: startEvictionIndex = lastEvictionIndex;
119: }
120: }
121:
122: /**
123: * This is used as a DistributedMethod because when one node cancel a timer, other nodes need to cancel the timer as
124: * well.
125: */
126: public void stopInvalidatorThread() {
127: logDebug("stopInvalidatorThread()");
128:
129: for (int i = 0; i < config.getEvictorPoolSize(); i++) {
130: if (cacheInvalidationTimer[i] != null) {
131: cacheInvalidationTimer[i].stop();
132: }
133: }
134: }
135:
136: private int getStoreIndex(Object key) {
137: return Util.hash(key, config.getConcurrency());
138: }
139:
140: private CacheData putInternal(final Object key, final Object value) {
141: logDebug("Put [" + key + ", " + value + "]");
142: Assert.pre(key != null);
143: Assert.pre(value != null);
144:
145: CacheData cd = new CacheData(value, config);
146: cd.accessed();
147: int storeIndex = getStoreIndex(key);
148:
149: CacheData rcd = (CacheData) store[storeIndex].put(key, cd);
150: // Only need to put into the timestamp map only when the invalidator thread will be active
151: if (config.getInvalidatorSleepSeconds() >= 0) {
152: dtmStore[storeIndex].put(key, cd.getTimestamp());
153: }
154: return rcd;
155: }
156:
157: public Object put(final Object key, final Object value) {
158: CacheData rcd = putInternal(key, value);
159:
160: return ((rcd == null) ? null : rcd.getValue());
161: }
162:
163: public void putData(final Object key, final Object value) {
164: putInternal(key, value);
165: }
166:
167: // private void dumpStore() {
168: // for (int i = 0; i < config.getConcurrency(); i++) {
169: // System.err.println("Dump store Client " + manager.getClientID() + "i: " + i + " " + store[i]);
170: // }
171: // }
172:
173: public Object get(final Object key) {
174: logDebug("Get [" + key + "]");
175: Assert.pre(key != null);
176:
177: CacheData cd = null;
178: cd = findCacheDataUnlocked(key);
179: if (cd != null) {
180: if (!cd.isValid()) {
181: missCountExpired++;
182: invalidate(key, cd);
183: return null;
184: } else {
185: hitCount++;
186: cd.accessed();
187: updateTimestampIfNeeded(key, cd);
188: }
189: return cd.getValue();
190: }
191: missCountNotFound++;
192: return null;
193: }
194:
195: private void invalidate(Object key, CacheData cd) {
196: int storeIndex = getStoreIndex(key);
197: if (!cd.isInvalidated()) {
198: ManagerUtil.monitorEnter(store[storeIndex],
199: LockLevel.CONCURRENT);
200: try {
201: cd.invalidate();
202: } finally {
203: ManagerUtil.monitorExit(store[storeIndex]);
204: }
205: }
206: }
207:
208: public boolean isExpired(final Object key) {
209: CacheData rv = findCacheDataUnlocked(key);
210: return rv == null || !rv.isValid();
211: }
212:
213: public Object remove(final Object key) {
214: CacheData cd = findCacheDataUnlocked(key);
215: if (cd == null)
216: return null;
217: removeInternal(key);
218: return cd.getValue();
219: }
220:
221: private void removeInternal(final Object key) {
222: Assert.pre(key != null);
223:
224: int storeIndex = getStoreIndex(key);
225: ((TCMap) store[storeIndex]).__tc_remove_logical(key);
226: ((TCMap) dtmStore[storeIndex]).__tc_remove_logical(key);
227: }
228:
229: public void expire(Object key) {
230: removeInternal(key);
231: config.getCallback().expire(key);
232: }
233:
234: public void clear() {
235: for (int i = 0; i < config.getConcurrency(); i++) {
236: store[i].clear();
237: dtmStore[i].clear();
238: }
239: }
240:
241: public Map getStore(Object key) {
242: int storeIndex = getStoreIndex(key);
243: return store[storeIndex];
244: }
245:
246: public Set entrySet() {
247: Set entrySet = new HashSet();
248: for (int i = 0; i < config.getConcurrency(); i++) {
249: entrySet.addAll(store[i].entrySet());
250: }
251: return entrySet;
252: }
253:
254: public boolean isEmpty() {
255: for (int i = 0; i < config.getConcurrency(); i++) {
256: if (!store[i].isEmpty()) {
257: return false;
258: }
259: }
260: return true;
261: }
262:
263: public Set keySet() {
264: Set keySet = new HashSet();
265: for (int i = 0; i < config.getConcurrency(); i++) {
266: Collection entrySnapshot = ((TCMap) store[i])
267: .__tc_getAllEntriesSnapshot();
268: for (Iterator it = entrySnapshot.iterator(); it.hasNext();) {
269: Map.Entry entry = (Map.Entry) it.next();
270: keySet.add(entry.getKey());
271: }
272: }
273: return keySet;
274: }
275:
276: public boolean containsValue(Object value) {
277: CacheData cd = new CacheData(value, config);
278: for (int i = 0; i < config.getConcurrency(); i++) {
279: if (store[i].containsValue(cd)) {
280: return true;
281: }
282: }
283: return false;
284: }
285:
286: public int size() {
287: int size = 0;
288: for (int i = 0; i < config.getConcurrency(); i++) {
289: size += store[i].size();
290: }
291: return size;
292: }
293:
294: public Collection values() {
295: List values = new ArrayList();
296: for (int i = 0; i < config.getConcurrency(); i++) {
297: Collection entrySnapshot = ((TCMap) store[i])
298: .__tc_getAllEntriesSnapshot();
299: for (Iterator it = entrySnapshot.iterator(); it.hasNext();) {
300: Map.Entry entry = (Map.Entry) it.next();
301: values.add(entry.getValue());
302: }
303: }
304: return values;
305: }
306:
307: void updateTimestampIfNeeded(Object key, CacheData rv) {
308: if (config.getMaxTTLSeconds() <= 0) {
309: return;
310: }
311:
312: Assert.pre(rv != null);
313: final long now = System.currentTimeMillis();
314: final Timestamp t = rv.getTimestamp();
315: if (needsUpdate(rv)) {
316: int storeIndex = getStoreIndex(key);
317: ManagerUtil.monitorEnter(store[storeIndex],
318: LockLevel.CONCURRENT);
319: try {
320: t.setExpiredTimeMillis(now
321: + config.getMaxIdleTimeoutMillis());
322: } finally {
323: ManagerUtil.monitorExit(store[storeIndex]);
324: }
325: }
326: }
327:
328: boolean needsUpdate(CacheData rv) {
329: final long now = System.currentTimeMillis();
330: final Timestamp t = rv.getTimestamp();
331: final long diff = t.getExpiredTimeMillis() - now;
332: return (diff < (config.getMaxIdleTimeoutMillis() / 2) || diff > (config
333: .getMaxIdleTimeoutMillis()));
334: }
335:
336: Timestamp findTimestampUnlocked(final Object key) {
337: int storeIndex = getStoreIndex(key);
338: return (Timestamp) dtmStore[storeIndex].get(key);
339: }
340:
341: CacheData findCacheDataUnlocked(final Object key) {
342: int storeIndex = getStoreIndex(key);
343: final CacheData rv = (CacheData) store[storeIndex].get(key);
344: return rv;
345: }
346:
347: public int getHitCount() {
348: return hitCount;
349: }
350:
351: public int getMissCountExpired() {
352: return missCountExpired;
353: }
354:
355: public int getMissCountNotFound() {
356: return missCountNotFound;
357: }
358:
359: public void clearStatistics() {
360: this .hitCount = 0;
361: this .missCountExpired = 0;
362: this .missCountNotFound = 0;
363: }
364:
365: private void logDebug(String msg) {
366: if (config.isLoggingEnabled()) {
367: if (DebugUtil.DEBUG) {
368: System.err.println(msg);
369: }
370: logger.debug(msg);
371: }
372: }
373:
374: private void logError(String msg, Throwable t) {
375: if (config.isLoggingEnabled()) {
376: logger.error(msg, t);
377: }
378: }
379:
380: private Collection getAllLocalEntries(int startEvictionIndex,
381: int lastEvictionIndex) {
382: Collection allLocalEntries = new ArrayList();
383: for (int i = startEvictionIndex; i < lastEvictionIndex; i++) {
384: Collection t = ((TCMap) dtmStore[i])
385: .__tc_getAllLocalEntriesSnapshot();
386: allLocalEntries.addAll(t);
387: }
388: return allLocalEntries;
389: }
390:
391: Object[] getAllLocalKeys(int startEvictionIndex,
392: int lastEvictionIndex) {
393: Collection allLocalEntries = getAllLocalEntries(
394: startEvictionIndex, lastEvictionIndex);
395: Object[] allLocalKeys = new Object[allLocalEntries.size()];
396: int i = 0;
397: for (Iterator it = allLocalEntries.iterator(); it.hasNext(); i++) {
398: Map.Entry e = (Map.Entry) it.next();
399: allLocalKeys[i] = e.getKey();
400: }
401: return allLocalKeys;
402: }
403:
404: private Collection getAllOrphanEntries(Collection remoteKeys,
405: int startEvictionIndex, int lastEvictionIndex) {
406: Collection allEntries = new ArrayList();
407: for (int i = startEvictionIndex; i < lastEvictionIndex; i++) {
408: Collection t = ((TCMap) dtmStore[i])
409: .__tc_getAllEntriesSnapshot();
410: allEntries.addAll(t);
411: }
412: for (Iterator it = allEntries.iterator(); it.hasNext();) {
413: Map.Entry e = (Map.Entry) it.next();
414: if (remoteKeys.contains(e.getKey())) {
415: it.remove();
416: }
417: }
418: return allEntries;
419: }
420:
421: public void evictExpiredElements() {
422: evictExpiredElements(0, config.getConcurrency());
423: }
424:
425: public void evictExpiredElements(int startEvictionIndex,
426: int lastEvictionIndex) {
427: final Collection localEntries = getAllLocalEntries(
428: startEvictionIndex, lastEvictionIndex);
429: invalidateCacheEntries(localEntries, false, -1, -1);
430: }
431:
432: public void evictAllExpiredElements(Collection remoteKeys,
433: int startEvictionIndex, int lastEvictionIndex) {
434: final Collection orphanEntries = getAllOrphanEntries(
435: remoteKeys, startEvictionIndex, lastEvictionIndex);
436: invalidateCacheEntries(orphanEntries, true, config
437: .getNumOfChunks(), config.getRestMillis());
438: }
439:
440: private void invalidateCacheEntries(
441: final Collection entriesToBeExamined,
442: boolean isGlobalInvalidation, int numOfChunks,
443: long restMillis) {
444: int totalCnt = 0;
445: int evaled = 0;
446: int notEvaled = 0;
447: int errors = 0;
448: long numOfObjectsPerChunk = entriesToBeExamined.size();
449:
450: if (isGlobalInvalidation) {
451: // Use ceiling here so that we get at least 1 obj / chunk
452: numOfObjectsPerChunk = (int) Math.ceil(entriesToBeExamined
453: .size()
454: * 1.0 / numOfChunks);
455: }
456:
457: for (Iterator it = entriesToBeExamined.iterator(); it.hasNext();) {
458: final Map.Entry timestampEntry = (Map.Entry) it.next();
459:
460: try {
461: final Timestamp dtm = findTimestampUnlocked(timestampEntry
462: .getKey());
463: if (dtm == null)
464: continue;
465: totalCnt++;
466: if (dtm.getInvalidatedTimeMillis() < System
467: .currentTimeMillis()) {
468: evaled++;
469: logDebug("expiring .... key: "
470: + timestampEntry.getKey());
471: expire(timestampEntry.getKey());
472: } else {
473: notEvaled++;
474: }
475: } catch (Throwable t) {
476: errors++;
477: t.printStackTrace(System.err);
478: logError(
479: "Unhandled exception inspecting session "
480: + timestampEntry.getKey()
481: + " for invalidation", t);
482: } finally {
483: if (isGlobalInvalidation) {
484: if ((totalCnt % numOfObjectsPerChunk) == 0) {
485: try {
486: Thread.sleep(restMillis);
487: } catch (InterruptedException e) {
488: // ignore
489: }
490: }
491: }
492: }
493: }
494: }
495:
496: }
|