001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.persistence.sleepycat;
006:
007: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
008:
009: import com.sleepycat.je.Cursor;
010: import com.sleepycat.je.CursorConfig;
011: import com.sleepycat.je.Database;
012: import com.sleepycat.je.DatabaseEntry;
013: import com.sleepycat.je.DatabaseException;
014: import com.sleepycat.je.LockMode;
015: import com.sleepycat.je.OperationStatus;
016: import com.tc.logging.TCLogger;
017: import com.tc.object.ObjectID;
018: import com.tc.objectserver.persistence.api.PersistenceTransaction;
019: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
020: import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor.SleepycatPersistorBase;
021: import com.tc.properties.TCPropertiesImpl;
022: import com.tc.util.Assert;
023: import com.tc.util.Conversion;
024: import com.tc.util.ObjectIDSet2;
025: import com.tc.util.OidLongArray;
026: import com.tc.util.SyncObjectIdSet;
027:
028: import java.util.Collection;
029: import java.util.Date;
030: import java.util.HashMap;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.Set;
034: import java.util.SortedSet;
035: import java.util.TreeSet;
036:
037: public final class OidBitsArrayMapManagerImpl extends
038: SleepycatPersistorBase implements OidBitsArrayMapManager {
039: private final Database oidDB;
040: private final TCLogger logger;
041: private final PersistenceTransactionProvider ptp;
042: private final OidBitsArrayMap oidBitsArrayMap;
043: private final CursorConfig oidDBCursorConfig;
044: private volatile boolean isPopulating;
045: private final int BitsPerLong = OidLongArray.BitsPerLong;
046: private final String LONGS_PER_DISK_ENTRY = "l2.objectmanager.loadObjectID.longsPerDiskEntry";
047: private final String LONGS_PER_MEMORY_ENTRY = "l2.objectmanager.loadObjectID.longsPerMemoryEntry";
048: private final boolean paranoid;
049:
050: public OidBitsArrayMapManagerImpl(TCLogger logger,
051: boolean paranoid, Database oidDB,
052: PersistenceTransactionProvider ptp,
053: CursorConfig oidDBCursorConfig) {
054: this .oidDB = oidDB;
055: this .logger = logger;
056: this .paranoid = paranoid;
057: this .ptp = ptp;
058: this .oidDBCursorConfig = oidDBCursorConfig;
059:
060: isPopulating = false;
061: if (!this .paranoid) {
062: oidBitsArrayMap = null;
063: } else {
064: oidBitsArrayMap = new OidBitsArrayMap(TCPropertiesImpl
065: .getProperties().getInt(LONGS_PER_MEMORY_ENTRY),
066: TCPropertiesImpl.getProperties().getInt(
067: LONGS_PER_DISK_ENTRY));
068: }
069: }
070:
071: public Runnable createObjectIdReader(SyncObjectIdSet set) {
072: return (new OidObjectIdReader(set));
073: }
074:
075: /*
076: * fast way to load object-Ids at server restart by reading them from bit array
077: */
078: class OidObjectIdReader implements Runnable {
079: protected final SyncObjectIdSet set;
080:
081: public OidObjectIdReader(SyncObjectIdSet set) {
082: this .set = set;
083: }
084:
085: public void run() {
086: Assert
087: .assertTrue(
088: "Shall be in persistent mode to refresh Object IDs at startup",
089: paranoid);
090:
091: ObjectIDSet2 tmp = new ObjectIDSet2();
092: PersistenceTransaction tx = null;
093: Cursor cursor = null;
094: Thread helperThread = null;
095: try {
096: BoundedLinkedQueue queue;
097: queue = new BoundedLinkedQueue(1000);
098: helperThread = new Thread(new ObjectIdCreator(queue,
099: tmp), "OidObjectIdCreatorThread");
100: helperThread.start();
101: isPopulating = true;
102: tx = ptp.newTransaction();
103: cursor = oidDB.openCursor(pt2nt(tx), oidDBCursorConfig);
104: DatabaseEntry key = new DatabaseEntry();
105: DatabaseEntry value = new DatabaseEntry();
106: while (OperationStatus.SUCCESS.equals(cursor.getNext(
107: key, value, LockMode.DEFAULT))) {
108: queue.put(dbToOidBitsArray(key, value));
109: }
110: // null data to end helper thread
111: queue.put(new OidLongArray(null, null));
112: helperThread.join();
113: if (MeasurePerf) {
114: System.out.println("XXX done");
115: }
116: } catch (Throwable t) {
117: logger.error("Error Reading Object IDs", t);
118: } finally {
119: safeClose(cursor);
120: safeCommit(tx);
121: isPopulating = false;
122: set.stopPopulating(tmp);
123: tmp = null;
124: }
125: }
126:
127: protected void safeCommit(PersistenceTransaction tx) {
128: if (tx == null)
129: return;
130: try {
131: tx.commit();
132: } catch (Throwable t) {
133: logger.error("Error Committing Transaction", t);
134: }
135: }
136:
137: protected void safeClose(Cursor c) {
138: if (c == null)
139: return;
140:
141: try {
142: c.close();
143: } catch (Throwable e) {
144: logger.error("Error closing cursor", e);
145: }
146: }
147: }
148:
149: boolean MeasurePerf = false;
150:
151: public class ObjectIdCreator implements Runnable {
152: ObjectIDSet2 tmp;
153: BoundedLinkedQueue queue;
154: long start_time;
155: int counter = 0;
156:
157: ObjectIdCreator(BoundedLinkedQueue queue, ObjectIDSet2 tmp) {
158: this .queue = queue;
159: this .tmp = tmp;
160: }
161:
162: public void run() {
163: if (MeasurePerf)
164: start_time = new Date().getTime();
165: while (true) {
166: try {
167: OidLongArray entry = (OidLongArray) queue.take();
168: if (entry.isEnded())
169: break;
170: process(entry);
171: } catch (InterruptedException ex) {
172: logger.error("ObjectIdCreator interruptted!");
173: break;
174: }
175: }
176: }
177:
178: private void process(OidLongArray entry) {
179: oidBitsArrayMap.applyOnDiskEntry(entry);
180: long oid = entry.getKey();
181: long[] ary = entry.getArray();
182: for (int j = 0; j < oidBitsArrayMap.longsPerDiskUnit; ++j) {
183: long bit = 1L;
184: long bits = ary[j];
185: for (int i = 0; i < BitsPerLong; ++i) {
186: if ((bits & bit) != 0) {
187: tmp.add(new ObjectID(oid));
188: if (MeasurePerf) {
189: if ((++counter % 1000) == 0) {
190: long elapse_time = new Date().getTime()
191: - start_time;
192: long avg_time = elapse_time
193: / (counter / 1000);
194: System.out.println("XXX reading "
195: + counter + " OIDs " + "took "
196: + elapse_time
197: + "ms avg(1000 objs):"
198: + avg_time + " ms");
199: }
200: }
201: }
202: bit <<= 1;
203: ++oid;
204: }
205: }
206: }
207: }
208:
209: /*
210: * Sync up in-memory bits array from persistor (disk) if exist
211: */
212: private void syncOidBitsArrayDiskEntry(ObjectID objectId) {
213: DatabaseEntry key = new DatabaseEntry();
214: DatabaseEntry value = new DatabaseEntry();
215: key.setData(oidBitsArrayMap.onDiskIndex2Bytes(objectId));
216: try {
217: if (OperationStatus.SUCCESS.equals(this .oidDB.get(null,
218: key, value, LockMode.DEFAULT))) {
219: oidBitsArrayMap.applyOnDiskEntry(dbToOidBitsArray(key,
220: value));
221: }
222: } catch (DatabaseException e) {
223: logger.warn("Reading object ID " + objectId + ":" + e);
224: }
225: }
226:
227: /*
228: * Use with great care!!! Shall do db commit before next call otherwise dead lock may result.
229: */
230: public OperationStatus oidPut(PersistenceTransaction tx,
231: ObjectID objectId) throws DatabaseException {
232: // care only new object ID.
233: if (oidBitsArrayMap.contains(objectId))
234: return OperationStatus.SUCCESS;
235:
236: DatabaseEntry key = new DatabaseEntry();
237: DatabaseEntry value = new DatabaseEntry();
238: synchronized (oidBitsArrayMap) {
239: if (isPopulating) {
240: syncOidBitsArrayDiskEntry(objectId);
241: }
242:
243: OidLongArray bits = oidBitsArrayMap.getAndSet(objectId);
244: key.setData(bits.keyToBytes());
245: value.setData(bits.arrayToBytes());
246: return this .oidDB.put(pt2nt(tx), key, value);
247: }
248: }
249:
250: public void oidKeepInSet(Set<ObjectID> set, ObjectID objectId) {
251: set.add(objectId);
252: }
253:
254: /*
255: * Update in-memory array and prepare for writing all new object IDs out at end of loop.
256: */
257: private void processForPut(Set<ObjectID> rawSet,
258: SortedSet<Long> sortedOidSet) {
259: Iterator iter = rawSet.iterator();
260: synchronized (oidBitsArrayMap) {
261: while (iter.hasNext()) {
262: ObjectID objectId = (ObjectID) iter.next();
263: // care only new object ID.
264: if (oidBitsArrayMap.contains(objectId))
265: continue;
266:
267: if (isPopulating) {
268: syncOidBitsArrayDiskEntry(objectId);
269: }
270:
271: oidBitsArrayMap.getAndSet(objectId);
272: sortedOidSet.add(new Long(oidBitsArrayMap
273: .oidOnDiskIndex(objectId.toLong())));
274: }
275: }
276: }
277:
278: public OperationStatus oidPutAll(PersistenceTransaction tx,
279: Set<ObjectID> oidSet) throws TCDatabaseException {
280: OperationStatus status = OperationStatus.SUCCESS;
281: SortedSet sortedOidSet = new TreeSet();
282: processForPut(oidSet, sortedOidSet);
283: if (!oidBitsArrayMap.oidMarkInUse(sortedOidSet)) {
284: throw new TCDatabaseException("OidBitsArrayMap interrupted");
285: }
286: for (Iterator i = sortedOidSet.iterator(); i.hasNext();) {
287: DatabaseEntry key = new DatabaseEntry();
288: DatabaseEntry value = new DatabaseEntry();
289: Long keyOnDisk = (Long) i.next();
290: OidLongArray bits = oidBitsArrayMap
291: .getArrayForDisk(keyOnDisk.longValue());
292: key.setData(bits.keyToBytes());
293: value.setData(bits.arrayToBytes());
294: try {
295: status = this .oidDB.put(pt2nt(tx), key, value);
296: } catch (DatabaseException de) {
297: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
298: throw new TCDatabaseException(de);
299: }
300: if (!OperationStatus.SUCCESS.equals(status))
301: break;
302: }
303: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
304: return (status);
305: }
306:
307: /*
308: * Update in-memory array and prepare for deleting object IDs out at end of loop.
309: */
310: private void processForDelete(Set<ObjectID> rawSet,
311: SortedSet<Long> sortedOidSet) {
312: Iterator iter = rawSet.iterator();
313: synchronized (oidBitsArrayMap) {
314: while (iter.hasNext()) {
315: ObjectID objectId = (ObjectID) iter.next();
316: oidBitsArrayMap.getAndClr(objectId);
317: sortedOidSet.add(new Long(oidBitsArrayMap
318: .oidOnDiskIndex(objectId.toLong())));
319: }
320: }
321: }
322:
323: public OperationStatus oidDeleteAll(PersistenceTransaction tx,
324: Set<ObjectID> oidSet) throws TCDatabaseException {
325: OperationStatus status = OperationStatus.SUCCESS;
326: SortedSet sortedOidSet = new TreeSet();
327: processForDelete(oidSet, sortedOidSet);
328: if (!oidBitsArrayMap.oidMarkInUse(sortedOidSet)) {
329: throw new TCDatabaseException("OidBitsArrayMap interrupted");
330: }
331: for (Iterator i = sortedOidSet.iterator(); i.hasNext();) {
332: DatabaseEntry key = new DatabaseEntry();
333: Long keyOnDisk = (Long) i.next();
334: OidLongArray bits = oidBitsArrayMap
335: .getArrayForDisk(keyOnDisk.longValue());
336: key.setData(bits.keyToBytes());
337: try {
338: if (bits.isZero()) {
339: status = this .oidDB.delete(pt2nt(tx), key);
340: if (!OperationStatus.SUCCESS.equals(status)) {
341: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
342: throw new TCDatabaseException(
343: "Delete non-exist on-disk oid array "
344: + bits.getKey());
345: }
346: } else {
347: DatabaseEntry value = new DatabaseEntry();
348: value.setData(bits.arrayToBytes());
349: status = this .oidDB.put(pt2nt(tx), key, value);
350: if (!OperationStatus.SUCCESS.equals(status)) {
351: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
352: throw new TCDatabaseException("Failed to write");
353: }
354: }
355: } catch (DatabaseException de) {
356: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
357: throw new TCDatabaseException(de);
358: }
359: }
360: oidBitsArrayMap.oidUnmarkInUse(sortedOidSet);
361: return (status);
362: }
363:
364: private OidLongArray dbToOidBitsArray(DatabaseEntry key,
365: DatabaseEntry value) {
366: return (new OidLongArray(key.getData(), value.getData()));
367: }
368:
369: /*
370: * for testing purpose. Check if contains specified objectId
371: */
372: public boolean inMemoryContains(ObjectID objectId) {
373: return oidBitsArrayMap.contains(objectId);
374: }
375:
376: /*
377: * for testing purpose only. Return all IDs with ObjectID
378: */
379: public Collection bitsArrayMapToObjectID() {
380: HashSet objectIDs = new HashSet();
381: for (Iterator i = oidBitsArrayMap.map.keySet().iterator(); i
382: .hasNext();) {
383: long oid = ((Long) i.next()).longValue();
384: OidLongArray bits = oidBitsArrayMap.getBitsArray(oid);
385: for (int offset = 0; offset < bits.totalBits(); ++offset) {
386: if (bits.isSet(offset)) {
387: Assert
388: .assertTrue(
389: "Same object ID represented by different bits in memory",
390: objectIDs.add(new ObjectID(oid
391: + offset)));
392: }
393: }
394: }
395: return (objectIDs);
396: }
397:
398: /*
399: * for testing purpose only.
400: */
401: public void resetBitsArrayMap() {
402: oidBitsArrayMap.reset();
403: }
404:
405: public class OidBitsArrayMap {
406: final HashMap map;
407: final int longsPerMemUnit;
408: final int memBitsLength;
409: final int longsPerDiskUnit;
410: final int diskBitsLength;
411: final HashSet<Long> inUseSet;
412:
413: OidBitsArrayMap(int longsPerMemUnit, int longsPerDiskUnit) {
414: this .longsPerMemUnit = longsPerMemUnit;
415: this .memBitsLength = longsPerMemUnit * BitsPerLong;
416: this .longsPerDiskUnit = longsPerDiskUnit;
417: this .diskBitsLength = longsPerDiskUnit * BitsPerLong;
418: map = new HashMap();
419: inUseSet = new HashSet<Long>();
420:
421: Assert
422: .assertTrue(
423: "LongsPerMemUnit must be multiple of LongsPerDiskUnit",
424: (longsPerMemUnit % longsPerDiskUnit) == 0);
425: }
426:
427: public boolean oidMarkInUse(Set<Long> set) {
428: synchronized (this ) {
429: while (useSetContainsAny(set)) {
430: try {
431: wait();
432: } catch (InterruptedException ex) {
433: return false;
434: }
435: }
436: inUseSet.addAll(set);
437: }
438: return true;
439: }
440:
441: public void oidUnmarkInUse(Set<Long> set) {
442: synchronized (this ) {
443: inUseSet.removeAll(set);
444: notifyAll();
445: }
446: }
447:
448: public boolean useSetContainsAny(Set set) {
449: Iterator iter = set.iterator();
450: while (iter.hasNext()) {
451: if (inUseSet.contains(iter.next()))
452: return true;
453: }
454: return false;
455: }
456:
457: public long oidOnDiskIndex(long oid) {
458: return (oid / diskBitsLength * diskBitsLength);
459: }
460:
461: public byte[] onDiskIndex2Bytes(ObjectID id) {
462: return Conversion.long2Bytes(oidOnDiskIndex(id.toLong()));
463: }
464:
465: public Long oidInMemIndex(long oid) {
466: return new Long(oid / memBitsLength * memBitsLength);
467: }
468:
469: private OidLongArray getBitsArray(long oid) {
470: Long mapIndex = oidInMemIndex(oid);
471: OidLongArray longAry;
472: synchronized (map) {
473: if (map.containsKey(mapIndex)) {
474: longAry = (OidLongArray) map.get(mapIndex);
475: } else {
476: longAry = new OidLongArray(longsPerMemUnit,
477: mapIndex.longValue());
478: map.put(mapIndex, longAry);
479: }
480: }
481: return longAry;
482: }
483:
484: public OidLongArray getArrayForDisk(long keyOnDisk) {
485: OidLongArray longAry = getBitsArray(keyOnDisk);
486: return (getArrayForDisk(longAry, keyOnDisk));
487: }
488:
489: private OidLongArray getArrayForDisk(OidLongArray inMemLongAry,
490: long oid) {
491: long keyOnDisk = oidOnDiskIndex(oid);
492: OidLongArray onDiskAry = new OidLongArray(longsPerDiskUnit,
493: keyOnDisk);
494: int offset = (int) (keyOnDisk % memBitsLength)
495: / BitsPerLong;
496: inMemLongAry.copyOut(onDiskAry, offset);
497: return onDiskAry;
498: }
499:
500: private OidLongArray getAndModify(long oid, boolean doSet) {
501: OidLongArray longAry = getBitsArray(oid);
502: int oidInArray = (int) (oid % memBitsLength);
503: synchronized (longAry) {
504: if (doSet) {
505: longAry.setBit(oidInArray);
506: } else {
507: longAry.clrBit(oidInArray);
508: }
509:
510: // purge out array if empty
511: /*
512: * not thread safe if(!doSet) { if ((value == 0L) && longAry.isZero()) { map.remove(mapIndex); } }
513: */
514: return (getArrayForDisk(longAry, oid));
515: }
516: }
517:
518: public OidLongArray getAndSet(ObjectID id) {
519: return (getAndModify(id.toLong(), true));
520: }
521:
522: public OidLongArray getAndClr(ObjectID id) {
523: return (getAndModify(id.toLong(), false));
524: }
525:
526: public void applyOnDiskEntry(OidLongArray entry) {
527: OidLongArray inMemArray = getBitsArray(entry.getKey());
528: int offset = (int) (entry.getKey() % memBitsLength)
529: / BitsPerLong;
530: inMemArray.applyIn(entry, offset);
531: }
532:
533: public boolean contains(ObjectID id) {
534: long oid = id.toLong();
535: Long mapIndex = oidInMemIndex(oid);
536: synchronized (map) {
537: if (map.containsKey(mapIndex)) {
538: OidLongArray longAry = (OidLongArray) map
539: .get(mapIndex);
540: return (longAry.isSet((int) oid % memBitsLength));
541: }
542: }
543: return (false);
544: }
545:
546: /*
547: * for testing purpose only.
548: */
549: public void reset() {
550: map.clear();
551: }
552: }
553:
554: }
|