001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.persistence.sleepycat;
006:
007: import com.sleepycat.bind.serial.ClassCatalog;
008: import com.sleepycat.je.Cursor;
009: import com.sleepycat.je.CursorConfig;
010: import com.sleepycat.je.Database;
011: import com.sleepycat.je.DatabaseEntry;
012: import com.sleepycat.je.DatabaseException;
013: import com.sleepycat.je.LockMode;
014: import com.sleepycat.je.OperationStatus;
015: import com.tc.exception.TCRuntimeException;
016: import com.tc.logging.TCLogger;
017: import com.tc.object.ObjectID;
018: import com.tc.objectserver.core.api.ManagedObject;
019: import com.tc.objectserver.core.api.ManagedObjectState;
020: import com.tc.objectserver.managedobject.MapManagedObjectState;
021: import com.tc.objectserver.persistence.api.ManagedObjectPersistor;
022: import com.tc.objectserver.persistence.api.PersistenceTransaction;
023: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
024: import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor.SleepycatPersistorBase;
025: import com.tc.properties.TCPropertiesImpl;
026: import com.tc.text.PrettyPrinter;
027: import com.tc.util.Assert;
028: import com.tc.util.Conversion;
029: import com.tc.util.ObjectIDSet2;
030: import com.tc.util.SyncObjectIdSet;
031: import com.tc.util.SyncObjectIdSetImpl;
032: import com.tc.util.sequence.MutableSequence;
033:
034: import java.io.IOException;
035: import java.util.Collection;
036: import java.util.Comparator;
037: import java.util.HashMap;
038: import java.util.HashSet;
039: import java.util.Iterator;
040: import java.util.Map;
041: import java.util.Set;
042: import java.util.SortedSet;
043: import java.util.TreeSet;
044:
045: public final class ManagedObjectPersistorImpl extends
046: SleepycatPersistorBase implements ManagedObjectPersistor {
047:
048: private static final Comparator MO_COMPARATOR = new Comparator() {
049: public int compare(Object o1, Object o2) {
050: long oid1 = ((ManagedObject) o1).getID().toLong();
051: long oid2 = ((ManagedObject) o2).getID().toLong();
052: if (oid1 < oid2) {
053: return -1;
054: } else if (oid1 > oid2) {
055: return 1;
056: } else {
057: return 0;
058: }
059: }
060: };
061:
062: private static final Object MO_PERSISTOR_KEY = ManagedObjectPersistorImpl.class
063: .getName()
064: + ".saveAllObjects";
065: private static final Object MO_PERSISTOR_VALUE = "Complete";
066:
067: private final Database objectDB;
068: private final Database oidDB;
069: private final SerializationAdapterFactory saf;
070: private final CursorConfig dBCursorConfig;
071: private final MutableSequence objectIDSequence;
072: private final Database rootDB;
073: private final CursorConfig rootDBCursorConfig;
074: private long saveCount;
075: private final TCLogger logger;
076: private final PersistenceTransactionProvider ptp;
077: private final ClassCatalog classCatalog;
078: SerializationAdapter serializationAdapter;
079: private final SleepycatCollectionsPersistor collectionsPersistor;
080: private final String OID_FAST_LOAD = "l2.objectmanager.loadObjectID.fastLoad";
081: private final boolean oidFastLoad;
082: private final boolean paranoid;
083: private final OidBitsArrayMapManager oidManager;
084:
085: public ManagedObjectPersistorImpl(TCLogger logger,
086: ClassCatalog classCatalog,
087: SerializationAdapterFactory serializationAdapterFactory,
088: Database objectDB, Database oidDB,
089: CursorConfig dBCursorConfig,
090: MutableSequence objectIDSequence, Database rootDB,
091: CursorConfig rootDBCursorConfig,
092: PersistenceTransactionProvider ptp,
093: SleepycatCollectionsPersistor collectionsPersistor,
094: boolean paranoid) {
095: this .logger = logger;
096: this .classCatalog = classCatalog;
097: this .saf = serializationAdapterFactory;
098: this .objectDB = objectDB;
099: this .oidDB = oidDB;
100: this .dBCursorConfig = dBCursorConfig;
101: this .objectIDSequence = objectIDSequence;
102: this .rootDB = rootDB;
103: this .rootDBCursorConfig = rootDBCursorConfig;
104: this .ptp = ptp;
105: this .collectionsPersistor = collectionsPersistor;
106: this .paranoid = paranoid;
107:
108: this .oidFastLoad = TCPropertiesImpl.getProperties().getBoolean(
109: OID_FAST_LOAD);
110: if (this .oidFastLoad) {
111: this .oidManager = new OidBitsArrayMapManagerImpl(logger,
112: paranoid, oidDB, ptp, dBCursorConfig);
113: } else {
114: this .oidManager = null;
115: }
116: }
117:
118: public long nextObjectIDBatch(int batchSize) {
119: return objectIDSequence.nextBatch(batchSize);
120: }
121:
122: public void setNextAvailableObjectID(long startID) {
123: objectIDSequence.setNext(startID);
124: }
125:
126: public void addRoot(PersistenceTransaction tx, String name,
127: ObjectID id) {
128: validateID(id);
129: OperationStatus status = null;
130: try {
131: DatabaseEntry key = new DatabaseEntry();
132: DatabaseEntry value = new DatabaseEntry();
133: setStringData(key, name);
134: setObjectIDData(value, id);
135:
136: status = this .rootDB.put(pt2nt(tx), key, value);
137: } catch (Throwable t) {
138: throw new DBException(t);
139: }
140: if (!OperationStatus.SUCCESS.equals(status)) {
141: throw new DBException("Unable to write root id: " + name
142: + "=" + id + "; status: " + status);
143: }
144: }
145:
146: public ObjectID loadRootID(String name) {
147: if (name == null)
148: throw new AssertionError(
149: "Attempt to retrieve a null root name");
150: OperationStatus status = null;
151: try {
152: DatabaseEntry key = new DatabaseEntry();
153: DatabaseEntry value = new DatabaseEntry();
154: setStringData(key, name);
155: PersistenceTransaction tx = ptp.newTransaction();
156: status = this .rootDB.get(pt2nt(tx), key, value,
157: LockMode.DEFAULT);
158: tx.commit();
159: if (OperationStatus.SUCCESS.equals(status)) {
160: ObjectID rv = getObjectIDData(value);
161: return rv;
162: }
163: } catch (Throwable t) {
164: throw new DBException(t);
165: }
166: if (OperationStatus.NOTFOUND.equals(status))
167: return ObjectID.NULL_ID;
168: else
169: throw new DBException("Error retrieving root: " + name
170: + "; status: " + status);
171: }
172:
173: public Set loadRoots() {
174: Set rv = new HashSet();
175: Cursor cursor = null;
176: try {
177: DatabaseEntry key = new DatabaseEntry();
178: DatabaseEntry value = new DatabaseEntry();
179: PersistenceTransaction tx = ptp.newTransaction();
180: cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig);
181: while (OperationStatus.SUCCESS.equals(cursor.getNext(key,
182: value, LockMode.DEFAULT))) {
183: rv.add(getObjectIDData(value));
184: }
185: cursor.close();
186: tx.commit();
187: } catch (Throwable t) {
188: throw new DBException(t);
189: }
190: return rv;
191: }
192:
193: public SyncObjectIdSet getAllObjectIDs() {
194: SyncObjectIdSet rv = new SyncObjectIdSetImpl();
195: if (paranoid) {
196: rv.startPopulating();
197: Thread t;
198: if (this .oidFastLoad) {
199: t = new Thread(oidManager.createObjectIdReader(rv),
200: "OidObjectIdReaderThread");
201: } else {
202: t = new Thread(new ObjectIdReader(rv),
203: "ObjectIdReaderThread");
204: }
205: t.setDaemon(true);
206: t.start();
207: }
208: return rv;
209: }
210:
211: public Set loadRootNames() {
212: Set rv = new HashSet();
213: Cursor cursor = null;
214: try {
215: PersistenceTransaction tx = ptp.newTransaction();
216: DatabaseEntry key = new DatabaseEntry();
217: DatabaseEntry value = new DatabaseEntry();
218: cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig);
219: while (OperationStatus.SUCCESS.equals(cursor.getNext(key,
220: value, LockMode.DEFAULT))) {
221: rv.add(getStringData(key));
222: }
223: cursor.close();
224: tx.commit();
225: } catch (Throwable t) {
226: throw new DBException(t);
227: }
228: return rv;
229: }
230:
231: public Map loadRootNamesToIDs() {
232: Map rv = new HashMap();
233: Cursor cursor = null;
234: try {
235: PersistenceTransaction tx = ptp.newTransaction();
236: DatabaseEntry key = new DatabaseEntry();
237: DatabaseEntry value = new DatabaseEntry();
238: cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig);
239: while (OperationStatus.SUCCESS.equals(cursor.getNext(key,
240: value, LockMode.DEFAULT))) {
241: rv.put(getStringData(key), getObjectIDData(value));
242: }
243: cursor.close();
244: tx.commit();
245: } catch (Throwable t) {
246: throw new DBException(t);
247: }
248: return rv;
249: }
250:
251: public ManagedObject loadObjectByID(ObjectID id) {
252: validateID(id);
253: OperationStatus status = null;
254: PersistenceTransaction tx = ptp.newTransaction();
255: try {
256: DatabaseEntry key = new DatabaseEntry();
257: DatabaseEntry value = new DatabaseEntry();
258: setObjectIDData(key, id);
259: status = this .objectDB.get(pt2nt(tx), key, value,
260: LockMode.DEFAULT);
261: if (OperationStatus.SUCCESS.equals(status)) {
262: ManagedObject mo = getManagedObjectData(value);
263: loadCollection(tx, mo);
264: tx.commit();
265: return mo;
266: }
267: } catch (Throwable e) {
268: abortOnError(tx);
269: throw new DBException(e);
270: }
271: if (OperationStatus.NOTFOUND.equals(status))
272: return null;
273: else
274: throw new DBException("Error retrieving object id: " + id
275: + "; status: " + status);
276: }
277:
278: private void loadCollection(PersistenceTransaction tx,
279: ManagedObject mo) throws IOException,
280: ClassNotFoundException, TCDatabaseException {
281: ManagedObjectState state = mo.getManagedObjectState();
282: if (state.getType() == ManagedObjectState.MAP_TYPE
283: || state.getType() == ManagedObjectState.PARTIAL_MAP_TYPE) {
284: MapManagedObjectState mapState = (MapManagedObjectState) state;
285: Assert.assertNull(mapState.getMap());
286: try {
287: mapState.setMap(collectionsPersistor.loadMap(tx, mo
288: .getID()));
289: } catch (DatabaseException e) {
290: throw new TCDatabaseException(e);
291: }
292: }
293: }
294:
295: public void saveObject(
296: PersistenceTransaction persistenceTransaction,
297: ManagedObject managedObject) {
298: Assert.assertNotNull(managedObject);
299: validateID(managedObject.getID());
300: OperationStatus status = null;
301: try {
302: status = basicSaveObject(persistenceTransaction,
303: managedObject);
304: if (oidFastLoad && paranoid
305: && OperationStatus.SUCCESS.equals(status)) {
306: status = oidManager.oidPut(persistenceTransaction,
307: managedObject.getID());
308: }
309: } catch (DBException e) {
310: throw e;
311: } catch (Throwable t) {
312: throw new DBException("Trying to save object: "
313: + managedObject, t);
314: }
315:
316: if (!OperationStatus.SUCCESS.equals(status)) {
317: throw new DBException("Unable to write ManagedObject: "
318: + managedObject + "; status: " + status);
319: }
320:
321: }
322:
323: private OperationStatus basicSaveObject(PersistenceTransaction tx,
324: ManagedObject managedObject) throws TCDatabaseException,
325: IOException {
326: if (!managedObject.isDirty())
327: return OperationStatus.SUCCESS;
328: OperationStatus status;
329: DatabaseEntry key = new DatabaseEntry();
330: DatabaseEntry value = new DatabaseEntry();
331: setObjectIDData(key, managedObject.getID());
332: setManagedObjectData(value, managedObject);
333: try {
334: status = this .objectDB.put(pt2nt(tx), key, value);
335: if (OperationStatus.SUCCESS.equals(status)) {
336: basicSaveCollection(tx, managedObject);
337: managedObject.setIsDirty(false);
338: saveCount++;
339: if (saveCount == 1 || saveCount % (100 * 1000) == 0) {
340: logger.debug("saveCount: " + saveCount);
341: }
342: }
343: } catch (DatabaseException de) {
344: throw new TCDatabaseException(de);
345: }
346: return status;
347: }
348:
349: private void basicSaveCollection(PersistenceTransaction tx,
350: ManagedObject managedObject) throws IOException,
351: TCDatabaseException {
352: ManagedObjectState state = managedObject
353: .getManagedObjectState();
354: if (state.getType() == ManagedObjectState.MAP_TYPE
355: || state.getType() == ManagedObjectState.PARTIAL_MAP_TYPE) {
356: MapManagedObjectState mapState = (MapManagedObjectState) state;
357: SleepycatPersistableMap map = (SleepycatPersistableMap) mapState
358: .getMap();
359: try {
360: collectionsPersistor.saveMap(tx, map);
361: } catch (DatabaseException e) {
362: throw new TCDatabaseException(e);
363: }
364: }
365: }
366:
367: public void saveAllObjects(
368: PersistenceTransaction persistenceTransaction,
369: Collection managedObjects) {
370: long t0 = System.currentTimeMillis();
371: if (managedObjects.isEmpty())
372: return;
373: Object failureContext = null;
374:
375: // XXX:: We are sorting so that we maintain lock ordering when writting to sleepycat (check
376: // SleepycatPersistableMap.basicClear()). This is done under the assumption that this method is not called
377: // twice with the same transaction
378: Object old = persistenceTransaction.setProperty(
379: MO_PERSISTOR_KEY, MO_PERSISTOR_VALUE);
380: Assert.assertNull(old);
381: SortedSet sortedList = getSortedManagedObjectsSet(managedObjects);
382: HashSet oidSet = new HashSet();
383:
384: try {
385: for (Iterator i = sortedList.iterator(); i.hasNext();) {
386: final ManagedObject managedObject = (ManagedObject) i
387: .next();
388:
389: final OperationStatus status = basicSaveObject(
390: persistenceTransaction, managedObject);
391:
392: if (!OperationStatus.SUCCESS.equals(status)) {
393: failureContext = new Object() {
394: public String toString() {
395: return "Unable to save ManagedObject: "
396: + managedObject + "; status: "
397: + status;
398: }
399: };
400: break;
401: }
402:
403: // record new object-IDs to be written to persistent store later.
404: if (oidFastLoad && paranoid) {
405: oidSet.add(managedObject.getID());
406: }
407: }
408: // write all new Object-IDs to persistor
409: if (oidFastLoad && paranoid) {
410: if (!OperationStatus.SUCCESS.equals(oidManager
411: .oidPutAll(persistenceTransaction, oidSet))) {
412: throw new DBException("Failed to save Object-IDs");
413: }
414: }
415: } catch (Throwable t) {
416: throw new DBException(t);
417: }
418:
419: if (failureContext != null)
420: throw new DBException(failureContext.toString());
421:
422: long delta = System.currentTimeMillis() - t0;
423: saveAllElapsed += delta;
424: saveAllCount++;
425: saveAllObjectCount += managedObjects.size();
426: if (saveAllCount % (100 * 1000) == 0) {
427: double avg = ((double) saveAllObjectCount / (double) saveAllElapsed) * 1000;
428: logger.debug("save time: " + delta + ", "
429: + managedObjects.size() + " objects; avg: " + avg
430: + "/sec");
431: }
432: }
433:
434: private SortedSet getSortedManagedObjectsSet(
435: Collection managedObjects) {
436: TreeSet sorted = new TreeSet(MO_COMPARATOR);
437: sorted.addAll(managedObjects);
438: Assert.assertEquals(managedObjects.size(), sorted.size());
439: return sorted;
440: }
441:
442: /**
443: * ObjectIDs extend AbstractIdentifiers which are Sortable
444: */
445: private SortedSet getSortedObjectIDs(Collection objectIDs) {
446: TreeSet sorted = new TreeSet();
447: sorted.addAll(objectIDs);
448: Assert.assertEquals(objectIDs.size(), sorted.size());
449: return sorted;
450: }
451:
452: private long saveAllCount = 0;
453: private long saveAllObjectCount = 0;
454: private long saveAllElapsed = 0;
455:
456: private void deleteObjectByID(PersistenceTransaction tx, ObjectID id) {
457: validateID(id);
458: try {
459: DatabaseEntry key = new DatabaseEntry();
460: setObjectIDData(key, id);
461: OperationStatus status = this .objectDB.delete(pt2nt(tx),
462: key);
463: if (!(OperationStatus.NOTFOUND.equals(status) || OperationStatus.SUCCESS
464: .equals(status))) {
465: // make the formatter happy
466: throw new DBException(
467: "Unable to remove ManagedObject for object id: "
468: + id + ", status: " + status);
469: } else {
470: collectionsPersistor.deleteCollection(tx, id);
471: }
472: } catch (DatabaseException t) {
473: throw new DBException(t);
474: }
475: }
476:
477: public void deleteAllObjectsByID(PersistenceTransaction tx,
478: Collection objectIDs) {
479: // Sorting to maintain lock ordering - check saveAllObjects
480: SortedSet sortedOids = getSortedObjectIDs(objectIDs);
481: for (Iterator i = sortedOids.iterator(); i.hasNext();) {
482: ObjectID objectId = (ObjectID) i.next();
483: deleteObjectByID(tx, objectId);
484: }
485:
486: if (oidFastLoad && paranoid) {
487: try {
488: oidManager.oidDeleteAll(tx, sortedOids);
489: } catch (TCDatabaseException de) {
490: throw new TCRuntimeException(de);
491: }
492: }
493: }
494:
495: /**
496: * This is only package protected for tests.
497: */
498: SerializationAdapter getSerializationAdapter() throws IOException {
499: // XXX: This lazy initialization comes from how the sleepycat stuff is glued together in the server.
500: if (serializationAdapter == null)
501: serializationAdapter = saf.newAdapter(this .classCatalog);
502: return serializationAdapter;
503: }
504:
505: /*********************************************************************************************************************
506: * Private stuff
507: */
508:
509: private void validateID(ObjectID id) {
510: Assert.assertNotNull(id);
511: Assert.eval(!ObjectID.NULL_ID.equals(id));
512: }
513:
514: private void setObjectIDData(DatabaseEntry entry, ObjectID objectID) {
515: entry.setData(Conversion.long2Bytes(objectID.toLong()));
516: }
517:
518: private void setStringData(DatabaseEntry entry, String string)
519: throws IOException {
520: getSerializationAdapter().serializeString(entry, string);
521: }
522:
523: private void setManagedObjectData(DatabaseEntry entry,
524: ManagedObject mo) throws IOException {
525: getSerializationAdapter().serializeManagedObject(entry, mo);
526: }
527:
528: private ObjectID getObjectIDData(DatabaseEntry entry) {
529: return new ObjectID(Conversion.bytes2Long(entry.getData()));
530: }
531:
532: private String getStringData(DatabaseEntry entry)
533: throws IOException, ClassNotFoundException {
534: return getSerializationAdapter().deserializeString(entry);
535: }
536:
537: private ManagedObject getManagedObjectData(DatabaseEntry entry)
538: throws IOException, ClassNotFoundException {
539: return getSerializationAdapter()
540: .deserializeManagedObject(entry);
541: }
542:
543: public void prettyPrint(PrettyPrinter out) {
544: out.println(this .getClass().getName());
545: out = out.duplicateAndIndent();
546: out.println("db: " + objectDB);
547: }
548:
549: /*
550: * the old/slow reading object-Ids at server restart
551: */
552: class ObjectIdReader implements Runnable {
553: protected final SyncObjectIdSet set;
554:
555: public ObjectIdReader(SyncObjectIdSet set) {
556: this .set = set;
557: }
558:
559: public void run() {
560: Assert
561: .assertTrue(
562: "Shall be in persistent mode to refresh Object IDs at startup",
563: paranoid);
564:
565: ObjectIDSet2 tmp = new ObjectIDSet2();
566: PersistenceTransaction tx = null;
567: Cursor cursor = null;
568: try {
569: tx = ptp.newTransaction();
570: cursor = objectDB.openCursor(pt2nt(tx), dBCursorConfig);
571: DatabaseEntry key = new DatabaseEntry();
572: DatabaseEntry value = new DatabaseEntry();
573: while (OperationStatus.SUCCESS.equals(cursor.getNext(
574: key, value, LockMode.DEFAULT))) {
575: tmp.add(new ObjectID(Conversion.bytes2Long(key
576: .getData())));
577: }
578: } catch (Throwable t) {
579: logger.error("Error Reading Object IDs", t);
580: } finally {
581: safeClose(cursor);
582: safeCommit(tx);
583: set.stopPopulating(tmp);
584: tmp = null;
585: }
586: }
587:
588: protected void safeCommit(PersistenceTransaction tx) {
589: if (tx == null)
590: return;
591: try {
592: tx.commit();
593: } catch (Throwable t) {
594: logger.error("Error Committing Transaction", t);
595: }
596: }
597:
598: protected void safeClose(Cursor c) {
599: if (c == null)
600: return;
601:
602: try {
603: c.close();
604: } catch (Throwable e) {
605: logger.error("Error closing cursor", e);
606: }
607: }
608: }
609:
610: // for testing purpose only
611: public OidBitsArrayMapManagerImpl getOidManager() {
612: return (OidBitsArrayMapManagerImpl) oidManager;
613: }
614: }
|