001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.persistence.sleepycat;
006:
007: import com.sleepycat.je.Cursor;
008: import com.sleepycat.je.CursorConfig;
009: import com.sleepycat.je.Database;
010: import com.sleepycat.je.DatabaseEntry;
011: import com.sleepycat.je.DatabaseException;
012: import com.sleepycat.je.LockMode;
013: import com.sleepycat.je.OperationStatus;
014: import com.sleepycat.je.Transaction;
015: import com.tc.logging.TCLogger;
016: import com.tc.net.protocol.tcm.ChannelID;
017: import com.tc.objectserver.persistence.api.ClientStatePersistor;
018: import com.tc.objectserver.persistence.api.PersistenceTransaction;
019: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
020: import com.tc.objectserver.persistence.impl.ClientNotFoundException;
021: import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor.SleepycatPersistorBase;
022: import com.tc.util.Conversion;
023: import com.tc.util.sequence.MutableSequence;
024:
025: import java.util.HashSet;
026: import java.util.Set;
027:
028: class ClientStatePersistorImpl extends SleepycatPersistorBase implements
029: ClientStatePersistor {
030:
031: private final Database db;
032: private final CursorConfig cursorConfig;
033: private final DatabaseEntry key;
034: private final DatabaseEntry value;
035: private final PersistenceTransactionProvider ptp;
036: private final TCLogger logger;
037: private final MutableSequence connectionIDSequence;
038:
039: ClientStatePersistorImpl(final TCLogger logger,
040: final PersistenceTransactionProvider ptp,
041: final MutableSequence connectionIDSequence,
042: final Database db) {
043: this .logger = logger;
044: this .ptp = ptp;
045: this .cursorConfig = new CursorConfig();
046: this .cursorConfig.setReadCommitted(true);
047: this .db = db;
048: this .key = new DatabaseEntry();
049: this .value = new DatabaseEntry();
050: this .connectionIDSequence = connectionIDSequence;
051: }
052:
053: public MutableSequence getConnectionIDSequence() {
054: return this .connectionIDSequence;
055: }
056:
057: public synchronized boolean containsClient(ChannelID id) {
058: setKey(id);
059: try {
060: PersistenceTransaction tx = ptp.newTransaction();
061: OperationStatus status = db.get(pt2nt(tx), key, value,
062: LockMode.DEFAULT);
063: tx.commit();
064: return OperationStatus.SUCCESS.equals(status);
065: } catch (DatabaseException e) {
066: throw new DBException(e);
067: }
068: }
069:
070: public synchronized Set loadClientIDs() {
071: Set set = new HashSet();
072: Cursor cursor;
073: try {
074: PersistenceTransaction tx = ptp.newTransaction();
075: cursor = db.openCursor(pt2nt(tx), cursorConfig);
076: while (OperationStatus.SUCCESS.equals(cursor.getNext(key,
077: value, LockMode.DEFAULT))) {
078: set.add(new ChannelID(Conversion.bytes2Long(key
079: .getData())));
080: }
081: cursor.close();
082: tx.commit();
083: } catch (DatabaseException e) {
084: e.printStackTrace();
085: throw new DBException(e);
086: }
087: return set;
088: }
089:
090: public synchronized void saveClientState(ChannelID clientID) {
091: // someday, maybe we'll need to save more state, but for now, this stops
092: // from overwriting the sequence.
093: if (containsClient(clientID))
094: return;
095: basicSave(clientID.toLong());
096: logger.debug("Saved client state for " + clientID);
097: }
098:
099: private void basicSave(long clientID) {
100:
101: setKey(clientID);
102: value.setData(Conversion.long2Bytes(0));
103: try {
104: PersistenceTransaction tx = ptp.newTransaction();
105: Transaction realTx = pt2nt(tx);
106: OperationStatus status = db.put(realTx, key, value);
107: if (!OperationStatus.SUCCESS.equals(status)) {
108: realTx.abort();
109: throw new DBException(
110: "Unable to save client state: ChannelID "
111: + clientID + "; status: " + status);
112: }
113: tx.commit();
114: } catch (DatabaseException e) {
115: throw new DBException(e);
116: }
117: }
118:
119: public synchronized void deleteClientState(ChannelID id)
120: throws ClientNotFoundException {
121: setKey(id);
122: try {
123: PersistenceTransaction tx = ptp.newTransaction();
124: Transaction realTx = pt2nt(tx);
125:
126: OperationStatus status = db.delete(realTx, key);
127: if (OperationStatus.NOTFOUND.equals(status)) {
128: realTx.abort();
129: throw new ClientNotFoundException("Client not found: "
130: + id);
131: }
132: if (!OperationStatus.SUCCESS.equals(status)) {
133: realTx.abort();
134: throw new DBException("Unable to delete client state: "
135: + id + "; status: " + status);
136: }
137:
138: tx.commit();
139: logger.debug("Deleted client state for " + id);
140: } catch (DatabaseException e) {
141: throw new DBException(e);
142: }
143: }
144:
145: private void setKey(ChannelID id) {
146: setKey(id.toLong());
147: }
148:
149: private void setKey(long id) {
150: setData(id, key);
151: }
152:
153: private void setData(long id, DatabaseEntry entry) {
154: entry.setData(Conversion.long2Bytes(id));
155: }
156:
157: }
|