0001: /**
0002: * Copyright Mobixess Inc. 2007
0003: */package com.mobixess.jodb.core.io;
0004:
0005: import java.io.File;
0006: import java.io.IOException;
0007: import java.io.PrintStream;
0008: import java.io.RandomAccessFile;
0009: import java.io.UnsupportedEncodingException;
0010: import java.lang.ref.WeakReference;
0011: import java.lang.reflect.Field;
0012: import java.net.URI;
0013: import java.nio.channels.FileLock;
0014: import java.rmi.RemoteException;
0015: import java.rmi.server.UnicastRemoteObject;
0016: import java.util.Iterator;
0017: import java.util.Vector;
0018: import java.util.WeakHashMap;
0019: import java.util.concurrent.TimeUnit;
0020: import java.util.concurrent.locks.ReentrantLock;
0021: import java.util.concurrent.locks.ReentrantReadWriteLock;
0022: import java.util.logging.Logger;
0023:
0024: import com.mobixess.jodb.core.IDatabaseStatistics;
0025: import com.mobixess.jodb.core.IPersistentObjectStatistics;
0026: import com.mobixess.jodb.core.IllegalClassTypeException;
0027: import com.mobixess.jodb.core.JODBConfig;
0028: import com.mobixess.jodb.core.JODBConstants;
0029: import com.mobixess.jodb.core.JodbIOException;
0030: import com.mobixess.jodb.core.index.JODBIndexingRootAgent;
0031: import com.mobixess.jodb.core.io.IRandomAccessBufferFactory.BUFFER_TYPE;
0032: import com.mobixess.jodb.core.io.rmi.IOTicketRemoteInterface;
0033: import com.mobixess.jodb.core.query.QueryNode;
0034: import com.mobixess.jodb.core.transaction.ITranslatedDataSorce;
0035: import com.mobixess.jodb.core.transaction.JODBQueryList;
0036: import com.mobixess.jodb.core.transaction.JODBSession;
0037: import com.mobixess.jodb.core.transaction.TransactionAssembler;
0038: import com.mobixess.jodb.core.transaction.TransactionContainer;
0039: import com.mobixess.jodb.core.transaction.TransactionUtils;
0040: import com.mobixess.jodb.util.Utils;
0041:
0042: /**
0043: * @author Mobixess
0044: *
0045: */
0046: public class JODBIOBase implements IOBase {
0047:
0048: public final static short ENTRY_EMPTY_ID = 0x0707;
0049: public final static short ENTRY_REDIRECTOR_ID = 0x0F70;
0050: public final static short ENTRY_OBJECT_ID = 0x00F7;
0051:
0052: public final static int LEN_MODIFIER_LONG = 1 << 15;
0053: public final static int LEN_MODIFIER_BYTE = 1 << 14;
0054: public final static int REDIRECTED_OBJECT_MODIFIER = 1 << 13;
0055:
0056: public final static int LEN_MODIFIER_EXCLUSION_MASK = ~(LEN_MODIFIER_LONG | LEN_MODIFIER_BYTE);
0057:
0058: private File _targetDBFile;
0059: private FileLock _fileLock;
0060: private RandomAccessFile _fileLockHolder;
0061: private Header _header;
0062: //private RandomAccessFile _randomAccessFile;
0063: private LiteralSubstitutions _substitutions;
0064: private Vector<DataEntry> _entries = new Vector<DataEntry>(2);
0065: private WeakHashMap<Object, IOTicketWRHolder> _ioBuffersCache = new WeakHashMap<Object, IOTicketWRHolder>();
0066: private final static short SUBST_TABLE_SIZE = 1024 * 10;
0067: //private final static short MAX_MEM_RANGES = 200;
0068: private Logger _logger = Utils.getLogger(getClass().getName() + " "
0069: + System.identityHashCode(this ));
0070: private boolean _newDatabase;
0071:
0072: private ReentrantReadWriteLock _ioLock = new ReentrantReadWriteLock();
0073:
0074: private ReentrantReadWriteLock _transactionLock = new ReentrantReadWriteLock();
0075:
0076: private DatabaseStatistics _databaseStatistics = new DatabaseStatistics();
0077:
0078: private static ThreadLocal<SearchKeysHolder> _threadLocalSearchKeys = new ThreadLocal<SearchKeysHolder>() {
0079: @Override
0080: protected SearchKeysHolder initialValue() {
0081: return new SearchKeysHolder();
0082: }
0083: };
0084:
0085: private static String[] STATIC_SUBST = new String[] { "java.lang" };
0086:
0087: private enum TRANSACTION_STATE {
0088: NONE, // no transactions
0089: DATA_COPY_START, //transaction data copy start
0090: ROLLBACK_DATA_COPY_START, //rollback data copy start
0091: TRANSACTION_IN_PROGRESS
0092: };//transaction in process
0093:
0094: /**
0095: * @throws IOException
0096: *
0097: */
0098: public JODBIOBase(File file) throws IOException {
0099: _targetDBFile = file;
0100: //IRandomAccessDataBuffer transactionBuffer = JODBConfig.getRandomAccessBufferFactory().createBuffer(file.getPath(), BUFFER_TYPE.MAIN, true);
0101: if (file.exists()) {
0102: lockFile(file);
0103: initExistingFile(file);
0104: } else {
0105: _newDatabase = true;
0106: initNewFile(file);
0107: lockFile(file);
0108: }
0109: }
0110:
0111: private void lockFile(File file) throws IOException {
0112: _fileLockHolder = new RandomAccessFile(file, "rw");
0113: _fileLock = _fileLockHolder.getChannel().tryLock(0, 1, false);
0114: if (_fileLock == null) {
0115: throw new JodbIOException("Unable to obtain file lock for "
0116: + file);
0117: }
0118: }
0119:
0120: public URI getDbIdentificator() {
0121: return _targetDBFile.toURI();
0122: }
0123:
0124: public ReentrantReadWriteLock getTransactionLock() {
0125: return _transactionLock;
0126: }
0127:
0128: public void close() throws IOException {
0129: if (_fileLock.isValid()) {
0130: _fileLock.release();
0131: }
0132: _fileLockHolder.close();
0133: _ioLock.writeLock().lock();//TODO set timeout
0134: try {
0135: synchronized (_ioBuffersCache) {
0136: Iterator<Object> iterator = _ioBuffersCache.keySet()
0137: .iterator();
0138: while (iterator.hasNext()) {
0139: IRandomAccessDataBuffer next = ((IOTicketImpl) iterator
0140: .next())._randomAccessBuffer;
0141: next.close();
0142: }
0143: _ioBuffersCache.clear();
0144: }
0145: _targetDBFile = null;
0146: } finally {
0147: _ioLock.writeLock().unlock();
0148: }
0149: }
0150:
0151: public boolean isNewDatabase() {
0152: return _newDatabase;
0153: }
0154:
0155: private void disposeIOTicket(IOTicketImpl ticket) {
0156: try {
0157: ticket._randomAccessBuffer.close();
0158: } catch (IOException e) {
0159: e.printStackTrace();//TODO print to log???
0160: }
0161: synchronized (_ioBuffersCache) {
0162: _ioBuffersCache.remove(ticket);
0163: }
0164: }
0165:
0166: public boolean isClosed() {
0167: return _targetDBFile == null;
0168: }
0169:
0170: private void initNewFile(File file) throws IOException {
0171: file.createNewFile();
0172: IRandomAccessDataBuffer transactionBuffer = JODBConfig
0173: .getRandomAccessBufferFactory().createBuffer(
0174: file.getPath(), BUFFER_TYPE.MAIN, true);// new RandomAccessFileTransactionBuffer(file);
0175: try {
0176: transactionBuffer.write(0);//skip lock byte
0177: _header = new Header(transactionBuffer, 1);
0178: _substitutions = new LiteralSubstitutions(
0179: transactionBuffer, STATIC_SUBST);
0180: DataEntry entry = new DataEntry();
0181: entry.writeWithAllSubstitutionRecords(transactionBuffer,
0182: null, 0, 0);
0183: _entries.add(entry);
0184: entry.enablePendingValues();
0185: _substitutions.enablePendingValues();
0186: } finally {
0187: transactionBuffer.close();
0188: }
0189: }
0190:
0191: private void initExistingFile(File file) throws IOException {
0192: //_randomAccessFile = new RandomAccessFile(file,"rws");
0193: IRandomAccessDataBuffer transactionBuffer = JODBConfig
0194: .getRandomAccessBufferFactory().createBuffer(
0195: file.getPath(), BUFFER_TYPE.MAIN, false);
0196: try {
0197: _header = new Header(transactionBuffer);
0198: if (_header.getTransactionState() != TRANSACTION_STATE.NONE) {
0199: handleTransactionInterrupt();
0200: }
0201: _substitutions = new LiteralSubstitutions(transactionBuffer);
0202: DataEntry entry;
0203: do {
0204: entry = new DataEntry();
0205: entry.read(transactionBuffer);
0206: _entries.add(entry);
0207: } while (!entry._lastEntry);
0208: } finally {
0209: transactionBuffer.close();
0210: }
0211: }
0212:
0213: private void handleTransactionInterrupt() throws IOException {
0214: IOTicketImpl ticket = getIOTicket(true, true);
0215: try {
0216: ticket.lock(true);
0217: handleTransactionInterrupt(ticket.getRandomAccessBuffer());
0218: } finally {
0219: ticket.unlock();
0220: ticket.close();
0221: }
0222: }
0223:
0224: public IDatabaseStatistics getDatabaseStatistics() {
0225: return _databaseStatistics;
0226: }
0227:
0228: private void handleTransactionInterrupt(
0229: IRandomAccessDataBuffer transactionBuffer) {
0230: try {
0231: if (_header._needRollbackExecution) {
0232: long offset = transactionBuffer.getCursorOffset();
0233: if (addrHasConfirmationBit(_header._rollbackDataEnd)) {
0234: runRollback(transactionBuffer);
0235: }
0236: long trancateTo = -1;
0237: if (addrHasConfirmationBit(_header._transDataStart)) {
0238: trancateTo = removeConfirmationBit(_header._transDataStart);
0239: }
0240: _header.setTransactionCompleteState(transactionBuffer);
0241: if (trancateTo != -1) {
0242: transactionBuffer.setLength(trancateTo);
0243: }
0244: transactionBuffer.seek(offset);
0245: }
0246: } catch (Exception e) {
0247: throw new Error(e);
0248: }
0249: }
0250:
0251: private void runRollback(IRandomAccessDataBuffer dataBuffer)
0252: throws IOException {
0253: long rollbackDataStart = removeConfirmationBit(_header._rollbackDataStart);
0254: dataBuffer.seek(rollbackDataStart);
0255: long rollbackDataEnd = removeConfirmationBit(_header._rollbackDataEnd);
0256: while (dataBuffer.getCursorOffset() < rollbackDataEnd) {
0257: long offset = dataBuffer.readLong();
0258: long length = dataBuffer.readLong();
0259: dataBuffer.transferFrom(dataBuffer.getChannel(), offset,
0260: length);
0261: }
0262: }
0263:
0264: public static boolean hasRedirectedObjectModifier(short id) {
0265: return (id & REDIRECTED_OBJECT_MODIFIER) != 0;
0266: }
0267:
0268: public static int addRedirectedObjectModifier(int id) {
0269: return (id | REDIRECTED_OBJECT_MODIFIER);
0270: }
0271:
0272: public long getFirstObjectOffset() {
0273: DataEntry dataEntry = _entries.elementAt(0);
0274: return dataEntry._dataOffset;
0275: }
0276:
0277: public long[] getForAllObjects(IOTicket ioTicket)
0278: throws IOException {
0279: Vector<Long> offsets = new Vector<Long>();
0280: DataEntry[] dataEntries = new DataEntry[_entries.size()];
0281: dataEntries = _entries.toArray(dataEntries);
0282: ObjectDataContainer objectDataContainer = TransactionUtils
0283: .getObjectDataContainerCache()
0284: .pullObjectDataContainer();
0285: try {
0286: IRandomAccessDataBuffer randomAccessDataBuffer = ioTicket
0287: .getRandomAccessBuffer();
0288: for (int i = 0; i < dataEntries.length; i++) {
0289: DataEntry dataEntry = dataEntries[i];
0290:
0291: //ioTicket.getRandomAccessBuffer().seek(dataEntry._dataOffset);
0292: long endOffset = dataEntry.getDataBlockEnd();
0293: long nextOffset = dataEntry._dataOffset;
0294: while (nextOffset < endOffset) {
0295: long currentOffset = nextOffset;
0296: objectDataContainer.reset();
0297: objectDataContainer.readHeader(
0298: randomAccessDataBuffer, currentOffset,
0299: false);
0300: nextOffset = objectDataContainer.getEndOffset();
0301:
0302: if (!(objectDataContainer.isDeleted() || objectDataContainer
0303: .isRedirectedObject())) {
0304: if (objectDataContainer.isRedirection()) {
0305: long redirectedObjectOffset = objectDataContainer
0306: .getRedirectionOffset();
0307: objectDataContainer.reset();
0308: objectDataContainer.readHeader(
0309: randomAccessDataBuffer,
0310: redirectedObjectOffset, false);
0311: }
0312: if (!objectDataContainer.isJodbAgentObject()
0313: && !objectDataContainer.isDeleted()) {
0314: offsets.addElement(currentOffset);
0315: }
0316: }
0317: //objectDataContainer.resetInputSourceToEnd(this);
0318: //ioTicket.getRandomAccessBuffer().seek(nextOffset);
0319: }
0320: }
0321: } finally {
0322: TransactionUtils.getObjectDataContainerCache()
0323: .pushObjectDataContainer(objectDataContainer);
0324: }
0325: long[] result = new long[offsets.size()];
0326: for (int i = 0; i < result.length; i++) {
0327: result[i] = offsets.elementAt(i).longValue();
0328: }
0329: return result;
0330: }
0331:
0332: public void applyTransaction(
0333: TransactionContainer transactionContainer,
0334: JODBSession session, IOTicket writeTicket,
0335: JODBIndexingRootAgent indexingRootAgent) throws IOException {
0336: if (isClosed()) {
0337: throw new JodbIOException("Container closed");
0338: }
0339: if (transactionContainer.isEmpty()) {
0340: return;
0341: }
0342: IOTicket readTicket = getIOTicket(true, false);
0343: boolean writeLocked = true;
0344: if (writeTicket == null) {
0345: writeTicket = getIOTicket(true, true);
0346: writeLocked = false;
0347: }
0348: transactionContainer.lockTransaction();
0349: JODBOperationContext context = new JODBOperationContext(
0350: session, readTicket, null, transactionContainer,
0351: indexingRootAgent);
0352: try {
0353: applyTransaction0(context, writeTicket, writeLocked);
0354: } catch (IOException e) {
0355: handleTransactionInterrupt(writeTicket
0356: .getRandomAccessBuffer());
0357: throw e;
0358: } catch (IllegalClassTypeException e) {
0359: //should never happen as exception would occur much earlier when object is set to container
0360: throw new JodbIOException(e);
0361: } finally {
0362: writeTicket.unlock();//write ticket may remain locked due to exception
0363: transactionContainer.reset();
0364: writeTicket.close();
0365: readTicket.close();
0366: }
0367: }
0368:
0369: public IPersistentObjectStatistics getPersistenceStatistics(
0370: long offset, JODBSession session) throws IOException {
0371: IOTicket ticket = getIOTicket(true, true);
0372: ObjectDataContainer objectDataContainer = TransactionUtils
0373: .getObjectDataContainerCache()
0374: .pullObjectDataContainer();
0375: try {
0376: ObjectStatImpl result = new ObjectStatImpl();
0377: ticket.getRandomAccessBuffer().seek(offset);
0378: objectDataContainer.readHeader(ticket, false);
0379: if (objectDataContainer.isDeleted()) {
0380: result._deleted = true;
0381: return result;
0382: }
0383: result._objectID = objectDataContainer.getOffset();
0384: result._totalSize = objectDataContainer.getTotalLength();
0385: result._bodySize = objectDataContainer.getBodyLength();
0386: if (objectDataContainer.isRedirection()) {
0387: result._totalRecords = 2;
0388: long redirectedOffset = objectDataContainer
0389: .getRedirectionOffset();
0390: objectDataContainer.reset();
0391: objectDataContainer.readObject(ticket
0392: .getRandomAccessBuffer(), this , session,
0393: redirectedOffset, true);
0394: result._totalSize += objectDataContainer
0395: .getTotalLength();
0396: } else {
0397: result._totalRecords = 1;
0398: }
0399: return result;
0400: } finally {
0401: TransactionUtils.getObjectDataContainerCache()
0402: .pushObjectDataContainer(objectDataContainer);
0403: ticket.close();
0404: }
0405: }
0406:
0407: public long getTransactionOffset() {
0408: DataEntry entry = _entries.lastElement();
0409: return entry._dataOffset + entry._dataBlockLength;
0410: }
0411:
0412: public void applyRemoteTransaction(
0413: ITranslatedDataSorce translatedDataSorce,
0414: long transactionOffset) throws IOException {
0415: IOTicket writeTicket = getIOTicket(true, true);
0416: writeTicket.lock(true);
0417: try {
0418: applyTransaction0(null, translatedDataSorce,
0419: transactionOffset, writeTicket);
0420: } catch (IOException e) {
0421: handleTransactionInterrupt(writeTicket
0422: .getRandomAccessBuffer());
0423: throw e;
0424: } finally {
0425: writeTicket.unlock();//write ticket may remain locked due to exception
0426: writeTicket.close();
0427: }
0428: }
0429:
0430: private void applyTransaction0(JODBOperationContext context,
0431: IOTicket writeTicket, boolean writeLocked)
0432: throws IOException, IllegalClassTypeException {
0433: //local transaction only
0434: TransactionContainer transactionContainer = context
0435: .getTransactionContainer();
0436: IOTicket readTicket = context.getIoTicket();
0437: if (JODBConfig.DEBUG) {
0438: _logger
0439: .info(" ******************* Transaction start for container="
0440: + System
0441: .identityHashCode(transactionContainer));
0442: }
0443: //JODBSession session, IOTicket readTicket, IOTicket writeTicket
0444: //JODBOperationContext context = new JODBOperationContext(session,readTicket,null,transactionContainer);
0445: long transactionOffset = getTransactionOffset();
0446: context.setTransactionOffset(transactionOffset);
0447: try {
0448: readTicket.lock(false, 0);//for transaction assembling we need only read lock
0449: TransactionAssembler.assembleTransactionData(context,
0450: transactionContainer);
0451: } finally {
0452: readTicket.unlock();
0453: }
0454: //transaction lock engaged earlier so no one will ask for write lock concurently
0455: ////////////////////// WRITE LOCK PHASE ////////////////////
0456: if (!writeLocked) {
0457: writeTicket.lock(true, 0);
0458: }
0459:
0460: transactionContainer.processTranslatedObjectsIndex(context,
0461: transactionOffset);
0462: transactionContainer.resetTransactionBufferToEnd();
0463: try {
0464: transactionContainer.enableAgentMode();
0465: TransactionAssembler.assembleTransactionData(context,
0466: transactionContainer);
0467: } finally {
0468: transactionContainer.disableAgentMode();
0469: }
0470:
0471: applyTransaction0(context.getSession(), transactionContainer,
0472: transactionOffset, writeTicket);
0473: }
0474:
0475: private void applyTransaction0(JODBSession session,
0476: ITranslatedDataSorce translatedDataSorce,
0477: long transactionOffset, IOTicket writeTicket)
0478: throws IOException {
0479:
0480: if (!_transactionLock.isWriteLockedByCurrentThread()) {
0481: throw new JodbIOException(
0482: "Transaction without holding write lock");
0483: }
0484:
0485: IRandomAccessDataBuffer mainDataBuffer = writeTicket
0486: .getRandomAccessBuffer();
0487:
0488: IRandomAccessDataBuffer newDataBuffer = translatedDataSorce
0489: .getTransactionNewDataFile();
0490: IRandomAccessDataBuffer replacementsDataBuffer = translatedDataSorce
0491: .getTransactionReplacementsDataFile();
0492: IRandomAccessDataBuffer rollbackDataBuffer = translatedDataSorce
0493: .getRollbackDataFile();
0494:
0495: long dataBlockIncrease = newDataBuffer.length();
0496:
0497: _substitutions.backupPendingSubstitutions();
0498: DataEntry entry = _entries.lastElement();
0499:
0500: if (_substitutions.hasPendingSubstitutionTables()) {
0501: translatedDataSorce.resetTransactionBufferToEnd();
0502: //writing substitution table
0503: replacementsDataBuffer.resetToEnd();
0504: SubstTable substTable = entry.getLiteralSubstTable();
0505: replacementsDataBuffer
0506: .write(TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_STATIC);
0507: replacementsDataBuffer.writeLong(substTable._tableOffset);//offset of table in main file
0508: long lengthEntryOffset = replacementsDataBuffer
0509: .getCursorOffset();
0510: replacementsDataBuffer.writeLong(0);//reserved space for length
0511: substTable.writePendingRecords(mainDataBuffer,
0512: replacementsDataBuffer);
0513: //replacement length
0514: long substTableLen = replacementsDataBuffer
0515: .getCursorOffset()
0516: - lengthEntryOffset - 8;//8 bytes for len of entry
0517: replacementsDataBuffer.seek(lengthEntryOffset);
0518: replacementsDataBuffer.writeLong(substTableLen);
0519: }
0520:
0521: DataEntry pendingDataEntry = null;
0522: if (_substitutions.hasPendingSubstitutionTables()) {//last entry didn't accept all substitutions, seel current entry and create new one
0523: replacementsDataBuffer.resetToEnd();
0524: pendingDataEntry = new DataEntry();
0525: newDataBuffer.resetToEnd();
0526: pendingDataEntry.writeWithAllSubstitutionRecords(
0527: newDataBuffer, null, 0, transactionOffset);
0528: }
0529:
0530: if (dataBlockIncrease > 0) {
0531: //write replacement for data entry length
0532: replacementsDataBuffer.resetToEnd();
0533: replacementsDataBuffer
0534: .writeByte(TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_STATIC);
0535: replacementsDataBuffer.writeLong(entry._tableOffset);
0536: long lengthEntryOffset = replacementsDataBuffer
0537: .getCursorOffset();
0538: replacementsDataBuffer.writeLong(0);//reserved space for length
0539: entry.write(replacementsDataBuffer, entry._dataBlockLength
0540: + dataBlockIncrease, pendingDataEntry != null);//write as sealed if pending data entry exists
0541: long len = replacementsDataBuffer.getCursorOffset()
0542: - lengthEntryOffset - 8;//8 bytes for len of entry
0543: //replacement length
0544: replacementsDataBuffer.seek(lengthEntryOffset);
0545: replacementsDataBuffer.writeLong(len);
0546: //..
0547: }
0548: translatedDataSorce.resetTransactionBufferToStart();
0549: if (JODBConfig.DEBUG) {
0550: _logger.info("File size prior transaction="
0551: + mainDataBuffer.length());
0552: _logger.info("Transaction state offset="
0553: + transactionOffset + " new data len="
0554: + newDataBuffer.length() + " replacement data="
0555: + replacementsDataBuffer.length()
0556: + " rollback data" + rollbackDataBuffer.length());
0557: }
0558: long transactionDataEnd = transactionOffset
0559: + newDataBuffer.length();
0560: _header.setDataTransactionState(mainDataBuffer,
0561: transactionOffset, transactionDataEnd);
0562: mainDataBuffer.seek(transactionOffset);
0563: long toTransfer = newDataBuffer.length();
0564: long transfered = mainDataBuffer.transferFrom(newDataBuffer
0565: .getChannel(), transactionOffset, toTransfer);
0566: if (toTransfer != transfered) {
0567: throw new JodbIOException(
0568: "unable to transfer transaction data " + transfered
0569: + "<>" + toTransfer);
0570: }
0571:
0572: mainDataBuffer.seek(mainDataBuffer.getCursorOffset()
0573: + transfered);
0574:
0575: long rollbackTransactionStartOffset = mainDataBuffer
0576: .getCursorOffset();
0577:
0578: long backupEnd = backup(mainDataBuffer,
0579: rollbackTransactionStartOffset, replacementsDataBuffer);
0580:
0581: _header.setRollbackTransactionState(mainDataBuffer,
0582: rollbackTransactionStartOffset, backupEnd);
0583:
0584: _header.setTransactionInProgressState(mainDataBuffer);
0585:
0586: translatedDataSorce.resetTransactionBufferToStart();
0587:
0588: while (replacementsDataBuffer.getCursorOffset() < replacementsDataBuffer
0589: .length()) {
0590: byte replacementType = replacementsDataBuffer.readByte();
0591: long targetOffset = replacementsDataBuffer.readLong();
0592: switch (replacementType) {
0593: case TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_STATIC:
0594: long dataLength = replacementsDataBuffer.readLong();
0595: transfered = mainDataBuffer.transferFrom(
0596: replacementsDataBuffer.getChannel(),
0597: targetOffset, dataLength);
0598: if (transfered != dataLength) {
0599: throw new JodbIOException(
0600: "unable to transfer rollback data "
0601: + transfered + "<>" + toTransfer);
0602: }
0603: break;
0604: case TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_REDIRECTOR:
0605: mainDataBuffer.seek(targetOffset);
0606: short id = mainDataBuffer.readShort();
0607: int lengthID = id & ~LEN_MODIFIER_EXCLUSION_MASK;
0608: mainDataBuffer.seek(targetOffset);
0609: mainDataBuffer.writeShort(lengthID
0610: | ENTRY_REDIRECTOR_ID);
0611: mainDataBuffer
0612: .skipBytes(totalBytesForLengthID(lengthID));//skiping length part, as it remains the same
0613: long absoluteAddress = transactionOffset
0614: + replacementsDataBuffer.readLong();
0615: mainDataBuffer.writeLong(absoluteAddress);
0616: break;
0617: }
0618: }
0619: translatedDataSorce.resetTranslatedObjects(session,
0620: transactionOffset);
0621: entry.enablePendingValues();
0622: _substitutions.enablePendingValues();
0623: if (pendingDataEntry != null) {
0624: _entries.add(pendingDataEntry);
0625: }
0626: _header.setTransactionCompleteState(mainDataBuffer);
0627: if (JODBConfig.DEBUG) {
0628: _logger.info("File size after transaction="
0629: + mainDataBuffer.length());
0630: }
0631: writeTicket.unlock();
0632:
0633: if (JODBConfig.DEBUG) {
0634: _logger
0635: .info(" ******************* Transaction end for container="
0636: + System
0637: .identityHashCode(translatedDataSorce));
0638: }
0639:
0640: }
0641:
0642: /**
0643: *
0644: * @param mainBuffer
0645: * @param backupOffset
0646: * @param replacementsDataBuffer
0647: * @return end offset
0648: * @throws IOException
0649: */
0650: private long backup(IRandomAccessDataBuffer mainBuffer,
0651: long backupOffset,
0652: IRandomAccessDataBuffer replacementsDataBuffer)
0653: throws IOException {
0654: replacementsDataBuffer.resetToStart();
0655: long mainBufferBackupPosition = backupOffset;
0656: while (replacementsDataBuffer.getCursorOffset() < replacementsDataBuffer
0657: .length()) {
0658: byte replacementType = replacementsDataBuffer.readByte();
0659: long targetOffset = replacementsDataBuffer.readLong();
0660: //mainBuffer.seek(targetOffset);
0661: long dataLength;
0662: switch (replacementType) {
0663: case TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_STATIC:
0664: dataLength = replacementsDataBuffer.readLong();
0665: replacementsDataBuffer.skip(dataLength);
0666: break;
0667: case TransactionAssembler.TRANSACTION_REPLACEMENT_ENTRY_TYPE_REDIRECTOR:
0668: mainBuffer.seek(targetOffset);
0669: long startOffset = mainBuffer.getCursorOffset();
0670: short id = mainBuffer.readShort();
0671: int lengthID = id & ~LEN_MODIFIER_EXCLUSION_MASK;
0672: int bytesRepresentingLength = totalBytesForLengthID(lengthID);//skiping length part, as it remains the same
0673: dataLength = mainBuffer.getCursorOffset() - startOffset
0674: + bytesRepresentingLength + 8;//8 bytes for redirection offset entry
0675: mainBuffer.seek(targetOffset);
0676: replacementsDataBuffer.skip(8);//skip relative address
0677: break;
0678: default:
0679: throw new JodbIOException("illegal replacement type "
0680: + replacementType);
0681: }
0682: mainBuffer.seek(mainBufferBackupPosition);
0683: mainBuffer.writeLong(targetOffset);//rollback entry target offset
0684: mainBuffer.writeLong(dataLength);
0685: mainBufferBackupPosition = mainBuffer.getCursorOffset();
0686: mainBuffer.seek(targetOffset);//seek the offset of data to backup
0687: mainBuffer.transferFrom(mainBuffer.getChannel(),
0688: mainBufferBackupPosition, dataLength);
0689: mainBufferBackupPosition += dataLength;
0690: }
0691: return mainBufferBackupPosition;
0692: }
0693:
0694: private int totalBytesForLengthID(int lenID) {
0695: switch (lenID) {
0696: case LEN_MODIFIER_BYTE:
0697: return 1;//byte
0698: case LEN_MODIFIER_LONG:
0699: return 8;//long
0700: default:
0701: return 2;//short
0702: }
0703: }
0704:
0705: public void printFileMap(JODBSession session,
0706: PrintStream printStream) throws IOException {
0707: IOTicket ticket = getIOTicket(true, false);
0708: printStream.println(" --> Map for " + _targetDBFile
0709: + " total length="
0710: + ticket.getRandomAccessBuffer().length());
0711: printStream.println("Header: offset" + _header._dataOffset
0712: + " state=" + _header._transactionState);
0713: printStream.println("Static substitutions: start offset="
0714: + _substitutions._staticTableStartOffset + " length="
0715: + _substitutions._staticTableLength);
0716: for (int i = 0; i < _entries.size(); i++) {
0717: DataEntry entry = _entries.elementAt(i);
0718: printStream.println("Data entry: table offset="
0719: + entry._tableOffset + " data offset="
0720: + entry._dataOffset + " entry end="
0721: + entry.getDataBlockEnd() + " dataBlockLength="
0722: + entry._dataBlockLength
0723: + " pendingDataBlockLength="
0724: + entry._pendingDataBlockLength);
0725: SubstTable substTable = entry.getLiteralSubstTable();
0726: printStream.println("Subst Table Start="
0727: + substTable._tableOffset + " bytes in use"
0728: + substTable._bytesInUse + " "
0729: + substTable._totalEntries);
0730: printMapForDataEntry(session, entry, printStream);
0731: }
0732: ticket.close();
0733: }
0734:
0735: private void printMapForDataEntry(JODBSession session,
0736: DataEntry dataEntry, PrintStream printStream)
0737: throws IOException {
0738: IOTicket ioTicket = getIOTicket(true, false);
0739: DataEntry[] dataEntries = new DataEntry[_entries.size()];
0740: dataEntries = _entries.toArray(dataEntries);
0741: ObjectDataContainer objectDataContainer = new ObjectDataContainer();
0742: IRandomAccessDataBuffer randomAccessDataBuffer = ioTicket
0743: .getRandomAccessBuffer();
0744: ioTicket.getRandomAccessBuffer().seek(dataEntry._dataOffset);
0745: long endOffset = dataEntry.getDataBlockEnd();
0746: while (randomAccessDataBuffer.getCursorOffset() < endOffset) {
0747: objectDataContainer.reset();
0748: //JODBIOUtils.readObject(ioTicket,this, objectDataContainer, false);
0749: objectDataContainer.printContent(ioTicket
0750: .getRandomAccessBuffer(), ioTicket.getBase(),
0751: session, ioTicket.getRandomAccessBuffer()
0752: .getCursorOffset(), false, printStream);
0753: objectDataContainer.resetInputSourceToEnd(null);
0754: }
0755: ioTicket.close();
0756: }
0757:
0758: private final long addConfirmationBitToAddress(long address)
0759: throws IOException {
0760: return address | (1L << 63);
0761: }
0762:
0763: private final void verifyAddress(long address) throws IOException {
0764: if ((address & ~(0xFFL << 56)) != address) {//what is the purpose of this check
0765: throw new JodbIOException("address verification failed");
0766: }
0767: }
0768:
0769: private final boolean addrHasConfirmationBit(long address) {
0770: return (address & (1L << 63)) != 0;
0771: }
0772:
0773: private final long removeConfirmationBit(long address) {
0774: return (address & ~(1L << 63));
0775: }
0776:
0777: /*package*/Vector<DataEntry> getDataEntries() {
0778: return _entries;
0779: }
0780:
0781: public int getClassTypeSubstitutionID(String classType) {
0782: return _substitutions.getClassTypeSubstitutionID(classType);
0783: }
0784:
0785: public final int getOrSetClassTypeSubstitutionID(Class clazz) {
0786: return getOrSetClassTypeSubstitutionID(clazz.getName());
0787: }
0788:
0789: public final int getOrSetClassTypeSubstitutionID(String classType) {
0790: return _substitutions
0791: .getOrSetClassTypeSubstitutionID(classType);
0792: }
0793:
0794: public int getOrSetFieldSubstitutionID(Field field) {
0795: int declaringClassID = getOrSetClassTypeSubstitutionID(field
0796: .getDeclaringClass().getName());
0797: int fieldTypeID = getOrSetClassTypeSubstitutionID(field
0798: .getType().getName());
0799: return getOrSetFieldSubstitutionID(declaringClassID,
0800: fieldTypeID, field.getName());
0801: }
0802:
0803: public int getOrSetFieldSubstitutionID(int declaringClassID,
0804: int fieldTypeID, String fieldName) {
0805: return _substitutions.getOrSetFieldSubstitutionID(""
0806: + (char) declaringClassID + (char) fieldTypeID
0807: + fieldName);
0808: }
0809:
0810: public int getFieldSubstitutionID(Field field) {
0811: int declaringClassID = getClassTypeSubstitutionID(field
0812: .getDeclaringClass().getName());
0813: if (declaringClassID == -1) {
0814: return -1;
0815: }
0816: int fieldTypeID = getClassTypeSubstitutionID(field.getType()
0817: .getName());
0818: if (fieldTypeID == -1) {
0819: return -1;
0820: }
0821: int result = _substitutions.getFieldSubstitutionID(""
0822: + (char) declaringClassID + (char) fieldTypeID
0823: + field.getName());
0824: return result;
0825: }
0826:
0827: // private int getOrSetFieldSubstitutionID(String fieldName){
0828: // return _substitutions.getOrSetFieldSubstitutionID(fieldName);
0829: // }
0830:
0831: public String getFullFieldNameForID(int id) {
0832: return _substitutions.getFieldNameForID(id);
0833: }
0834:
0835: public String getSimpleFieldNameForID(int id) {
0836: String fullName = _substitutions.getFieldNameForID(id);
0837: return fullName.substring(2);
0838: }
0839:
0840: public String getClassTypeForID(int id) {
0841: return _substitutions.getClassTypeForID(id);
0842: }
0843:
0844: public String getPrefixForID(int id) {
0845: return _substitutions.getPrefixForID(id);
0846: }
0847:
0848: public IOTicketImpl getIOTicket(boolean read, boolean write)
0849: throws IOException {
0850: return getIOTicket(read, write, false);
0851: }
0852:
0853: public IOTicketImpl getIOTicket(boolean read, boolean write,
0854: boolean remote) throws IOException {
0855: if (isClosed()) {
0856: throw new JodbIOException("Container closed");
0857: }
0858: IOTicketImpl result = remote ? new RemoteIOTicketImpl(
0859: _targetDBFile, read, write) : new IOTicketImpl(
0860: _targetDBFile, read, write);
0861: _ioBuffersCache.put(result, new IOTicketWRHolder(result));
0862: return result;
0863: }
0864:
0865: public IOTicketImpl findTicket(int identityHash) {
0866: TicketSearchKey ticketSearchKey = _threadLocalSearchKeys.get()._ticketSearchObject;
0867: ticketSearchKey.init(identityHash);
0868: IOTicketWRHolder ticketWRHolder = _ioBuffersCache
0869: .get(ticketSearchKey);
0870: return ticketWRHolder._weakReference.get();
0871: }
0872:
0873: public JODBQueryList executeQuery(QueryNode query)
0874: throws IOException, IllegalClassTypeException {
0875: return query.runQuery();
0876: }
0877:
0878: /*
0879: public class EntryOffsetsIterator implements Iterator<Long>{
0880:
0881: private LinkedList<Long> _list = new LinkedList<Long>();
0882: private int _index = 0;
0883:
0884: private void addElement(long value){
0885: _list.add(value);
0886: }
0887:
0888: public boolean hasNext() {
0889: return _index < _list.size();
0890: }
0891:
0892: public Long next() {
0893: return _list.get(_index++);
0894: }
0895:
0896: public void remove() {
0897: throw new RuntimeException("Not Implemented");
0898: }
0899:
0900: }
0901: */
0902: class IOTicketImpl implements IOTicket {
0903:
0904: /**
0905: *
0906: */
0907: private static final long serialVersionUID = 1L;
0908: private ReentrantLock _lock = new ReentrantLock(false);
0909: private IRandomAccessDataBuffer _randomAccessBuffer;
0910: private boolean _holdsGlobalReadLock;
0911: private boolean _holdsGlobalWriteLock;
0912: boolean _closed;
0913:
0914: // private final boolean _keepConstructionTrace = false;
0915: // private Throwable _constructionDebugElement;
0916: // private boolean _closed;
0917:
0918: /**
0919: * @throws IOException
0920: *
0921: */
0922: private IOTicketImpl(File file, boolean read, boolean write)
0923: throws IOException {
0924: // if(_keepConstructionTrace){
0925: // _constructionDebugElement = new RuntimeException("unclosed ticket");
0926: // }
0927: _randomAccessBuffer = JODBConfig
0928: .getRandomAccessBufferFactory().createBuffer(
0929: file.getPath(), BUFFER_TYPE.MAIN, write);
0930: }
0931:
0932: public void lock(boolean write) throws IOException {
0933: lock(write, _randomAccessBuffer.getCursorOffset());
0934: }
0935:
0936: public void lock(boolean write, long offset) throws IOException {
0937: if (_lock.isHeldByCurrentThread()) {
0938: throw new JodbIOException(
0939: "recursive lock requests are not allowed");
0940: }
0941: try {
0942: if (write) {
0943: _ioLock.writeLock().tryLock(
0944: JODBConfig.getMaxWriteWait(),
0945: TimeUnit.MILLISECONDS);
0946: _holdsGlobalWriteLock = true;
0947: } else {
0948: _ioLock.readLock().tryLock(
0949: JODBConfig.getMaxReadWait(),
0950: TimeUnit.MILLISECONDS);
0951: _holdsGlobalReadLock = true;
0952: }
0953: _lock.tryLock(write ? JODBConfig.getMaxWriteWait()
0954: : JODBConfig.getMaxReadWait(),
0955: TimeUnit.MILLISECONDS);
0956: _randomAccessBuffer.seek(offset);
0957: } catch (InterruptedException e) {
0958: unlock();
0959: e.printStackTrace();
0960: throw new JodbIOException(e);
0961: }
0962: }
0963:
0964: public void unlock() {
0965: if (_lock.isHeldByCurrentThread()) {
0966: _lock.unlock();
0967: }
0968: if (_holdsGlobalReadLock) {
0969: _ioLock.readLock().unlock();
0970: _holdsGlobalReadLock = false;
0971: }
0972: if (_holdsGlobalWriteLock) {
0973: if (_ioLock.isWriteLockedByCurrentThread()) {
0974: _ioLock.writeLock().unlock();
0975: }
0976: _holdsGlobalWriteLock = false;
0977: }
0978: }
0979:
0980: public IOBase getBase() {
0981: return JODBIOBase.this ;
0982: }
0983:
0984: public IRandomAccessDataBuffer getRandomAccessBuffer() {
0985: return _randomAccessBuffer;
0986: }
0987:
0988: public synchronized void close() throws IOException {
0989: if (_closed) {
0990: return;
0991: }
0992: disposeIOTicket(this );
0993: _closed = true;
0994: }
0995:
0996: @Override
0997: protected void finalize() throws Throwable {
0998: close();
0999: }
1000:
1001: @Override
1002: public boolean equals(Object obj) {
1003: if (obj instanceof TicketSearchKey) {
1004: return hashCode() == ((TicketSearchKey) obj)._hashCode;
1005: }
1006: return super .equals(obj);
1007: }
1008:
1009: public int getTicketIdentity() throws RemoteException {
1010: throw new RuntimeException("Not Implemented");
1011: }
1012: }
1013:
1014: private class RemoteIOTicketImpl extends IOTicketImpl implements
1015: IOTicketRemoteInterface {
1016:
1017: private IRandomAccessDataBuffer _localDataBuffer;
1018: private final int MAX_SIZE_FOR_TRANSFER = 10 * 1024;
1019:
1020: public RemoteIOTicketImpl(File file, boolean read, boolean write)
1021: throws IOException {
1022: super (file, read, write);
1023: _localDataBuffer = JODBConfig
1024: .getRandomAccessBufferFactory().createBuffer("",
1025: BUFFER_TYPE.SERVER_TEMP_BUFFER, true);
1026: UnicastRemoteObject.exportObject(this ,
1027: JODBConstants.DEFAULT_RMI_PORT);
1028: }
1029:
1030: public int fetchRecord(long offset) throws IOException {
1031: lock(false);
1032: ObjectDataContainer objectDataContainer = TransactionUtils
1033: .getObjectDataContainerCache()
1034: .pullObjectDataContainer();
1035: try {
1036: _localDataBuffer.setLength(0);
1037: IRandomAccessDataBuffer dataBuffer = getRandomAccessBuffer();
1038: objectDataContainer.readHeader(dataBuffer, offset,
1039: false);
1040: dataBuffer.transferTo(offset, objectDataContainer
1041: .getTotalLength(), _localDataBuffer
1042: .getChannel());
1043: return (int) objectDataContainer.getTotalLength();//TODO type safety
1044: } finally {
1045: unlock();
1046: TransactionUtils.getObjectDataContainerCache()
1047: .pushObjectDataContainer(objectDataContainer);
1048: }
1049: }
1050:
1051: public byte[] getRecordData(long offsetInRecord, int len)
1052: throws IOException {
1053: byte[] result = new byte[Math.min(MAX_SIZE_FOR_TRANSFER,
1054: len)];
1055: _localDataBuffer.seek(offsetInRecord);
1056: _localDataBuffer.read(result, 0, result.length);
1057: return result;
1058: }
1059:
1060: public int getTicketIdentity() throws RemoteException {
1061: return System.identityHashCode(this );
1062: }
1063:
1064: @Override
1065: public synchronized void close() throws IOException {
1066: if (_closed) {
1067: return;
1068: }
1069: try {
1070: UnicastRemoteObject.unexportObject(this , false);
1071: } catch (Exception e) {
1072: e.printStackTrace();//TODO log
1073: }
1074: _localDataBuffer.close();
1075: _localDataBuffer.delete();
1076: super .close();
1077: }
1078:
1079: }
1080:
1081: private class Header {
1082: int _version;
1083: //long _uidCounter;
1084: long _indexFileWerificationNumber;
1085: TRANSACTION_STATE _transactionState;
1086: long _transDataStart;
1087: long _transDataEnd;
1088: long _rollbackDataStart;
1089: long _rollbackDataEnd;
1090: long _dataOffset;
1091: long _transactionDataEntryOffset;
1092: boolean _needRollbackExecution;
1093:
1094: /**
1095: * @throws IOException
1096: *
1097: */
1098: public Header(IRandomAccessDataBuffer file) throws IOException {
1099: read(file);
1100: }
1101:
1102: /**
1103: * @throws IOException
1104: *
1105: */
1106: public Header(IRandomAccessDataBuffer file, int version)
1107: throws IOException {
1108: create(file, version);
1109: }
1110:
1111: public void read(IRandomAccessDataBuffer file)
1112: throws IOException {
1113: if (JODBConfig.DEBUG) {
1114: _logger.info("Header start=" + file.getCursorOffset());
1115: }
1116: file.seek(1);//skip locked byte
1117: file.readUTF();
1118: _version = file.readInt();
1119: _dataOffset = file.getCursorOffset();
1120: _transactionState = TRANSACTION_STATE.NONE;
1121: _indexFileWerificationNumber = file.readLong();
1122: _transactionDataEntryOffset = file.getCursorOffset();
1123: _transDataStart = file.readLong();
1124: boolean needTransactionOffsetsReset = false;
1125: if (addrHasConfirmationBit(_transDataStart)) {
1126: _transactionState = TRANSACTION_STATE.DATA_COPY_START;
1127: _needRollbackExecution = true;
1128: }
1129: _transDataEnd = file.readLong();
1130: if (addrHasConfirmationBit(_transDataEnd)) {
1131: if (!_needRollbackExecution) {
1132: needTransactionOffsetsReset = true;
1133: }
1134: }
1135: _rollbackDataStart = file.readLong();
1136: if (addrHasConfirmationBit(_rollbackDataStart)) {
1137: if (!_needRollbackExecution) {
1138: needTransactionOffsetsReset = true;
1139: } else {
1140: _transactionState = TRANSACTION_STATE.ROLLBACK_DATA_COPY_START;
1141: }
1142: }
1143: _rollbackDataEnd = file.readLong();
1144: if (addrHasConfirmationBit(_rollbackDataEnd)) {
1145: if (!_needRollbackExecution) {
1146: needTransactionOffsetsReset = true;
1147: } else {
1148: _transactionState = TRANSACTION_STATE.TRANSACTION_IN_PROGRESS;
1149: }
1150: }
1151: if (JODBConfig.DEBUG) {
1152: _logger.info("Header end=" + file.getCursorOffset());
1153: }
1154: if (needTransactionOffsetsReset) {
1155: resetTransactionOffsets();
1156: }
1157: }
1158:
1159: private void resetTransactionOffsets() throws IOException {
1160: IOTicket ticket = getIOTicket(true, true);
1161: try {
1162: setTransactionCompleteState(ticket
1163: .getRandomAccessBuffer());
1164: } finally {
1165: ticket.close();
1166: }
1167: }
1168:
1169: public void create(IRandomAccessDataBuffer file, int version)
1170: throws IOException {
1171: file.writeUTF("JODB");
1172: file.writeInt(version);
1173: _dataOffset = file.getCursorOffset();
1174: //file.writeLong(0);//counter
1175: file.writeLong(0);//index verif number
1176: _transactionDataEntryOffset = file.getCursorOffset();
1177: file.writeLong(0);//trans start
1178: file.writeLong(0);//trans end
1179: file.writeLong(0);//rollback start
1180: file.writeLong(0);//rollback end
1181: }
1182:
1183: public TRANSACTION_STATE getTransactionState() {
1184: return _transactionState;
1185: }
1186:
1187: public void setDataTransactionState(
1188: IRandomAccessDataBuffer file, long startOffset,
1189: long endOffset) throws IOException {
1190: verifyAddress(startOffset);
1191: verifyAddress(endOffset);
1192: file.seek(_transactionDataEntryOffset);
1193: file.writeLong(startOffset);//write without confirmation bit
1194: file.writeLong(endOffset);
1195: //make sure the rollback offsets have validity bits, could be excessive though
1196: _rollbackDataStart = removeConfirmationBit(_rollbackDataStart);
1197: _rollbackDataEnd = removeConfirmationBit(_rollbackDataEnd);
1198: file.writeLong(_rollbackDataStart);
1199: file.writeLong(_rollbackDataEnd);
1200: //
1201: file.seek(_transactionDataEntryOffset);
1202: _transDataStart = addConfirmationBitToAddress(startOffset);
1203: _needRollbackExecution = true;
1204: _transDataEnd = endOffset;
1205: _needRollbackExecution = true;
1206: file.writeLong(_transDataStart);//write with confirmation bit
1207: _transactionState = TRANSACTION_STATE.DATA_COPY_START;
1208: }
1209:
1210: public void setRollbackTransactionState(
1211: IRandomAccessDataBuffer file, long startOffset,
1212: long endOffset) throws IOException {
1213: verifyAddress(startOffset);
1214: verifyAddress(endOffset);
1215: file.seek(_transactionDataEntryOffset + 8);
1216: _transDataEnd = addConfirmationBitToAddress(_transDataEnd);
1217: file.writeLong(_transDataEnd);//write confirmation that transaction copy complete
1218: file.writeLong(startOffset);//write without confirmation bit
1219: file.writeLong(endOffset);
1220: file.seek(_transactionDataEntryOffset + 16);
1221: _rollbackDataStart = addConfirmationBitToAddress(startOffset);
1222: file.writeLong(_rollbackDataStart);//write with confirmation bit
1223: _rollbackDataEnd = endOffset;
1224: _transactionState = TRANSACTION_STATE.ROLLBACK_DATA_COPY_START;
1225: }
1226:
1227: public void setTransactionInProgressState(
1228: IRandomAccessDataBuffer file) throws IOException {
1229: file.seek(_transactionDataEntryOffset + 24);
1230: _rollbackDataEnd = addConfirmationBitToAddress(_rollbackDataEnd);
1231: file.writeLong(_rollbackDataEnd);
1232: _transactionState = TRANSACTION_STATE.TRANSACTION_IN_PROGRESS;
1233: }
1234:
1235: public void setTransactionCompleteState(
1236: IRandomAccessDataBuffer file) throws IOException {
1237: file.seek(_transactionDataEntryOffset);
1238: long transDataStart = removeConfirmationBit(_transDataStart);
1239: long transDataEnd = removeConfirmationBit(_transDataEnd);
1240: long rollbackDataStart = removeConfirmationBit(_rollbackDataStart);
1241: long rollbackDataEnd = removeConfirmationBit(_rollbackDataEnd);
1242:
1243: file.writeLong(transDataStart);
1244: _transDataStart = transDataStart;
1245: file.writeLong(transDataEnd);
1246: _transDataEnd = transDataEnd;
1247: file.writeLong(rollbackDataStart);
1248: _rollbackDataStart = rollbackDataStart;
1249: file.writeLong(rollbackDataEnd);
1250: _rollbackDataEnd = rollbackDataEnd;
1251:
1252: _transactionState = TRANSACTION_STATE.NONE;
1253: _needRollbackExecution = false;
1254: }
1255:
1256: // public void saveUIDCounter(IRandomAccessDataBuffer file) throws IOException{
1257: // file.seek(_dataOffset);
1258: // file.writeLong(_uidCounter);
1259: // }
1260: }
1261:
1262: static enum LITERAL_SUBSTITUTION_TYPE {
1263: EMPTY, PREFFIX, TYPE, TYPE_WITH_PREFFIX, FIELD
1264: };
1265:
1266: private class LiteralSubstitutions {
1267:
1268: //private String EMPTY_STUB = "";
1269: private Vector<String> _prefixes = new Vector<String>();
1270: private Vector<String> _types = new Vector<String>();
1271: private Vector<String> _fields = new Vector<String>();
1272: private Vector<SubstitutionRecord> _pendingRecords = new Vector<SubstitutionRecord>();
1273: private Vector<SubstitutionRecord> _pendingRecordsBackup = new Vector<SubstitutionRecord>();
1274: private Vector<SubstTable> _substTables = new Vector<SubstTable>();
1275: private Vector<SubstTable> _pendingSubstTables = new Vector<SubstTable>();
1276: private Vector<String> _unresolvedTypes = new Vector<String>();
1277: private long _staticTableStartOffset;
1278: private long _staticTableLength;
1279:
1280: /**
1281: * @throws IOException
1282: *
1283: */
1284: private LiteralSubstitutions(IRandomAccessDataBuffer file)
1285: throws IOException {
1286: readStaticTable(file);
1287: }
1288:
1289: /**
1290: * @throws IOException
1291: *
1292: */
1293: private LiteralSubstitutions(IRandomAccessDataBuffer file,
1294: String[] staticSubstitutions) throws IOException {
1295: createStaticTableEntry(file, staticSubstitutions);
1296: }
1297:
1298: private void createStaticTableEntry(
1299: IRandomAccessDataBuffer file,
1300: String[] staticSubstitutions) throws IOException {
1301: long lengthOffset = file.getCursorOffset();
1302: if (JODBConfig.DEBUG) {
1303: _logger.info("Satic substitution table start @"
1304: + lengthOffset);
1305: }
1306: file.writeShort(0);//total size unknown yet
1307: long startOffset = file.getCursorOffset();
1308: _staticTableStartOffset = file.getCursorOffset();
1309: for (int i = 0; i < staticSubstitutions.length; i++) {
1310: byte[] substAsBytes = staticSubstitutions[i]
1311: .getBytes("utf8");
1312: if (substAsBytes.length > 256) {
1313: throw new RuntimeException("entry size too big");
1314: }
1315: file.write((byte) substAsBytes.length);
1316: file.write(substAsBytes);
1317: _prefixes.add(staticSubstitutions[i]);
1318: }
1319: file.seek(lengthOffset);
1320: _staticTableLength = file.length() - startOffset;
1321: file.writeShort((short) _staticTableLength);
1322: file.seek(file.length());
1323: if (JODBConfig.DEBUG) {
1324: _logger.info("Satic substitution table end @"
1325: + file.getCursorOffset());
1326: }
1327: }
1328:
1329: public int getClassTypeSubstitutionID(String classType) {
1330: return _types.indexOf(classType);
1331: }
1332:
1333: public boolean hasPendingSubstitutionTables() {
1334: return _pendingRecords.size() != 0;
1335: }
1336:
1337: public void backupPendingSubstitutions() {
1338: _pendingRecordsBackup.addAll(_pendingRecords);
1339: }
1340:
1341: public synchronized int getOrSetClassTypeSubstitutionID(
1342: String classType) {
1343: int id = _types.indexOf(classType);
1344: if (id == -1) {
1345: id = getNextID(_types);
1346: setSubstitutionToBuffer(_types, classType, id);
1347: SubstitutionRecord record = new SubstitutionRecord(id,
1348: classType, LITERAL_SUBSTITUTION_TYPE.TYPE);
1349: _pendingRecords.add(record);
1350: }
1351: return id;
1352: }
1353:
1354: public int getOrSetFieldSubstitutionID(String fieldName) {
1355: int id = _fields.indexOf(fieldName);
1356: if (id == -1) {
1357: id = getNextID(_fields);
1358: setSubstitutionToBuffer(_fields, fieldName, id);
1359: SubstitutionRecord record = new SubstitutionRecord(id,
1360: fieldName, LITERAL_SUBSTITUTION_TYPE.FIELD);
1361: _pendingRecords.add(record);
1362: }
1363: return id;
1364: }
1365:
1366: public int getFieldSubstitutionID(String fieldName) {
1367: return _fields.indexOf(fieldName);
1368: }
1369:
1370: public String getFieldNameForID(int id) {
1371: if (id < 0 || id >= _fields.size()) {
1372: return null;
1373: }
1374: return _fields.elementAt(id);
1375: }
1376:
1377: public String getClassTypeForID(int id) {
1378: if (id < 0 || id >= _types.size()) {
1379: return null;
1380: }
1381: return _types.elementAt(id);
1382: }
1383:
1384: public String getPrefixForID(int id) {
1385: if (id < 0 || id >= _prefixes.size()) {
1386: return null;
1387: }
1388: return _prefixes.elementAt(id);
1389: }
1390:
1391: private void readStaticTable(IRandomAccessDataBuffer file)
1392: throws IOException {
1393: _staticTableStartOffset = file.getCursorOffset();
1394: _staticTableLength = file.readShort() & 0xFFFF;
1395: int end = (int) (_staticTableLength + file
1396: .getCursorOffset());
1397: byte[] buffer = new byte[256];
1398: while (file.getCursorOffset() < end) {
1399: int len = file.readUnsignedByte();
1400: if (len > buffer.length) {
1401: buffer = new byte[len];
1402: }
1403: file.read(buffer, 0, len);
1404: _prefixes.add(new String(buffer, 0, len, "utf8"));
1405: }
1406: }
1407:
1408: private SubstTable createNewTable(IRandomAccessDataBuffer file,
1409: long transactionShift) throws IOException {
1410: if (JODBConfig.DEBUG) {
1411: _logger.info("New literal sub table start @"
1412: + file.getCursorOffset());
1413: }
1414: SubstTable result = new SubstTable(this );
1415: writePendingRecords(result, null, file);
1416: //factor in transaction shift
1417: result._tableOffset += transactionShift;
1418: result._entriesCounterOffset += transactionShift;
1419: //
1420: _pendingSubstTables.add(result);
1421: if (JODBConfig.DEBUG) {
1422: _logger.info("New literal sub table end @"
1423: + file.getCursorOffset());
1424: }
1425: return result;
1426: }
1427:
1428: private SubstTable read(IRandomAccessDataBuffer file)
1429: throws IOException {
1430: if (JODBConfig.DEBUG) {
1431: _logger.info("Subst table start="
1432: + file.getCursorOffset());
1433: }
1434: SubstTable result = new SubstTable(this );
1435: result._tableOffset = file.getCursorOffset();
1436: result._totalTableLength = file.readLong();
1437: result._entriesCounterOffset = file.getCursorOffset();
1438: result._totalEntriesPending = result._totalEntries = file
1439: .readShort();
1440:
1441: //result._entriesDataOffset = file.getCursorOffset();
1442: //result._entriesDataLength = result._totalTableLength;//remove 2 bytes of counter
1443: byte[] buffer = new byte[256];
1444: for (int i = 0; i < result._totalEntries; i++) {
1445: int typeIndex = file.readUnsignedByte();
1446: LITERAL_SUBSTITUTION_TYPE type = LITERAL_SUBSTITUTION_TYPE
1447: .values()[typeIndex];
1448: int substID = file.readShort() & 0xffff;
1449: int len = file.readShort() & 0xFFFF;
1450: if (len > buffer.length) {
1451: buffer = new byte[len];
1452: }
1453: file.read(buffer, 0, len);
1454: switch (type) {
1455: case EMPTY:
1456: //TODO collect and analize empty space
1457: continue;
1458: case PREFFIX:
1459: setSubstitutionToBuffer(_prefixes, new String(
1460: buffer, 0, len, "utf8"), substID);
1461: break;
1462: case FIELD:
1463: setSubstitutionToBuffer(_fields, new String(buffer,
1464: 0, len, "utf8"), substID);
1465: break;
1466: case TYPE:
1467: setSubstitutionToBuffer(_types, new String(buffer,
1468: 0, len, "utf8"), substID);
1469: break;
1470: case TYPE_WITH_PREFFIX:
1471: setSubstitutionToBuffer(_unresolvedTypes,
1472: new String(buffer, 0, len, "utf8"), substID);
1473: break;
1474: default:
1475: throw new JodbIOException("unknown literal type");
1476: }
1477: }
1478: result._bytesInUsePending = result._bytesInUse = file
1479: .getCursorOffset()
1480: - result._entriesCounterOffset;
1481: _substTables.add(result);
1482: file.seek(result._entriesCounterOffset
1483: + result._totalTableLength);
1484: if (JODBConfig.DEBUG) {
1485: _logger.info("Subst table end="
1486: + file.getCursorOffset());
1487: }
1488: return result;
1489: }
1490:
1491: // private void writePendingRecords(ITransactionBuffer existingRecordsSrc, ITransactionBuffer newDataBuffer, ITransactionBuffer replacementsBuffer) throws IOException{
1492: // SubstTable lastRecord = _substTables.lastElement();
1493: // write(lastRecord, existingRecordsSrc, _pendingRecords, replacementsBuffer);
1494: // if( _pendingRecords.size() == 0 ){
1495: // return;
1496: // }
1497: // while(_pendingRecords.size() > 0){
1498: // int initialSize = _pendingRecords.size();
1499: // DataEntry lastEntry = _entries.lastElement();
1500: // //lastEntry.setFixedSize(true);
1501: // replacementsBuffer.seek(replacementsBuffer.length());
1502: // createNewTable(newDataBuffer, SUBST_TABLE_SIZE);
1503: // lastRecord = _substTables.lastElement();
1504: // write(lastRecord, _pendingRecords, replacementsBuffer);
1505: // if(_pendingRecords.size() == initialSize){
1506: // throw new IOException("unable to write substitutions");
1507: // }
1508: // }
1509: // }
1510:
1511: private void writePendingRecords(SubstTable target,
1512: IRandomAccessDataBuffer srcBuffer,
1513: IRandomAccessDataBuffer destinationBuffer)
1514: throws IOException {
1515:
1516: //prepare pending values
1517: target._bytesInUsePending = target._bytesInUse;
1518: target._totalEntriesPending = target._totalEntries;
1519:
1520: long totalTableLengthOffset = destinationBuffer
1521: .getCursorOffset();
1522: long tableDataStartOffset = totalTableLengthOffset + 8;//8 bytes for length entry
1523:
1524: if (target._totalTableLength > 0) {//TODO do not backup table length as it remains constant
1525: long bytesToTransfer = target._entriesCounterOffset
1526: - target._tableOffset + target._bytesInUse;
1527: srcBuffer.seek(target._tableOffset);
1528: if (destinationBuffer.transferFrom(srcBuffer
1529: .getChannel(), totalTableLengthOffset,
1530: bytesToTransfer) != bytesToTransfer) {
1531: throw new JodbIOException("Unable to tansfer "
1532: + bytesToTransfer + " bytes");
1533: }
1534: destinationBuffer.seek(totalTableLengthOffset
1535: + bytesToTransfer);
1536: } else {//new table
1537: target._tableOffset = destinationBuffer
1538: .getCursorOffset();
1539: destinationBuffer.writeLong(0);//reserve space for total length
1540: target._entriesCounterOffset = destinationBuffer
1541: .getCursorOffset();
1542: target._totalEntriesPending = target._totalEntries = 0;
1543: destinationBuffer
1544: .writeShort(target._totalEntriesPending);
1545: target._bytesInUse = target._bytesInUsePending = destinationBuffer
1546: .getCursorOffset()
1547: - tableDataStartOffset;
1548: }
1549:
1550: Vector<SubstitutionRecord> recordsToRemove = new Vector<SubstitutionRecord>();
1551:
1552: for (int i = 0; i < _pendingRecords.size(); i++) {
1553: SubstitutionRecord record = _pendingRecords
1554: .elementAt(i);
1555: byte[] substAsBytes;
1556: try {
1557: substAsBytes = record._substitution
1558: .getBytes("utf8");
1559: } catch (UnsupportedEncodingException e) {
1560: throw new Error(e);
1561: }
1562: int lengthOfEntry = substAsBytes.length + 5;
1563: if (target._totalTableLength != 0
1564: && lengthOfEntry > target._totalTableLength
1565: - target._bytesInUsePending) {
1566: break;
1567: }
1568:
1569: destinationBuffer.write((byte) record._type.ordinal());
1570: // Vector<String> targetSubstitutions;
1571: // switch (record._type) {
1572: // case FIELD:
1573: // targetSubstitutions = _fields;
1574: // break;
1575: // case PREFFIX:
1576: // targetSubstitutions = _prefixes;
1577: // break;
1578: // case TYPE:
1579: // targetSubstitutions = _types;
1580: // break;
1581: // default:
1582: // throw new RuntimeException();
1583: // }
1584:
1585: destinationBuffer.writeShort(record._id);
1586: destinationBuffer
1587: .writeShort((short) substAsBytes.length);
1588: destinationBuffer.write(substAsBytes);
1589: target._bytesInUsePending += lengthOfEntry;
1590: target._totalEntriesPending++;
1591: _pendingRecordsBackup.add(record);
1592: recordsToRemove.add(record);
1593: }
1594: destinationBuffer.seek(totalTableLengthOffset);
1595: if (target._totalTableLength == 0) {
1596: target._totalTableLength = target._bytesInUsePending
1597: + SUBST_TABLE_SIZE;
1598: destinationBuffer.writeLong(target._totalTableLength);
1599: } else {
1600: destinationBuffer.skip(8);
1601: }
1602: if (recordsToRemove.size() > 0) {
1603: destinationBuffer
1604: .writeShort(target._totalEntriesPending);
1605: }
1606: long tableEndOffset = tableDataStartOffset
1607: + target._totalTableLength;
1608: if (destinationBuffer.length() < tableEndOffset) {
1609: destinationBuffer.setLength(tableEndOffset);
1610: }
1611: destinationBuffer.seek(tableEndOffset);
1612: _pendingRecords.removeAll(recordsToRemove);
1613: }
1614:
1615: // private synchronized void resolveTypes(){
1616: // if(_unresolvedTypes.size()==0){
1617: // return;
1618: // }
1619: // for (int i = 0; i < _unresolvedTypes.size(); i++) {
1620: // String type = _unresolvedTypes.elementAt(i);
1621: // if(type == null){
1622: // continue;
1623: // }
1624: // char substIndex = type.charAt(0);
1625: // String subst = _prefixes.elementAt(substIndex);
1626: // type = subst+type.substring(1);
1627: // setSubstitutionToBuffer(_prefixes, type, i);
1628: // }
1629: // _unresolvedTypes = new Vector<String>();
1630: // }
1631:
1632: private void setSubstitutionToBuffer(Vector<String> buffer,
1633: String subst, int id) {
1634: if (buffer.size() <= id) {
1635: buffer.setSize(id + 1);
1636: }
1637: buffer.setElementAt(subst, id);
1638: }
1639:
1640: private int getNextID(Vector<String> substitutions) {
1641: for (int i = 0; i < substitutions.size(); i++) {
1642: if (substitutions.elementAt(i) == null) {
1643: return i;
1644: }
1645: }
1646: return substitutions.size();
1647: }
1648:
1649: public void enablePendingValues() {
1650: if (_substTables.size() > 0) {
1651: SubstTable last = _substTables.lastElement();
1652: last.enablePendingValues();
1653: }
1654: _substTables.addAll(_pendingSubstTables);
1655: _pendingSubstTables.clear();
1656: _pendingRecordsBackup.clear();
1657: }
1658: }
1659:
1660: public class SubstitutionRecord {
1661: public int _id;
1662: public String _substitution;
1663: public LITERAL_SUBSTITUTION_TYPE _type;
1664:
1665: public SubstitutionRecord(int id, String substitution,
1666: LITERAL_SUBSTITUTION_TYPE type) {
1667: super ();
1668: _id = id;
1669: _substitution = substitution;
1670: _type = type;
1671: }
1672:
1673: }
1674:
1675: public class SubstTable {
1676: long _tableOffset;
1677: //long _entriesDataOffset;
1678: long _totalTableLength;
1679: long _entriesCounterOffset;
1680: int _totalEntries;
1681: //long _entriesDataLength;
1682: long _bytesInUse;//from start
1683:
1684: int _totalEntriesPending;
1685: long _bytesInUsePending;//from start
1686:
1687: //int _nextTableOffset;
1688: private LiteralSubstitutions _globalOwner;
1689:
1690: /**
1691: *
1692: */
1693: public SubstTable(LiteralSubstitutions owner) {
1694: _globalOwner = owner;
1695: }
1696:
1697: public void enablePendingValues() {
1698: _totalEntries = _totalEntriesPending;
1699: _bytesInUse = _bytesInUsePending;
1700: }
1701:
1702: public void writePendingRecords(
1703: IRandomAccessDataBuffer srcBuffer,
1704: IRandomAccessDataBuffer destinationBuffer)
1705: throws IOException {
1706: _globalOwner.writePendingRecords(this , srcBuffer,
1707: destinationBuffer);
1708: }
1709: }
1710:
1711: public class DataEntry {
1712: private SubstTable _literalSubstTable;
1713: private long _tableOffset;
1714: private long _dataOffset;//actual data region start from the begginning of file
1715: private long _dataBlockLength;
1716: private boolean _lastEntry;
1717:
1718: private long _pendingDataBlockLength;
1719: private boolean _pendingLastEntry = true;
1720:
1721: //private boolean _fixedSize;
1722:
1723: public void read(IRandomAccessDataBuffer transactionBuffer)
1724: throws IOException {
1725: if (JODBConfig.DEBUG) {
1726: _logger.info("Data entry start read @"
1727: + transactionBuffer.getCursorOffset());
1728: }
1729: _tableOffset = transactionBuffer.getCursorOffset();
1730: _dataBlockLength = transactionBuffer.readLong();
1731: _pendingLastEntry = _lastEntry = addrHasConfirmationBit(_dataBlockLength);
1732: _pendingDataBlockLength = _dataBlockLength = removeConfirmationBit(_dataBlockLength);
1733: _literalSubstTable = _substitutions.read(transactionBuffer);
1734: _dataOffset = transactionBuffer.getCursorOffset();
1735: transactionBuffer.seek(getDataBlockEnd());
1736: if (JODBConfig.DEBUG) {
1737: _logger.info("Data entry end read @"
1738: + transactionBuffer.getCursorOffset());
1739: }
1740: }
1741:
1742: public void writeWithAllSubstitutionRecords(
1743: IRandomAccessDataBuffer destinationBuffer,
1744: IRandomAccessDataBuffer srcBuffer,
1745: long dataBlockLength, long transactionShift)
1746: throws IOException {
1747: if (_tableOffset == 0) {//new entry
1748: _tableOffset = destinationBuffer.getCursorOffset()
1749: + transactionShift;
1750: if (JODBConfig.DEBUG) {
1751: _logger.info("New data table start @"
1752: + _tableOffset);
1753: }
1754: destinationBuffer
1755: .writeLong(addConfirmationBitToAddress(0));//length of data
1756: _literalSubstTable = _substitutions.createNewTable(
1757: destinationBuffer, transactionShift);
1758: _dataOffset = destinationBuffer.getCursorOffset()
1759: + transactionShift;
1760: if (JODBConfig.DEBUG) {
1761: _logger
1762: .info("New data table start @"
1763: + _dataOffset);
1764: }
1765: } else {
1766: _pendingDataBlockLength = dataBlockLength;
1767: long lengthEntry = dataBlockLength;
1768: if (_lastEntry) {
1769: lengthEntry = addConfirmationBitToAddress(lengthEntry);
1770: }
1771: destinationBuffer.writeLong(lengthEntry);//length of data
1772: _literalSubstTable.writePendingRecords(srcBuffer,
1773: destinationBuffer);
1774: }
1775: }
1776:
1777: // public void writeSubstitutionRecords(ITransactionBuffer destinationBuffer,ITransactionBuffer srcBuffer) throws IOException{
1778: // _literalSubstTable.writePendingRecords(srcBuffer, destinationBuffer);
1779: // }
1780:
1781: public void write(IRandomAccessDataBuffer destinationBuffer,
1782: long dataBlockLength, boolean seal) throws IOException {
1783: _pendingLastEntry = !seal;
1784: _pendingDataBlockLength = dataBlockLength;
1785: destinationBuffer.writeLong(seal ? dataBlockLength
1786: : addConfirmationBitToAddress(dataBlockLength));
1787: }
1788:
1789: public SubstTable getLiteralSubstTable() {
1790: return _literalSubstTable;
1791: }
1792:
1793: /**
1794: * @return the _offset
1795: */
1796: public long getDataOffset() {
1797: return _dataOffset;
1798: }
1799:
1800: /**
1801: * @return the _dataBlockEnd
1802: */
1803: public long getDataBlockEnd() {
1804: return _dataOffset + _dataBlockLength;
1805: }
1806:
1807: public void enablePendingValues() {
1808: _dataBlockLength = _pendingDataBlockLength;
1809: // _pendingDataBlockLength = 0;
1810: _lastEntry = _pendingLastEntry;
1811: _literalSubstTable.enablePendingValues();
1812: }
1813: }
1814:
1815: private class DatabaseStatistics implements IDatabaseStatistics {
1816:
1817: public DatabaseStatistics() throws IOException {
1818: UnicastRemoteObject.exportObject(this ,
1819: JODBConstants.DEFAULT_RMI_PORT);
1820: }
1821:
1822: public long literalSubstFreeSize(int tableIndex)
1823: throws IOException {
1824: SubstTable substTable = _entries.get(tableIndex)
1825: .getLiteralSubstTable();
1826: return substTable._totalTableLength
1827: - substTable._bytesInUse;
1828: }
1829:
1830: public long literalSubstMaxSize(int tableIndex)
1831: throws IOException {
1832: SubstTable substTable = _entries.get(tableIndex)
1833: .getLiteralSubstTable();
1834: return substTable._totalTableLength;
1835: }
1836:
1837: public int totalDataEntries() throws IOException {
1838: return _entries.size();
1839: }
1840:
1841: }
1842:
1843: private static class TicketSearchKey {
1844: int _hashCode;
1845:
1846: public void init(int hashCode) {
1847: _hashCode = hashCode;
1848: }
1849:
1850: @Override
1851: public int hashCode() {
1852: return _hashCode;
1853: }
1854:
1855: @Override
1856: public boolean equals(Object obj) {
1857: return _hashCode == System.identityHashCode(obj);
1858: }
1859: }
1860:
1861: private static class SearchKeysHolder {
1862: TicketSearchKey _ticketSearchObject = new TicketSearchKey();
1863: }
1864:
1865: private static class IOTicketWRHolder {
1866: WeakReference<IOTicketImpl> _weakReference;
1867:
1868: public IOTicketWRHolder(IOTicketImpl ticketImpl) {
1869: _weakReference = new WeakReference<IOTicketImpl>(ticketImpl);
1870: }
1871:
1872: public IRandomAccessDataBuffer getRandomAccessDataBuffer() {
1873: IOTicketImpl ticketImpl = _weakReference.get();
1874: if (ticketImpl != null) {
1875: return ticketImpl._randomAccessBuffer;
1876: }
1877: return null;
1878: }
1879: }
1880: }
|