001: package com.jofti.store;
002:
003: import java.io.IOException;
004: import java.nio.ByteBuffer;
005: import java.util.Arrays;
006: import java.util.Iterator;
007: import java.util.Properties;
008:
009: import org.apache.commons.logging.Log;
010: import org.apache.commons.logging.LogFactory;
011:
012: import com.jofti.btree.BTree;
013: import com.jofti.btree.EntrySplitWrapper;
014: import com.jofti.btree.IPage;
015: import com.jofti.btree.LeafNodeEntry;
016: import com.jofti.core.IStoreKey;
017: import com.jofti.core.IStoreManager;
018: import com.jofti.exception.JoftiException;
019:
020: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
021:
022: /**
023: * <p>
024: * Base store manager that deals with non-implementation specific actions. The majority of the
025: * functions are to do with storing and retrieving pages to and from disk.
026: * </p>
027: * <p>
028: * Sub classesshould make use of the bstract methods provided to allow diffirent types of
029: * cache optimisations to make alleviate some of the direct to disk behaviour.
030: * </p>
031: * @author xenephon
032: *
033: */
034: public abstract class AbstractStoreManager implements IStoreManager,
035: IEntrySerializer {
036:
037: // values used for initial properties
038: final String OVERFLOW_DIRECTORY = "directory";
039: final String BLOCK_SIZE = "block-size";
040: final String FILE_SIZE = "file-size";
041: final String MAX_NODES = "max-nodes";
042: final String MAX_FILES = "max-files";
043: final String FILE_NAME = "filename";
044:
045: /* objects that manage the reusable objects */
046:
047: //manages the file access and writing
048: FileManager logManager = null;
049:
050: // buffers used to transfer data to the files
051: BufferManager bufferManager = null;
052:
053: // serialization helpers
054: HelperManager helperManager = null;
055:
056: //manager for pages
057: PageManager pageManager = null;
058:
059: //marker for curent storepointer
060: protected int currentStore = 0;
061:
062: // used to issue new keys per node
063: final AtomicLong keyId = new AtomicLong(0);
064:
065: // properties
066: protected Properties configProperties = null;
067: protected String name = null;
068:
069: private static Log log = LogFactory
070: .getLog(AbstractStoreManager.class);
071:
072: int misses = 0;
073: int hits = 0;
074: int pendingHits = 0;
075: int storeRetrieves = 0;
076:
077: int size = 0;
078:
079: int blockSize = 4096; //default is 4k
080: int fileSize = 30485760; // default is 300Mb
081: int maxFiles = 10; //default is 10
082: String overflowDirectory = "/tmp"; //temp directory for operating system
083: String fileName = "jofti";
084:
085: int maxBuffers = 100; //maximum reusable buffer objects - depends on threads
086: int bufferNumber = 10; // number of buffers to create initially
087:
088: int maxNodes = 3000; //default max nodes to keep in memory
089: int helperNumber = 30; // number of helpers - should be able to cope with maximum number of threads
090:
091: /* (non-Javadoc)
092: * @see com.jofti.store.IStoreManager#init(java.util.Properties)
093: */
094: public synchronized void init(Properties properties)
095: throws JoftiException {
096:
097: setUpProperties(properties);
098:
099: if (log.isInfoEnabled()) {
100: log.info("block size is " + blockSize);
101: log.info("max file length is " + fileSize);
102: log.info("max nodes are " + maxNodes);
103: log.info("max file stores is " + maxFiles);
104: log.info("Overflow directory is " + overflowDirectory);
105: log.info("filename is " + fileName);
106:
107: }
108: //set up the log files
109: logManager = new FileManager();
110: logManager.init(blockSize, overflowDirectory, fileName,
111: fileSize, maxFiles);
112:
113: //set up the buffers that transfer data to the files
114: bufferManager = new BufferManager();
115: bufferManager.init(blockSize, maxBuffers, bufferNumber);
116:
117: //set up a serialization helper
118: helperManager = new HelperManager();
119: helperManager.init(helperNumber, blockSize, BTree
120: .getMaxNodeSize(), this );
121:
122: //set up the page manager
123: pageManager = new PageManager();
124: pageManager.init(blockSize, 100, 10, this , this );
125:
126: }
127:
128: /**
129: * @param properties
130: * @throws JoftiException
131: */
132: protected void setUpProperties(Properties properties)
133: throws JoftiException {
134: if (properties != null) {
135: configProperties = properties;
136: String key = null;
137: for (Iterator it = properties.keySet().iterator(); it
138: .hasNext();) {
139: key = (String) it.next();
140: if (BLOCK_SIZE.equalsIgnoreCase(key)) {
141: String tempSize = properties.getProperty(key);
142: try {
143: blockSize = Integer.parseInt(tempSize);
144: } catch (Exception e) {
145: throw new JoftiException("block-size of "
146: + tempSize + " is not valid");
147: }
148: }
149:
150: if (FILE_SIZE.equalsIgnoreCase(key)) {
151: String tempFile = properties.getProperty(key);
152: try {
153: fileSize = Integer.parseInt(tempFile);
154: } catch (Exception e) {
155: throw new JoftiException("file lengths of "
156: + tempFile + " is not valid");
157: }
158: }
159: if (MAX_NODES.equalsIgnoreCase(key)) {
160: String temp = properties.getProperty(key);
161: try {
162: maxNodes = Integer.parseInt(temp);
163: } catch (Exception e) {
164: throw new JoftiException("max nodes of " + temp
165: + " is not valid");
166: }
167: }
168:
169: if (MAX_FILES.equalsIgnoreCase(key)) {
170: String temp = properties.getProperty(key);
171: try {
172: maxFiles = Integer.parseInt(temp);
173: } catch (Exception e) {
174: throw new JoftiException("max store number of "
175: + temp + " is not valid");
176: }
177:
178: }
179: if (OVERFLOW_DIRECTORY.equalsIgnoreCase(key)) {
180:
181: overflowDirectory = properties.getProperty(key);
182:
183: }
184: if (FILE_NAME.equalsIgnoreCase(key)) {
185:
186: fileName = properties.getProperty(key);
187:
188: }
189:
190: }
191: }
192: }
193:
194: /* (non-Javadoc)
195: * @see com.jofti.store.IStoreManager#getName()
196: */
197: public String getName() {
198: return name;
199: }
200:
201: /* (non-Javadoc)
202: * @see com.jofti.store.IStoreManager#setName(java.lang.String)
203: */
204: public void setName(String name) {
205: this .name = name;
206: }
207:
208: /* (non-Javadoc)
209: * @see com.jofti.core.IStoreManager#getNextKey()
210: */
211: public IStoreKey getNextKey() {
212: StoreKey key = new StoreKey(keyId.getAndIncrement());
213: try {
214: key.setFilePositions(new FilePositionHolder[] { logManager
215: .getNextPosition() });
216: } catch (JoftiException e) {
217: throw new RuntimeException(e);
218: }
219: key.newKey = true;
220: return key;
221: }
222:
223: /* (non-Javadoc)
224: * @see com.jofti.core.IStoreManager#store(com.jofti.core.IStoreKey, com.jofti.btree.IPage)
225: */
226: public abstract IStoreKey store(IStoreKey key, IPage obj)
227: throws JoftiException;
228:
229: /* (non-Javadoc)
230: * @see com.jofti.core.IStoreManager#retrieve(com.jofti.core.IStoreKey)
231: */
232: public abstract StoreWrapper retrieve(IStoreKey key)
233: throws JoftiException;
234:
235: protected FilePositionHolder[] allocatePositions(
236: FilePositionHolder[] filePositions, int size)
237: throws JoftiException {
238: return logManager.allocateBlocks(filePositions, size);
239:
240: }
241:
242: /**
243: * <p>
244: * Method that does the actual work for storing a byteBuffer to disk. The Byte buffer can be
245: * of any size and the filepositions in the key object are used to determine where the data is written.
246: * </p>
247: * @param key
248: * @param buffer
249: * @return
250: * @throws JoftiException
251: */
252: protected IStoreKey doStore(IStoreKey key, ByteBuffer buffer)
253: throws JoftiException {
254:
255: buffer.rewind();
256: FilePositionHolder[] positions = key.getFilePositions();
257:
258: if (positions.length == 1) {
259: BlockBuffer buf = (BlockBuffer) bufferManager
260: .acquireBuffer(positions[0]);
261: // write a single buffer
262: try {
263: buf.writeFurniture(key.getNumber());
264: buf.put(buffer);
265: FileStore file = logManager
266: .getFileStore(positions[0].file);
267: buf.file = file;
268: buf.write();
269: } catch (IOException ie) {
270: throw new JoftiException(ie);
271: } finally {
272: bufferManager.releaseBuffer(buf);
273: }
274: } else {
275: // we need to loop through the positions and write out the buffers
276: // as we have a node that is too big for single buffer
277: int offSet = 0;
278: int originalLimit = buffer.limit();
279:
280: for (int i = 0; i < positions.length; i++) {
281: BlockBuffer buf = (BlockBuffer) bufferManager
282: .acquireBuffer(positions[i]);
283: try {
284: buf.writeFurniture(key.getNumber());
285:
286: // set the pos to be the first offset
287: buffer.position(offSet);
288: // set a temp limit
289: if (originalLimit - offSet > buf.remaining()) {
290: buffer.limit(offSet + buf.remaining());
291: offSet = buffer.limit();
292: } else {
293: buffer.limit(originalLimit);
294: offSet = originalLimit;
295: }
296:
297: buf.put(buffer);
298:
299: buf.file = logManager
300: .getFileStore(positions[i].file);
301: buf.write();
302: } catch (IOException ie) {
303: throw new JoftiException(ie);
304: } finally {
305: bufferManager.releaseBuffer(buf);
306: }
307:
308: }
309:
310: }
311:
312: return key;
313: }
314:
315: /* (non-Javadoc)
316: * @see com.jofti.core.IStoreManager#releasePage(com.jofti.core.IStoreKey, com.jofti.btree.IPage)
317: */
318: public abstract void releasePage(IStoreKey key, IPage page);
319:
320: /**
321: * <p>
322: * Used to retrieve a new page. Sub classes should use this method to decorate the
323: * pages in any way they want in order to improve any cache optimisations.
324: * </p>
325: * @param size
326: * @return
327: */
328: protected abstract IPage getNewPage(int size);
329:
330: /**
331: * <p>
332: * The method that does the work of retrieving the page from the backing store.
333: * </p>
334: * @param key
335: * @return
336: * @throws JoftiException
337: */
338:
339: protected IPage doRetrieve(IStoreKey key) throws JoftiException {
340:
341: if (key.isNewKey()) {
342: return doGetNewPage(blockSize);
343: }
344:
345: IPage page = null;
346:
347: try {
348: FilePositionHolder[] positions = key.getFilePositions();
349:
350: if (positions.length == 1) {
351: // just get the values straight
352: BlockBuffer buf = bufferManager
353: .acquireBuffer(positions[0]);
354: ExternalisableHelper helper = helperManager
355: .acquireHelper();
356: try {
357: FileStore file = logManager
358: .getFileStore(positions[0].file);
359: buf.read(file);
360:
361: page = helper.readExternalBuffer(buf.buffer, key
362: .getNumber());
363: page.setManager(this );
364: return page;
365: } finally {
366: bufferManager.releaseBuffer(buf);
367: helperManager.releaseHelper(helper);
368: }
369: } else {
370: // merge all the buffers into a single buffer
371: ByteBuffer totalBuf = pageManager
372: .acquireBuffer(positions.length * blockSize);
373: totalBuf.clear();
374: ExternalisableHelper helper = helperManager
375: .acquireHelper();
376: // Object[] obj =null;
377: int number = 0;
378: try {
379: for (int i = 0; i < positions.length; i++) {
380: BlockBuffer buf = bufferManager
381: .acquireBuffer(positions[i]);
382:
383: try {
384: FileStore file = logManager
385: .getFileStore(positions[i].file);
386: buf.read(file);
387: totalBuf.put(buf.buffer);
388: if (i == 0) {
389: number = buf.numberEntries;
390: }
391: } finally {
392: bufferManager.releaseBuffer(buf);
393: }
394: }
395: //
396: totalBuf.flip();
397: try {
398: page = helper.readExternalBuffer(totalBuf,
399: number);
400: } catch (Throwable t) {
401: log.fatal("buffer read failure ", t);
402: throw t;
403: }
404: page.setManager(this );
405: } finally {
406: helperManager.releaseHelper(helper);
407: pageManager.releaseBuffer(totalBuf);
408: }
409: return page;
410: }
411: } catch (Throwable t) {
412: log.fatal("error retrieving record ", t);
413: throw new JoftiException("error retrieving record ", t);
414: }
415:
416: }
417:
418: /* (non-Javadoc)
419: * @see com.jofti.core.IStoreManager#remove(com.jofti.core.IStoreKey, com.jofti.btree.IPage)
420: */
421: public abstract void remove(IStoreKey key, IPage page)
422: throws JoftiException;
423:
424: /* (non-Javadoc)
425: * @see com.jofti.core.IStoreManager#removeAll()
426: */
427: public abstract void removeAll() throws JoftiException;
428:
429: protected void doRemoveAll() throws JoftiException {
430:
431: logManager.reset();
432: }
433:
434: protected void doRemove(IStoreKey key) throws JoftiException {
435:
436: FilePositionHolder[] holder = key.getFilePositions();
437:
438: if (holder != null) {
439: for (int i = 0; i < holder.length; i++) {
440: logManager.removePosition(holder[i]);
441: }
442:
443: }
444:
445: }
446:
447: /* Serializer methods */
448:
449: /* (non-Javadoc)
450: * @see com.jofti.store.IEntrySerializer#convertForStorage(java.lang.Object)
451: */
452: public byte[] convertForStorage(Object obj) throws JoftiException {
453: ExternalisableHelper helper = helperManager.acquireHelper();
454: try {
455: return helper.convertForStore((LeafNodeEntry) obj);
456: } finally {
457: helperManager.releaseHelper(helper);
458: }
459: }
460:
461: /* (non-Javadoc)
462: * @see com.jofti.store.IEntrySerializer#convertFromStorage(java.nio.ByteBuffer)
463: */
464: public LeafNodeEntry convertFromStorage(ByteBuffer buf)
465: throws JoftiException {
466: ExternalisableHelper helper = helperManager.acquireHelper();
467: try {
468: return (LeafNodeEntry) helper.convertFromStore(buf);
469: } finally {
470: helperManager.releaseHelper(helper);
471: }
472: }
473:
474: /**
475: * @param size
476: * @return
477: */
478: protected IPage doGetNewPage(int size) {
479:
480: IPage page = null;
481:
482: try {
483: page = pageManager.acquirePage(size);
484: } catch (Exception e) {
485: throw new RuntimeException(
486: "unable to acquire page for size " + size);
487: }
488: return page;
489:
490: }
491:
492: /* (non-Javadoc)
493: * @see com.jofti.core.IStoreManager#split(com.jofti.btree.IPage, int)
494: */
495: public EntrySplitWrapper[] split(IPage page, int entryNumber) {
496:
497: int splitPoint = entryNumber / 2;
498:
499: //set up the new buffer
500: ByteBuffer buf = page.getBuffer();
501: int[] pointers = page.getPointers();
502: // see if we need more than block size
503: int endPoint = pointers[entryNumber - 1];
504:
505: buf.position(endPoint);
506: int endLength = buf.getInt() + 4;
507:
508: int bufferSize = (endLength + endPoint) - pointers[splitPoint];
509:
510: IPage newPage = getNewPage(bufferSize);
511:
512: newPage.getBuffer().limit(bufferSize);
513: newPage.getBuffer().rewind();
514:
515: int length = entryNumber - splitPoint;
516: System.arraycopy(pointers, splitPoint, newPage.getPointers(),
517: 0, length);
518:
519: // write the buffer
520: buf.position(newPage.getPointers()[0]);
521:
522: buf.mark();
523: try {
524: newPage.getBuffer().put(buf);
525: } catch (Throwable t) {
526: log.fatal("error " + newPage.getBuffer() + " " + buf);
527: throw new RuntimeException(t);
528: }
529: newPage.getBuffer().flip();
530:
531: buf.reset();
532: buf.flip();
533:
534: // erase the higher entries
535: Arrays.fill(pointers, splitPoint, pointers.length - 1, -1);
536:
537: // ok nor reset the higher fields
538: int offSet = newPage.getPointers()[0];
539:
540: for (int i = 0; i < length; i++) {
541: newPage.getPointers()[i] = newPage.getPointers()[i]
542: - offSet;
543: }
544:
545: EntrySplitWrapper[] entries = new EntrySplitWrapper[2];
546: EntrySplitWrapper splitWrapper = new EntrySplitWrapper();
547: splitWrapper.entries = page;
548: splitWrapper.size = length;
549: entries[0] = splitWrapper;
550:
551: EntrySplitWrapper newWrapper = new EntrySplitWrapper();
552: newWrapper.entries = newPage;
553: newWrapper.size = splitPoint;
554:
555: entries[1] = newWrapper;
556:
557: return entries;
558:
559: }
560:
561: }
|