001: package com.jofti.store;
002:
003: import java.io.File;
004: import java.io.FileNotFoundException;
005: import java.io.IOException;
006: import java.util.Properties;
007:
008: import org.apache.commons.logging.Log;
009: import org.apache.commons.logging.LogConfigurationException;
010: import org.apache.commons.logging.LogFactory;
011:
012: import com.jofti.exception.InsufficientSpaceException;
013: import com.jofti.exception.JoftiException;
014:
015: import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
016:
017: /**
018: *
019: * Manages files set up in Jofti. This manager sets up the files, allocates space in the files and
020: * keeps track of the current poaitions in each file.
021: * <p>
022: * This is based on some of the ideas found in HOWL.
023: * </p>
024: *
025: * @author xenephon
026: *
027: */
028: class FileManager {
029:
030: private static Log log = LogFactory.getLog(FileManager.class);
031:
032: // lock controlling access to block allocation
033: private Object recordLock = new Object();
034:
035: /**
036: * set of FileStore objects associated with the physical files.
037: *
038: * @see #open()
039: */
040: FileStore[] fileSet = null;
041:
042: /**
043: * array of unalloctaed blocks fro each store.
044: *
045: * @see #open()
046: */
047: long[] unallocatedBlocks = null;
048:
049: /**
050: * array of positions for each store.
051: *
052: */
053: long[] filePositions = null;
054:
055: /**
056: * current store
057: *
058: */
059: FileStore currentStore = null;
060:
061: // where the old blocks are put - should just be a queue of FilePositionHolders
062: protected LinkedBlockingQueue removedBlockQueue = new LinkedBlockingQueue(); //this is unbounded
063:
064: //default max files
065: private int maxFiles = 10;
066:
067: // max size of each file
068: private long maxSize = 0;
069:
070: /**
071: * keys into properties
072: *
073: */
074: protected String OVERFLOW_DIRECTORY = "overflow-directory";
075:
076: protected String fileName = "jofti";
077:
078: protected String extension = "str";
079:
080: protected String logFileDirectory = "../tmp";
081:
082: private Properties props = null;
083:
084: // block size to be used
085: int blockSize = 0;
086:
087: // block size- (header size + footer size)
088: int payloadSize = 0;
089:
090: FileManager() {
091:
092: }
093:
094: void init(int blockSize, String directory, String fileName,
095: long size, int maxFiles) throws JoftiException {
096:
097: //set up the properties we need
098:
099: this .blockSize = blockSize;
100: this .logFileDirectory = directory;
101: this .fileName = fileName;
102: this .maxSize = size;
103: this .maxFiles = maxFiles;
104:
105: // open all the files
106: try {
107: open();
108: } catch (Exception e) {
109: throw new JoftiException(e);
110: }
111:
112: // set up the initial free blocks for each file
113: unallocatedBlocks = new long[fileSet.length];
114: filePositions = new long[fileSet.length];
115:
116: long initialBlocks = maxSize / blockSize;
117:
118: // set up the unallocated blocks
119: for (int i = 0; i < fileSet.length; i++) {
120: unallocatedBlocks[i] = initialBlocks;
121: }
122:
123: //set up the payload size
124: payloadSize = blockSize
125: - (BlockBuffer.bufferHeaderSize + BlockBuffer.bufferFooterSize);
126:
127: }
128:
129: public synchronized void reset() {
130: // set up the initial free blocks for each file
131: unallocatedBlocks = new long[fileSet.length];
132: filePositions = new long[fileSet.length];
133:
134: long initialBlocks = maxSize / blockSize;
135:
136: // set up the unallocated blocks
137: for (int i = 0; i < fileSet.length; i++) {
138: unallocatedBlocks[i] = initialBlocks;
139: }
140: removedBlockQueue.clear();
141: }
142:
143: /**
144: * Allocates an array of FilePositionHolder objects which encompasses the locations
145: * of the page data across one or more files.
146: * <p>
147: * @param blocks - existing file positions (if any)
148: * @param size - total number of bytes to store
149: * @return
150: * @throws JoftiException
151: */
152: public FilePositionHolder[] allocateBlocks(
153: FilePositionHolder[] blocks, int size)
154: throws JoftiException {
155:
156: FilePositionHolder[] returnBlocks = null;
157:
158: // empty storage
159: if (size == 0) {
160: return blocks;
161: }
162:
163: // see how many keys we need
164: int keysNeeded = size / payloadSize;
165: int remainder = size % payloadSize;
166:
167: // is there a bit left over?
168: if (remainder != 0) {
169: keysNeeded++;
170: }
171:
172: if (log.isDebugEnabled()) {
173: log.debug("payloadSize " + payloadSize
174: + " number keys needed " + keysNeeded
175: + " for data size:" + size);
176: }
177:
178: // we now know the number of keys needed - but we do not know what original ones we have?
179:
180: try {
181: returnBlocks = allocateRecords(blocks, keysNeeded);
182: } catch (Exception e) {
183: throw new JoftiException(e);
184: }
185:
186: return returnBlocks;
187:
188: }
189:
190: private FilePositionHolder[] allocateRecords(
191: FilePositionHolder[] blocks, int number)
192: throws InsufficientSpaceException, Exception {
193:
194: int originalRecordNumber = 0;
195: FilePositionHolder[] temp = null;
196:
197: if (blocks != null) {
198: originalRecordNumber += blocks.length;
199: }
200:
201: if (number == originalRecordNumber) {
202: // return the same blocks
203: temp = blocks;
204: } else if (number < originalRecordNumber) {
205: // otherwise we need to free some up
206:
207: temp = new FilePositionHolder[number];
208:
209: //free up blocks we do not need
210: for (int i = 0; i < blocks.length; i++) {
211: if (i < number) {
212: temp[i] = blocks[i];
213: } else {
214: removedBlockQueue.put(blocks[i]);
215: }
216:
217: }
218: return temp;
219: }
220:
221: // we need more than we have
222: else if (number > originalRecordNumber) {
223:
224: // we need to add some more records here
225: temp = new FilePositionHolder[number];
226: // copy the existing records into the new array
227:
228: System.arraycopy(blocks, 0, temp, 0, blocks.length);
229:
230: for (int i = blocks.length; i < temp.length; i++) {
231: // try and get from the freeQueue first
232:
233: temp[i] = getNextPosition();
234: }
235:
236: }
237: return temp;
238:
239: }
240:
241: public void removePosition(FilePositionHolder holder)
242: throws JoftiException {
243: removedBlockQueue.offer(holder);
244: }
245:
246: public FilePositionHolder getNextPosition() throws JoftiException {
247: FileStore file = null;
248:
249: //see if we can get one from the free queue first
250: FilePositionHolder fileHolder = null;
251:
252: fileHolder = (FilePositionHolder) removedBlockQueue.poll();
253:
254: if (fileHolder != null) {
255: return fileHolder;
256: }
257:
258: file = currentStore;
259:
260: // store we started at
261: int marker = file.fileId;
262: long position = 0;
263:
264: do {
265: synchronized (recordLock) {
266: if (unallocatedBlocks[file.fileId] > 0) {
267: position = filePositions[file.fileId];
268: fileHolder = new FilePositionHolder(file.fileId,
269: position);
270: filePositions[file.fileId] = position + blockSize;
271: unallocatedBlocks[file.fileId]--;
272:
273: //we rotate the store so we get even spread of entries in all stores
274: if (file.fileId == unallocatedBlocks.length - 1) {
275: currentStore = getFileStore(0);
276: } else {
277: currentStore = getFileStore(file.fileId + 1);
278: }
279: return fileHolder;
280: } else {
281:
282: // ok we can't get as key here - try the next one
283: file = getFileStore(file.fileId + 1);
284:
285: if (file != null) {
286: currentStore = file;
287: if (log.isInfoEnabled()) {
288: log.info("Storing in:" + file.fileId
289: + " moving to next store");
290: }
291: }
292: }
293: }
294: // loop around until we have run out of stores to try
295: } while (file != null && file.fileId != marker);
296: // we have dropped through with no file available
297:
298: throw new JoftiException("No space available in any file");
299: }
300:
301: FileStore getFileStore(int id) {
302: FileStore lf = null;
303: int fsl = fileSet.length;
304:
305: if (id >= fsl) {
306: return null;
307: } else {
308: return fileSet[id];
309: }
310:
311: }
312:
313: /**
314: * open pool of FileStore(s).
315: *
316: * @throws FileNotFoundException
317: * @throws LogConfigurationException
318: * @throws IOException
319: * @throws InvalidFileSetException
320: */
321: void open() throws JoftiException, IOException,
322: FileNotFoundException, JoftiException {
323:
324: // get configuration information for log file names.
325: String logDir = logFileDirectory;
326: String logFileName = fileName;
327: String logFileExt = extension;
328:
329: // make sure the directory exists
330: File dir = new File(logDir);
331: dir.mkdirs();
332:
333: int existingFiles = 0;
334:
335: // allocate the set of log files
336: fileSet = new FileStore[maxFiles];
337: for (int i = 0; i < maxFiles; ++i) {
338: File name = new File(logDir + "/" + logFileName + "_"
339: + (i + 1) + "." + logFileExt);
340: if (name.exists()) {
341: name.delete();
342: name.createNewFile();
343: }
344:
345: try {
346: fileSet[i] = new FileStore(name, i).open("rw");
347:
348: } catch (FileNotFoundException e) {
349: while (--i >= 0) {
350: fileSet[i].close();
351: fileSet[i] = null;
352: }
353:
354: throw e;
355: }
356:
357: }
358: currentStore = fileSet[0];
359:
360: }
361:
362: /**
363: * close the log files.
364: *
365: * @throws IOException
366: * If FileChannel.close() encounters an error.
367: * @see java.nio.channels.FileChannel#close()
368: */
369: void close() throws IOException, InterruptedException {
370:
371: if (fileSet == null)
372: return;
373:
374: // close the log files
375: for (int i = 0; i < fileSet.length; ++i) {
376: if (fileSet[i] != null)
377: fileSet[i].close();
378: }
379:
380: }
381:
382: }
|