001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.persistence.sleepycat;
006:
007: import com.sleepycat.je.Database;
008: import com.sleepycat.je.DatabaseEntry;
009: import com.sleepycat.je.LockMode;
010: import com.sleepycat.je.OperationStatus;
011: import com.tc.logging.TCLogger;
012: import com.tc.objectserver.persistence.api.PersistenceTransaction;
013: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
014: import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor.SleepycatPersistorBase;
015: import com.tc.util.Conversion;
016: import com.tc.util.UUID;
017: import com.tc.util.sequence.MutableSequence;
018:
019: class SleepycatSequence extends SleepycatPersistorBase implements
020: MutableSequence {
021: private static final String UID_KEY = "UIDKEY-3475674589230";
022: private final long startValue;
023: private final DatabaseEntry key;
024: private final Database sequenceDB;
025: private final PersistenceTransactionProvider ptxp;
026: private final String uid;
027:
028: SleepycatSequence(PersistenceTransactionProvider ptxp,
029: TCLogger logger, long sequenceID, long startValue,
030: Database sequenceDB) {
031: this .ptxp = ptxp;
032: this .startValue = startValue;
033: this .sequenceDB = sequenceDB;
034: key = new DatabaseEntry();
035: key.setData(Conversion.long2Bytes(sequenceID));
036: this .uid = getOrCreateUID();
037: }
038:
039: private String getOrCreateUID() {
040: PersistenceTransaction tx = ptxp.newTransaction();
041: String newuid;
042: try {
043: DatabaseEntry ukey = new DatabaseEntry();
044: ukey.setData(Conversion.string2Bytes(UID_KEY));
045: DatabaseEntry value = new DatabaseEntry();
046: OperationStatus status = this .sequenceDB.get(pt2nt(tx),
047: ukey, value, LockMode.DEFAULT);
048:
049: if (OperationStatus.SUCCESS.equals(status)) {
050: newuid = Conversion.bytes2String(value.getData());
051: } else if (OperationStatus.NOTFOUND.equals(status)) {
052: newuid = createUID();
053: value.setData(Conversion.string2Bytes(newuid));
054: status = this .sequenceDB.put(pt2nt(tx), ukey, value);
055: if (!OperationStatus.SUCCESS.equals(status)) {
056: throw new DBException(
057: "Unable to store UID for SleepycatSequence: "
058: + newuid + "): " + status);
059: }
060: } else {
061: throw new DBException(
062: "Unable to retrieve UID for SleepycatSequence: "
063: + status);
064: }
065: tx.commit();
066: return newuid;
067: } catch (Exception t) {
068: abortOnError(tx);
069: t.printStackTrace();
070: throw (t instanceof DBException ? (DBException) t
071: : new DBException(t));
072: }
073: }
074:
075: private String createUID() {
076: UUID uuid = UUID.getUUID();
077: return uuid.toString();
078: }
079:
080: /**
081: * NOTE: This current() implementation is a little different from the rest in that it returns the value that will be
082: * returned when you call next()
083: */
084: public synchronized long current() {
085: PersistenceTransaction tx = ptxp.newTransaction();
086: try {
087: DatabaseEntry value = new DatabaseEntry();
088: long currentValue = startValue;
089: OperationStatus status = this .sequenceDB.get(pt2nt(tx),
090: key, value, LockMode.DEFAULT);
091:
092: if (OperationStatus.SUCCESS.equals(status)) {
093: currentValue = Conversion.bytes2Long(value.getData());
094: } else if (!OperationStatus.NOTFOUND.equals(status)) {
095: // Formatting
096: throw new DBException(
097: "Unable to retrieve current value: " + status);
098: }
099: tx.commit();
100: return currentValue;
101: } catch (Exception t) {
102: abortOnError(tx);
103: t.printStackTrace();
104: throw (t instanceof DBException ? (DBException) t
105: : new DBException(t));
106: }
107:
108: }
109:
110: public synchronized long next() {
111: return nextBatch(1);
112: }
113:
114: public synchronized long nextBatch(int batchSize) {
115: if (batchSize < 1)
116: throw new AssertionError(
117: "Can't increment by a value less than 1.");
118: PersistenceTransaction tx = ptxp.newTransaction();
119: try {
120: DatabaseEntry value = new DatabaseEntry();
121: long currentValue = startValue;
122: OperationStatus status = this .sequenceDB.get(pt2nt(tx),
123: key, value, LockMode.DEFAULT);
124:
125: if (OperationStatus.SUCCESS.equals(status)) {
126: currentValue = Conversion.bytes2Long(value.getData());
127: } else if (!OperationStatus.NOTFOUND.equals(status)) {
128: // Formatting
129: throw new DBException(
130: "Unable to retrieve current value: " + status);
131: }
132:
133: value.setData(Conversion.long2Bytes(currentValue
134: + batchSize));
135: status = this .sequenceDB.put(pt2nt(tx), key, value);
136: if (!OperationStatus.SUCCESS.equals(status)) {
137: throw new DBException("Unable to store current value: "
138: + (currentValue + batchSize) + "): " + status);
139: }
140: tx.commit();
141: return currentValue;
142: } catch (Exception t) {
143: abortOnError(tx);
144: t.printStackTrace();
145: throw (t instanceof DBException ? (DBException) t
146: : new DBException(t));
147: }
148: }
149:
150: public String getUID() {
151: return uid;
152: }
153:
154: public void setNext(long next) {
155: PersistenceTransaction tx = ptxp.newTransaction();
156: try {
157: DatabaseEntry value = new DatabaseEntry();
158: long currentValue = startValue;
159: OperationStatus status = this .sequenceDB.get(pt2nt(tx),
160: key, value, LockMode.DEFAULT);
161:
162: if (OperationStatus.SUCCESS.equals(status)) {
163: currentValue = Conversion.bytes2Long(value.getData());
164: } else if (!OperationStatus.NOTFOUND.equals(status)) {
165: // Formatting
166: throw new DBException(
167: "Unable to retrieve current value: " + status);
168: }
169:
170: if (currentValue > next) {
171: abortOnError(tx);
172: throw new AssertionError(
173: "Cant setNext sequence to a value less than current: current = "
174: + currentValue + " next = " + next);
175: }
176:
177: value.setData(Conversion.long2Bytes(next));
178: status = this .sequenceDB.put(pt2nt(tx), key, value);
179: if (!OperationStatus.SUCCESS.equals(status)) {
180: throw new DBException("Unable to store next value: "
181: + (next) + "): " + status);
182: }
183: tx.commit();
184: } catch (Exception t) {
185: abortOnError(tx);
186: t.printStackTrace();
187: throw (t instanceof DBException ? (DBException) t
188: : new DBException(t));
189: }
190: }
191: }
|