001: package com.jofti.store;
002:
003: import java.nio.ByteBuffer;
004: import java.util.Properties;
005:
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008:
009: import com.jofti.btree.BTree;
010: import com.jofti.btree.IPage;
011: import com.jofti.btree.LeafNodeEntry;
012: import com.jofti.core.IStoreKey;
013: import com.jofti.exception.JoftiException;
014: import com.jofti.util.PrimeFinder;
015:
016: public class HashedStoreManager extends AbstractStoreManager {
017:
018: private static Log log = LogFactory
019: .getLog(AbstractStoreManager.class);
020:
021: protected StoreWrapper[] nodes = null;
022:
023: protected Object[] locks = null;
024:
025: long stores = 0;
026:
027: public void init(Properties properties) throws JoftiException {
028: if (log.isInfoEnabled()) {
029: log.info("Initialising Hashed store manager");
030: }
031: super .init(properties);
032: maxNodes = PrimeFinder.nextPrime(maxNodes);
033:
034: nodes = new StoreWrapper[maxNodes];
035: locks = new Object[maxNodes];
036: for (int i = maxNodes - 1; i >= 0; i--) {
037: locks[i] = new Object();
038: }
039: if (log.isInfoEnabled()) {
040: log.info("adjusted maxNodes to " + maxNodes);
041: log.info("Initialised Hashed store manager");
042: }
043: }
044:
045: public void removeAll() throws JoftiException {
046: for (int i = 0; i < nodes.length; i++) {
047: StoreWrapper wrap = nodes[i];
048: if (wrap != null) {
049: CachedPage old = (CachedPage) wrap.page;
050: old.releaseReference();
051: if (!old.hasReferences()) {
052: pageManager.releasePage(old.page);
053: }
054: nodes[i] = null;
055: }
056: }
057: doRemoveAll();
058: }
059:
060: protected IPage getNewPage(int size) {
061: IPage temp = doGetNewPage(size);
062:
063: // wrap the page
064: CachedPage cPage = new CachedPage(temp);
065: cPage.entries = new LeafNodeEntry[BTree.getMaxNodeSize()];
066: // add a usage reference here
067: cPage.acquireReference();
068: return cPage;
069: }
070:
071: public IStoreKey store(IStoreKey key, IPage page)
072: throws JoftiException {
073: StoreWrapper wrapper = null;
074:
075: // get the number of positions we need to store the data
076: int limit = page.getBuffer().limit();
077: FilePositionHolder[] array = allocatePositions(key
078: .getFilePositions(), limit);
079: key.setFilePositions(array);
080:
081: // make sure we are storing a copy here
082: ByteBuffer buf = pageManager.acquireBuffer(limit);
083:
084: doStore(key, page.copyBuffer(buf));
085: // release the buffer we have just stored
086: pageManager.releaseBuffer(buf);
087:
088: int i = (int) (key.getId() % maxNodes);
089:
090: // add to cached pages
091: synchronized (locks[i]) {
092: // release our reference
093: ((CachedPage) page).releaseReference();
094: // get the old ref
095: wrapper = nodes[i];
096: // if previous is null
097: if (wrapper == null) {
098: StoreWrapper newWrapper = new StoreWrapper(key, page);
099: nodes[i] = newWrapper;
100: // we are replacing the entry here
101: } else if (!wrapper.key.equals(key)) {
102:
103: CachedPage old = (CachedPage) wrapper.page;
104: if (!old.hasReferences()) {
105: pageManager.releasePage(old.page);
106: } else {
107: if (log.isDebugEnabled()) {
108: log.debug("unable to release ref on write "
109: + old);
110: }
111: }
112: StoreWrapper newWrapper = new StoreWrapper(key, page);
113: nodes[i] = newWrapper;
114: }
115: // else the page should be attached to the wrapper anyway
116:
117: }
118:
119: // we do not care
120: return null;
121: }
122:
123: public void releasePage(IStoreKey key, IPage page) {
124:
125: int i = (int) (key.getId() % maxNodes);
126:
127: synchronized (locks[i]) {
128: ((CachedPage) page).releaseReference();
129: }
130: }
131:
132: public StoreWrapper retrieve(IStoreKey key) throws JoftiException {
133:
134: StoreWrapper wrapper = null;
135:
136: int i = (int) (key.getId() % maxNodes);
137:
138: synchronized (locks[i]) {
139: // get the previous entry
140: wrapper = nodes[i];
141: if (wrapper != null && wrapper.key.equals(key)) {
142: ((CachedPage) wrapper.page).acquireReference();
143: return wrapper;
144: }
145: }
146:
147: IPage page = doRetrieve(key);
148: // create a new cached node
149: CachedPage cPage = new CachedPage(page);
150: cPage.entries = new LeafNodeEntry[BTree.getMaxNodeSize()];
151: cPage.acquireReference();
152: wrapper = new StoreWrapper(key, cPage);
153:
154: // otherwise replace the val
155:
156: synchronized (locks[i]) {
157: StoreWrapper old = nodes[i];
158:
159: if (old != null && !old.key.equals(key)) {
160: CachedPage oldPage = (CachedPage) old.page;
161: if (!oldPage.hasReferences()) {
162: pageManager.releasePage(oldPage.page);
163: } else {
164: if (log.isDebugEnabled()) {
165: log.debug("unable to release due to ref "
166: + oldPage.page);
167: }
168: }
169: }
170:
171: nodes[i] = wrapper;
172: }
173: return wrapper;
174:
175: }
176:
177: public void remove(IStoreKey key, IPage page) throws JoftiException {
178: releasePage(key, page);
179: int i = (int) (key.getId() % maxNodes);
180:
181: StoreWrapper wrapper = null;
182:
183: synchronized (locks[i]) {
184:
185: wrapper = nodes[i];
186: if (wrapper != null && wrapper.key.equals(key)) {
187: CachedPage oldPage = (CachedPage) wrapper.page;
188: oldPage.releaseReference();
189: if (!oldPage.hasReferences()) {
190: pageManager.releasePage(oldPage.page);
191: } else {
192: if (log.isDebugEnabled()) {
193: log.debug("unable to release due to ref "
194: + oldPage.page);
195: }
196: }
197:
198: nodes[i] = null;
199:
200: }
201:
202: }
203: doRemove(key);
204:
205: }
206:
207: }
|