0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017: package org.apache.commons.transaction.file;
0018:
0019: import java.io.BufferedReader;
0020: import java.io.BufferedWriter;
0021: import java.io.File;
0022: import java.io.FileInputStream;
0023: import java.io.FileNotFoundException;
0024: import java.io.FileOutputStream;
0025: import java.io.IOException;
0026: import java.io.InputStream;
0027: import java.io.InputStreamReader;
0028: import java.io.OutputStream;
0029: import java.io.OutputStreamWriter;
0030: import java.util.ArrayList;
0031: import java.util.Collection;
0032: import java.util.HashMap;
0033: import java.util.List;
0034: import java.util.Map;
0035: import java.util.Iterator;
0036: import java.util.Collections;
0037:
0038: import org.apache.commons.transaction.locking.GenericLock;
0039: import org.apache.commons.transaction.locking.GenericLockManager;
0040: import org.apache.commons.transaction.locking.LockException;
0041: import org.apache.commons.transaction.locking.LockManager2;
0042: import org.apache.commons.transaction.util.FileHelper;
0043: import org.apache.commons.transaction.util.LoggerFacade;
0044:
0045: /**
0046: * A resource manager for streamable objects stored in a file system.
0047: *
0048: * It is intended for developer and "out of the box" use.
0049: * It is <em>not</em> intended to be a real alternative for
0050: * a full blown DBMS (of course it can not be compared to a RDBMS at all).
0051: *
0052: * Major features:<br>
0053: * <ul>
0054: * <li>Transactions performed with this class more or less conform to the widely accepted ACID properties
0055: * <li>Reading should be as fast as from the ordinary file system (at the cost of a bit slower commits)
0056: * </ul>
0057: *
0058: * Compared to a "real" DBMS major limitations are (in order of assumed severity):<br>
0059: * <ul>
0060: * <li>Number of simultaneously open resources is limited to the number of available file descriptors
0061: * <li>It does not scale a bit
0062: * <li>Pessimistic transaction and locking scheme
0063: * <li>Isolation level currently is restricted to <em>read committed</em> and <em>repeatable read</em> (which is not that bad)
0064: * </ul>
0065: *
0066: * <em>Important</em>: If possible you should have the work and store directory located in the
0067: * same file system. If not, you might get additional problems, as there are:
0068: * <ul>
0069: * <li>On commit it might be necessary to copy files instead of rename/relink them. This may lead to time consuming,
0070: * overly blocking commit phases and higher risk of corrupted files
0071: * <li>Prepare phase might be too permissive, no check for sufficient memory on store file system is possible
0072: * </ul>
0073: *
0074: * General limitations include:<br>
0075: * <ul>
0076: * <li>Due to lack of synchronization on the transaction context level, every transaction may only be
0077: * accessed by a <em>single thread</em> throughout its full life.
0078: * This means it is forbidden for a thread that has not started a transaction
0079: * to perform any operations inside this transaction. However, threads associated
0080: * with different transactions can safely access these methods concurrently.
0081: * Reasons for the lack of synchronization are improved performance and simplicity (of the code of this class).
0082: * <li>There is no dedicated class for a transaction. Having such a class would be better practice and
0083: * make certain actions more intuitive.
0084: * <li>Resource identifiers need a reasonable string representation obtainable by <code>toString</code>.
0085: * More specifically, they will have to resolve to a <em>valid</em> file path that does not denote a directory.
0086: * If it does, you might be able to create it, but not to read or write anything
0087: * from resp. to it. Valid string representations of a resource identifier are
0088: * for example "file" "/root" or "hjfhdfhuhuhsdufhdsufhdsufhdfuhdfduhduhduhdu".
0089: * Invalid are for example "/" or "/root/". Invalid on some file systems are for example "c:" or "file://huhu".
0090: * <li>As there are no active processes inside this RM and it shares its threads with the application,
0091: * control over transactions is limited to points where the application calls the RM.
0092: * In particular, this disables <em>active</em> termination of transactions upon timeout.
0093: * <li>There is no notion of a connection to this file manager. This means you can not connect from hosts other than
0094: * local and you will get problems when plugging this store into a J2EE store using connectors.
0095: * <li>Methods should throw more specific exceptions
0096: * </ul>
0097: *
0098: * <p><em>Caution</em>:<br>
0099: * The <code>txId</code> passed to many methods as an identifier for the
0100: * transaction concerned will function as a key in a <code>HashMap</code>.
0101: * Thus assure that <code>equals</code> and <code>hashCode</code> are both
0102: * properly implemented and match each other.</p>
0103: *
0104: * <p><em>Caution</em>:<br>
0105: * You will have to guarantee that no other process will access either
0106: * the store or the working dir concurrently to this <code>FileResourceManager</code>.</p>
0107: *
0108: * <p><em>Special Caution</em>:<br>
0109: * Be very careful not to have two instances of <code>FileResourceManager</code>
0110: * working in the same store and/or working dir.
0111: *
0112: * @version $Id: FileResourceManager.java 509474 2007-02-20 09:04:22Z antoine $
0113: */
0114: public class FileResourceManager implements ResourceManager,
0115: ResourceManagerErrorCodes {
0116:
// reflects the natural isolation level of this store
protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ;
// isolation level reported by getDefaultIsolationLevel() and by getIsolationLevel() for unknown transactions
protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL;

// lock levels in ascending numeric order, all expressed relative to NO_LOCK;
// LOCK_EXCLUSIVE is requested for non-shared locks, LOCK_COMMIT is the level
// handed to the GenericLockManager constructor in start()
protected static final int NO_LOCK = 0;
protected static final int LOCK_ACCESS = NO_LOCK + 1;
protected static final int LOCK_SHARED = NO_LOCK + 2;
protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3;
protected static final int LOCK_COMMIT = NO_LOCK + 4;

// life cycle states stored in the operationMode member
protected static final int OPERATION_MODE_STOPPED = 0;
protected static final int OPERATION_MODE_STOPPING = 1;
protected static final int OPERATION_MODE_STARTED = 2;
protected static final int OPERATION_MODE_STARTING = 3;
protected static final int OPERATION_MODE_RECOVERING = 4;

// NOTE(review): presumably the character encoding used when persisting transaction parameters - confirm against usage
protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15";

// initial value of the default transaction timeout, in milliseconds
protected static final int DEFAULT_TIMEOUT_MSECS = 5000;
// stop(int) multiplies the default transaction timeout by this factor to obtain its shutdown timeout
protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2;

// NOTE(review): presumably names of the per transaction sub directories holding scheduled changes resp. deletes - confirm
protected static final String WORK_CHANGE_DIR = "change";
protected static final String WORK_DELETE_DIR = "delete";

// NOTE(review): presumably the name of the file a transaction context saves its state to - confirm
protected static final String CONTEXT_FILE = "transaction.log";
0142:
0143: /*
0144: * --- Static helper methods ---
0145: *
0146: *
0147: */
0148:
0149: protected static void applyDeletes(File removeDir, File targetDir,
0150: File rootDir) throws IOException {
0151: if (removeDir.isDirectory() && targetDir.isDirectory()) {
0152: File[] files = removeDir.listFiles();
0153: for (int i = 0; i < files.length; i++) {
0154: File removeFile = files[i];
0155: File targetFile = new File(targetDir, removeFile
0156: .getName());
0157: if (removeFile.isFile()) {
0158: if (targetFile.exists()) {
0159: if (!targetFile.delete()) {
0160: throw new IOException(
0161: "Could not delete file "
0162: + removeFile.getName()
0163: + " in directory targetDir");
0164: }
0165: }
0166: // indicate, this has been done
0167: removeFile.delete();
0168: } else {
0169: applyDeletes(removeFile, targetFile, rootDir);
0170: }
0171: // delete empty target directories, except root dir
0172: if (!targetDir.equals(rootDir)
0173: && targetDir.list().length == 0) {
0174: targetDir.delete();
0175: }
0176: }
0177: }
0178: }
0179:
0180: /*
0181: * --- object members ---
0182: *
0183: *
0184: */
0185:
// directory where transactions store temporary data
protected String workDir;
// directory where main data goes after commit
protected String storeDir;
// NOTE(review): presumably controls whether transaction work data is removed after completion - confirm against usage
protected boolean cleanUp = true;
// set when recovery of pending transactions failed; prepareTransaction refuses to work while this is set
protected boolean dirty = false;
// current life cycle state, one of the OPERATION_MODE_* constants
protected int operationMode = OPERATION_MODE_STOPPED;
// default transaction timeout in milliseconds
protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS;
// debug flag as passed to the constructor
protected boolean debug;

protected LoggerFacade logger;

// maps transaction ids to their TransactionContext; created in start()
protected Map globalTransactions;
// created in start() and released in stop()
protected List globalOpenResources;
// created in start()
protected LockManager2 lockManager;

// mapper for resource ids to paths; may be null
protected ResourceIdToPathMapper idMapper = null;
// mapper for transaction ids to paths
protected TransactionIdToPathMapper txIdMapper = null;

// NOTE(review): presumably the counter used to generate unique ids for temporary transactions - confirm
protected int idCnt = 0;
0204:
0205: /*
0206: * --- ctor and general getter / setter methods ---
0207: *
0208: *
0209: */
0210:
/**
 * Creates a new resource manager operating on the specified directories.
 * Debugging is switched off.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
 * @param logger the logger to be used by this store
 */
public FileResourceManager(String storeDir, String workDir,
        boolean urlEncodePath, LoggerFacade logger) {
    // delegate to the variant taking an explicit debug flag
    this(storeDir, workDir, urlEncodePath, logger, false);
}
0223:
/**
 * Creates a new resource manager operating on the specified directories.
 * Resource ids are mapped by a <code>URLEncodeIdMapper</code> if requested,
 * otherwise no resource id mapper is installed.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir,
        boolean urlEncodePath, LoggerFacade logger, boolean debug) {
    this(storeDir,
            workDir,
            urlEncodePath ? new URLEncodeIdMapper() : null,
            new NoOpTransactionIdToPathMapper(),
            logger,
            debug);
}
0239:
/**
 * Creates a new resource manager operating on the specified directories
 * using the given resource id mapper.
 * This constructor is reintroduced for backwards API compatibility and is used by jakarta-slide.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param idMapper mapper for resourceId to path
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir,
        ResourceIdToPathMapper idMapper, LoggerFacade logger,
        boolean debug) {
    // transaction ids are mapped by the no-op mapper
    this(storeDir,
            workDir,
            idMapper,
            new NoOpTransactionIdToPathMapper(),
            logger,
            debug);
}
0256:
/**
 * Creates a new resource manager operating on the specified directories.
 * All arguments are simply stored; no directory is touched before
 * the manager is started.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param idMapper mapper for resourceId to path
 * @param txIdMapper mapper for transaction id to path
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir,
        ResourceIdToPathMapper idMapper,
        TransactionIdToPathMapper txIdMapper, LoggerFacade logger,
        boolean debug) {
    // directories
    this.storeDir = storeDir;
    this.workDir = workDir;
    // id to path mappers
    this.txIdMapper = txIdMapper;
    this.idMapper = idMapper;
    // diagnostics
    this.debug = debug;
    this.logger = logger;
}
0278:
0279: /**
0280: * Gets the store directory.
0281: *
0282: * @return the store directory
0283: * @see #FileResourceManager(String, String, boolean, LoggerFacade)
0284: * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
0285: */
0286: public String getStoreDir() {
0287: return storeDir;
0288: }
0289:
0290: /**
0291: * Gets the working directory.
0292: *
0293: * @return the work directory
0294: * @see #FileResourceManager(String, String, boolean, LoggerFacade)
0295: * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
0296: */
0297: public String getWorkDir() {
0298: return workDir;
0299: }
0300:
0301: /**
0302: * Gets the logger used by this resource manager.
0303: *
0304: * @return used logger
0305: */
0306: public LoggerFacade getLogger() {
0307: return logger;
0308: }
0309:
0310: /*
0311: * --- public methods of interface ResourceManager ---
0312: *
0313: *
0314: */
0315:
0316: public boolean lockResource(Object resourceId, Object txId)
0317: throws ResourceManagerException {
0318: lockResource(resourceId, txId, false);
0319: // XXX will never return false as it will either throw or return true
0320: return true;
0321: }
0322:
0323: public boolean lockResource(Object resourceId, Object txId,
0324: boolean shared) throws ResourceManagerException {
0325: lockResource(resourceId, txId, shared, true, Long.MAX_VALUE,
0326: true);
0327: // XXX will never return false as it will either throw or return true
0328: return true;
0329: }
0330:
0331: public boolean lockResource(Object resourceId, Object txId,
0332: boolean shared, boolean wait, long timeoutMSecs,
0333: boolean reentrant) throws ResourceManagerException {
0334:
0335: TransactionContext context = (shared ? txInitialSaneCheck(txId)
0336: : txInitialSaneCheckForWriting(txId));
0337: assureNotMarkedForRollback(context);
0338: fileInitialSaneCheck(txId, resourceId);
0339:
0340: // XXX allows locking of non existent resources (e.g. to prepare a create)
0341: int level = (shared ? getSharedLockLevel(context)
0342: : LOCK_EXCLUSIVE);
0343: try {
0344: lockManager.lock(txId, resourceId, level, reentrant, Math
0345: .min(timeoutMSecs, context.timeoutMSecs));
0346: // XXX will never return false as it will either throw or return true
0347: return true;
0348: } catch (LockException e) {
0349: switch (e.getCode()) {
0350: case LockException.CODE_INTERRUPTED:
0351: throw new ResourceManagerException(
0352: "Could not get lock for resource at '"
0353: + resourceId + "'", ERR_NO_LOCK, txId);
0354: case LockException.CODE_TIMED_OUT:
0355: throw new ResourceManagerException(
0356: "Lock timed out for resource at '" + resourceId
0357: + "'", ERR_NO_LOCK, txId);
0358: case LockException.CODE_DEADLOCK_VICTIM:
0359: throw new ResourceManagerException(
0360: "Deadlock victim resource at '" + resourceId
0361: + "'", ERR_DEAD_LOCK, txId);
0362: default:
0363: throw new ResourceManagerException(
0364: "Locking exception for resource at '"
0365: + resourceId + "'", ERR_DEAD_LOCK, txId);
0366: }
0367: }
0368: }
0369:
0370: public int getDefaultIsolationLevel() {
0371: return DEFAULT_ISOLATION_LEVEL;
0372: }
0373:
0374: public int[] getSupportedIsolationLevels()
0375: throws ResourceManagerException {
0376: return new int[] { ISOLATION_LEVEL_READ_COMMITTED,
0377: ISOLATION_LEVEL_REPEATABLE_READ };
0378: }
0379:
0380: public boolean isIsolationLevelSupported(int level)
0381: throws ResourceManagerException {
0382: return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ);
0383: }
0384:
0385: /**
0386: * Gets the default transaction timeout in <em>milliseconds</em>.
0387: */
0388: public long getDefaultTransactionTimeout() {
0389: return defaultTimeout;
0390: }
0391:
0392: /**
0393: * Sets the default transaction timeout.
0394: *
0395: * @param timeout timeout in <em>milliseconds</em>
0396: */
0397: public void setDefaultTransactionTimeout(long timeout) {
0398: defaultTimeout = timeout;
0399: }
0400:
0401: public long getTransactionTimeout(Object txId)
0402: throws ResourceManagerException {
0403: assureRMReady();
0404: long msecs = 0;
0405: TransactionContext context = getContext(txId);
0406: if (context == null) {
0407: msecs = getDefaultTransactionTimeout();
0408: } else {
0409: msecs = context.timeoutMSecs;
0410: }
0411: return msecs;
0412: }
0413:
0414: public void setTransactionTimeout(Object txId, long mSecs)
0415: throws ResourceManagerException {
0416: assureRMReady();
0417: TransactionContext context = getContext(txId);
0418: if (context != null) {
0419: context.timeoutMSecs = mSecs;
0420: } else {
0421: throw new ResourceManagerException(ERR_NO_TX, txId);
0422: }
0423: }
0424:
0425: public int getIsolationLevel(Object txId)
0426: throws ResourceManagerException {
0427: assureRMReady();
0428: TransactionContext context = getContext(txId);
0429: if (context == null) {
0430: return DEFAULT_ISOLATION_LEVEL;
0431: } else {
0432: return context.isolationLevel;
0433: }
0434: }
0435:
0436: public void setIsolationLevel(Object txId, int level)
0437: throws ResourceManagerException {
0438: assureRMReady();
0439: TransactionContext context = getContext(txId);
0440: if (context != null) {
0441: if (level != ISOLATION_LEVEL_READ_COMMITTED
0442: || level != ISOLATION_LEVEL_REPEATABLE_READ) {
0443: context.isolationLevel = level;
0444: } else {
0445: throw new ResourceManagerException(
0446: ERR_ISOLATION_LEVEL_UNSUPPORTED, txId);
0447: }
0448: } else {
0449: throw new ResourceManagerException(ERR_NO_TX, txId);
0450: }
0451: }
0452:
0453: public synchronized void start()
0454: throws ResourceManagerSystemException {
0455:
0456: logger.logInfo("Starting RM at '" + storeDir + "' / '"
0457: + workDir + "'");
0458:
0459: operationMode = OPERATION_MODE_STARTING;
0460:
0461: globalTransactions = Collections.synchronizedMap(new HashMap());
0462: lockManager = new GenericLockManager(LOCK_COMMIT, logger);
0463: globalOpenResources = Collections
0464: .synchronizedList(new ArrayList());
0465:
0466: recover();
0467: sync();
0468:
0469: operationMode = OPERATION_MODE_STARTED;
0470:
0471: if (dirty) {
0472: logger
0473: .logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)");
0474: } else {
0475: logger.logInfo("Started RM");
0476: }
0477:
0478: }
0479:
0480: public synchronized boolean stop(int mode)
0481: throws ResourceManagerSystemException {
0482: return stop(mode, getDefaultTransactionTimeout()
0483: * DEFAULT_COMMIT_TIMEOUT_FACTOR);
0484: }
0485:
0486: public synchronized boolean stop(int mode, long timeOut)
0487: throws ResourceManagerSystemException {
0488:
0489: logger.logInfo("Stopping RM at '" + storeDir + "' / '"
0490: + workDir + "'");
0491:
0492: operationMode = OPERATION_MODE_STOPPING;
0493:
0494: sync();
0495: boolean success = shutdown(mode, timeOut);
0496:
0497: releaseGlobalOpenResources();
0498:
0499: if (success) {
0500: operationMode = OPERATION_MODE_STOPPED;
0501: logger.logInfo("Stopped RM");
0502: } else {
0503: logger.logWarning("Failed to stop RM");
0504: }
0505:
0506: return success;
0507: }
0508:
0509: public synchronized boolean recover()
0510: throws ResourceManagerSystemException {
0511: if (operationMode != OPERATION_MODE_STARTED
0512: && operationMode != OPERATION_MODE_STARTING) {
0513: throw new ResourceManagerSystemException(ERR_SYSTEM,
0514: "Recovery is possible in started or starting resource manager only");
0515: }
0516: int oldMode = operationMode;
0517: operationMode = OPERATION_MODE_RECOVERING;
0518:
0519: recoverContexts();
0520: if (globalTransactions.size() > 0) {
0521: logger.logInfo("Recovering pending transactions");
0522: }
0523:
0524: dirty = !rollBackOrForward();
0525:
0526: operationMode = oldMode;
0527: return dirty;
0528: }
0529:
0530: public int getTransactionState(Object txId)
0531: throws ResourceManagerException {
0532: TransactionContext context = getContext(txId);
0533:
0534: if (context == null) {
0535: return STATUS_NO_TRANSACTION;
0536: } else {
0537: return context.status;
0538: }
0539:
0540: }
0541:
0542: public void startTransaction(Object txId)
0543: throws ResourceManagerException {
0544:
0545: if (logger.isFineEnabled())
0546: logger.logFine("Starting Tx " + txId);
0547:
0548: assureStarted(); // can only start a new transaction when not already stopping
0549: if (txId == null || txIdMapper.getPathForId(txId).length() == 0) {
0550: throw new ResourceManagerException(ERR_TXID_INVALID, txId);
0551: }
0552:
0553: // be sure we are the only ones who create this tx
0554: synchronized (globalTransactions) {
0555: TransactionContext context = getContext(txId);
0556:
0557: if (context != null) {
0558: throw new ResourceManagerException(ERR_DUP_TX, txId);
0559: }
0560:
0561: context = new TransactionContext(txId);
0562: context.init();
0563: globalTransactions.put(txId, context);
0564:
0565: }
0566: }
0567:
0568: public void markTransactionForRollback(Object txId)
0569: throws ResourceManagerException {
0570: assureRMReady();
0571: TransactionContext context = txInitialSaneCheckForWriting(txId);
0572: try {
0573: context.status = STATUS_MARKED_ROLLBACK;
0574: context.saveState();
0575: } finally {
0576: // be very sure to free locks and resources, as application might crash or otherwise forget to roll this tx back
0577: context.finalCleanUp();
0578: }
0579: }
0580:
0581: public int prepareTransaction(Object txId)
0582: throws ResourceManagerException {
0583: assureRMReady();
0584: // do not allow any further writing or commit or rollback when db is corrupt
0585: if (dirty) {
0586: throw new ResourceManagerSystemException(
0587: "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
0588: ERR_SYSTEM, txId);
0589: }
0590:
0591: if (txId == null) {
0592: throw new ResourceManagerException(ERR_TXID_INVALID, txId);
0593: }
0594:
0595: TransactionContext context = getContext(txId);
0596:
0597: if (context == null) {
0598: return PREPARE_FAILURE;
0599: }
0600:
0601: synchronized (context) {
0602:
0603: sync();
0604:
0605: if (context.status != STATUS_ACTIVE) {
0606: context.status = STATUS_MARKED_ROLLBACK;
0607: context.saveState();
0608: return PREPARE_FAILURE;
0609: }
0610:
0611: if (logger.isFineEnabled())
0612: logger.logFine("Preparing Tx " + txId);
0613:
0614: int prepareStatus = PREPARE_FAILURE;
0615:
0616: context.status = STATUS_PREPARING;
0617: context.saveState();
0618: // do all checks as early as possible
0619: context.closeResources();
0620: if (context.readOnly) {
0621: prepareStatus = PREPARE_SUCCESS_READONLY;
0622: } else {
0623: // do all checks as early as possible
0624: try {
0625: context.upgradeLockToCommit();
0626: } catch (ResourceManagerException rme) {
0627: // if this did not work, mark it for roll back as early as possible
0628: markTransactionForRollback(txId);
0629: throw rme;
0630: }
0631: prepareStatus = PREPARE_SUCCESS;
0632: }
0633: context.status = STATUS_PREPARED;
0634: context.saveState();
0635: if (logger.isFineEnabled())
0636: logger.logFine("Prepared Tx " + txId);
0637:
0638: return prepareStatus;
0639: }
0640: }
0641:
0642: public void rollbackTransaction(Object txId)
0643: throws ResourceManagerException {
0644: assureRMReady();
0645: TransactionContext context = txInitialSaneCheckForWriting(txId);
0646: // needing synchronization in order not to interfer with shutdown thread
0647: synchronized (context) {
0648: try {
0649:
0650: if (logger.isFineEnabled())
0651: logger.logFine("Rolling back Tx " + txId);
0652:
0653: context.status = STATUS_ROLLING_BACK;
0654: context.saveState();
0655: context.rollback();
0656: context.status = STATUS_ROLLEDBACK;
0657: context.saveState();
0658: globalTransactions.remove(txId);
0659: context.cleanUp();
0660:
0661: if (logger.isFineEnabled())
0662: logger.logFine("Rolled back Tx " + txId);
0663:
0664: // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
0665: } catch (Error e) {
0666: setDirty(txId, e);
0667: throw e;
0668: } catch (RuntimeException e) {
0669: setDirty(txId, e);
0670: throw e;
0671: } catch (ResourceManagerSystemException e) {
0672: setDirty(txId, e);
0673: throw e;
0674: } finally {
0675: context.finalCleanUp();
0676: // tell shutdown thread this tx is finished
0677: context.notifyFinish();
0678: }
0679: }
0680: }
0681:
0682: public void commitTransaction(Object txId)
0683: throws ResourceManagerException {
0684: assureRMReady();
0685: TransactionContext context = txInitialSaneCheckForWriting(txId);
0686: assureNotMarkedForRollback(context);
0687:
0688: // needing synchronization in order not to interfer with shutdown thread
0689: synchronized (context) {
0690: try {
0691:
0692: if (logger.isFineEnabled())
0693: logger.logFine("Committing Tx " + txId);
0694:
0695: context.status = STATUS_COMMITTING;
0696: context.saveState();
0697: context.commit();
0698: context.status = STATUS_COMMITTED;
0699: context.saveState();
0700: globalTransactions.remove(txId);
0701: context.cleanUp();
0702:
0703: if (logger.isFineEnabled())
0704: logger.logFine("Committed Tx " + txId);
0705:
0706: // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
0707: } catch (Error e) {
0708: setDirty(txId, e);
0709: throw e;
0710: } catch (RuntimeException e) {
0711: setDirty(txId, e);
0712: throw e;
0713: } catch (ResourceManagerSystemException e) {
0714: setDirty(txId, e);
0715: throw e;
0716: // like "could not upgrade lock"
0717: } catch (ResourceManagerException e) {
0718: logger.logWarning("Could not commit tx " + txId
0719: + ", rolling back instead", e);
0720: rollbackTransaction(txId);
0721: } finally {
0722: context.finalCleanUp();
0723: // tell shutdown thread this tx is finished
0724: context.notifyFinish();
0725: }
0726: }
0727: }
0728:
0729: public boolean resourceExists(Object resourceId)
0730: throws ResourceManagerException {
0731: // create temporary light weight tx
0732: Object txId;
0733: TransactionContext context;
0734: synchronized (globalTransactions) {
0735: txId = generatedUniqueTxId();
0736: if (logger.isFinerEnabled())
0737: logger.logFiner("Creating temporary light weight tx "
0738: + txId + " to check for exists");
0739: context = new TransactionContext(txId);
0740: context.isLightWeight = true;
0741: // XXX higher isolation might be needed to make sure upgrade to commit lock always works
0742: context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
0743: // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
0744: globalTransactions.put(txId, context);
0745: }
0746:
0747: boolean exists = resourceExists(txId, resourceId);
0748:
0749: context.freeLocks();
0750: globalTransactions.remove(txId);
0751: if (logger.isFinerEnabled())
0752: logger.logFiner("Removing temporary light weight tx "
0753: + txId);
0754:
0755: return exists;
0756: }
0757:
0758: public boolean resourceExists(Object txId, Object resourceId)
0759: throws ResourceManagerException {
0760: lockResource(resourceId, txId, true);
0761: return (getPathForRead(txId, resourceId) != null);
0762: }
0763:
0764: public void deleteResource(Object txId, Object resourceId)
0765: throws ResourceManagerException {
0766: deleteResource(txId, resourceId, true);
0767: }
0768:
0769: public void deleteResource(Object txId, Object resourceId,
0770: boolean assureOnly) throws ResourceManagerException {
0771:
0772: if (logger.isFineEnabled())
0773: logger.logFine(txId + " deleting " + resourceId);
0774:
0775: lockResource(resourceId, txId, false);
0776:
0777: if (getPathForRead(txId, resourceId) == null) {
0778: if (assureOnly) {
0779: return;
0780: }
0781: throw new ResourceManagerException("No such resource at '"
0782: + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
0783: }
0784: String txDeletePath = getDeletePath(txId, resourceId);
0785: String mainPath = getMainPath(resourceId);
0786: try {
0787: getContext(txId).readOnly = false;
0788:
0789: // first undo change / create when there was one
0790: undoScheduledChangeOrCreate(txId, resourceId);
0791:
0792: // if there still is a file in main store, we need to schedule
0793: // a delete additionally
0794: if (FileHelper.fileExists(mainPath)) {
0795: FileHelper.createFile(txDeletePath);
0796: }
0797: } catch (IOException e) {
0798: throw new ResourceManagerSystemException(
0799: "Can not delete resource at '" + resourceId + "'",
0800: ERR_SYSTEM, txId, e);
0801: }
0802: }
0803:
0804: public void createResource(Object txId, Object resourceId)
0805: throws ResourceManagerException {
0806: createResource(txId, resourceId, true);
0807: }
0808:
0809: public void createResource(Object txId, Object resourceId,
0810: boolean assureOnly) throws ResourceManagerException {
0811:
0812: if (logger.isFineEnabled())
0813: logger.logFine(txId + " creating " + resourceId);
0814:
0815: lockResource(resourceId, txId, false);
0816:
0817: if (getPathForRead(txId, resourceId) != null) {
0818: if (assureOnly) {
0819: return;
0820: }
0821: throw new ResourceManagerException("Resource at '"
0822: + resourceId + "', already exists",
0823: ERR_RESOURCE_EXISTS, txId);
0824: }
0825:
0826: String txChangePath = getChangePath(txId, resourceId);
0827: try {
0828: getContext(txId).readOnly = false;
0829:
0830: // creation means either undoing a delete or actually scheduling a create
0831: if (!undoScheduledDelete(txId, resourceId)) {
0832: FileHelper.createFile(txChangePath);
0833: }
0834:
0835: } catch (IOException e) {
0836: throw new ResourceManagerSystemException(
0837: "Can not create resource at '" + resourceId + "'",
0838: ERR_SYSTEM, txId, e);
0839: }
0840: }
0841:
0842: public void copyResource(Object txId, Object fromResourceId,
0843: Object toResourceId, boolean overwrite)
0844: throws ResourceManagerException {
0845: if (logger.isFineEnabled())
0846: logger.logFine(txId + " copying " + fromResourceId + " to "
0847: + toResourceId);
0848:
0849: lockResource(fromResourceId, txId, true);
0850: lockResource(toResourceId, txId, false);
0851:
0852: if (resourceExists(txId, toResourceId) && !overwrite) {
0853: throw new ResourceManagerException("Resource at '"
0854: + toResourceId + "' already exists",
0855: ERR_RESOURCE_EXISTS, txId);
0856: }
0857:
0858: InputStream fromResourceStream = null;
0859: OutputStream toResourceStream = null;
0860: try {
0861: fromResourceStream = readResource(txId, fromResourceId);
0862: toResourceStream = writeResource(txId, toResourceId);
0863: FileHelper.copy(fromResourceStream, toResourceStream);
0864: } catch (IOException e) {
0865: throw new ResourceManagerException(ERR_SYSTEM, txId, e);
0866: } finally {
0867: closeOpenResource(fromResourceStream);
0868: closeOpenResource(toResourceStream);
0869: }
0870: }
0871:
0872: public void moveResource(Object txId, Object fromResourceId,
0873: Object toResourceId, boolean overwrite)
0874: throws ResourceManagerException {
0875: if (logger.isFineEnabled())
0876: logger.logFine(txId + " moving " + fromResourceId + " to "
0877: + toResourceId);
0878:
0879: lockResource(fromResourceId, txId, false);
0880: lockResource(toResourceId, txId, false);
0881:
0882: copyResource(txId, fromResourceId, toResourceId, overwrite);
0883:
0884: deleteResource(txId, fromResourceId, false);
0885: }
0886:
0887: public InputStream readResource(Object resourceId)
0888: throws ResourceManagerException {
0889: // create temporary light weight tx
0890: Object txId;
0891: synchronized (globalTransactions) {
0892: txId = generatedUniqueTxId();
0893: if (logger.isFinerEnabled())
0894: logger.logFiner("Creating temporary light weight tx "
0895: + txId + " for reading");
0896: TransactionContext context = new TransactionContext(txId);
0897: context.isLightWeight = true;
0898: // XXX higher isolation might be needed to make sure upgrade to commit lock always works
0899: context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
0900: // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
0901: globalTransactions.put(txId, context);
0902: }
0903:
0904: InputStream is = readResource(txId, resourceId);
0905: return is;
0906: }
0907:
0908: public InputStream readResource(Object txId, Object resourceId)
0909: throws ResourceManagerException {
0910:
0911: if (logger.isFineEnabled())
0912: logger.logFine(txId + " reading " + resourceId);
0913:
0914: lockResource(resourceId, txId, true);
0915:
0916: String resourcePath = getPathForRead(txId, resourceId);
0917: if (resourcePath == null) {
0918: throw new ResourceManagerException("No such resource at '"
0919: + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
0920: }
0921:
0922: File file = new File(resourcePath);
0923: try {
0924: FileInputStream stream = new FileInputStream(file);
0925: getContext(txId).registerResource(stream);
0926: return new InputStreamWrapper(stream, txId, resourceId);
0927: } catch (FileNotFoundException e) {
0928: throw new ResourceManagerSystemException("File '"
0929: + resourcePath + "' does not exist", ERR_SYSTEM,
0930: txId);
0931: }
0932: }
0933:
0934: public OutputStream writeResource(Object txId, Object resourceId)
0935: throws ResourceManagerException {
0936: return writeResource(txId, resourceId, false);
0937: }
0938:
0939: public OutputStream writeResource(Object txId, Object resourceId,
0940: boolean append) throws ResourceManagerException {
0941:
0942: if (logger.isFineEnabled())
0943: logger.logFine(txId + " writing " + resourceId);
0944:
0945: lockResource(resourceId, txId, false);
0946:
0947: if (append) {
0948: String mainPath = getMainPath(resourceId);
0949: String txChangePath = getChangePath(txId, resourceId);
0950: String txDeletePath = getDeletePath(txId, resourceId);
0951:
0952: boolean changeExists = FileHelper.fileExists(txChangePath);
0953: boolean deleteExists = FileHelper.fileExists(txDeletePath);
0954: boolean mainExists = FileHelper.fileExists(mainPath);
0955:
0956: if (mainExists && !changeExists && !deleteExists) {
0957: // the read and the write path for resourceId will be different!
0958: copyResource(txId, resourceId, resourceId, true);
0959: }
0960: }
0961:
0962: String resourcePath = getPathForWrite(txId, resourceId);
0963:
0964: try {
0965: FileOutputStream stream = new FileOutputStream(
0966: resourcePath, append);
0967: TransactionContext context = getContext(txId);
0968: context.registerResource(stream);
0969: context.readOnly = false;
0970: return stream;
0971: } catch (FileNotFoundException e) {
0972: throw new ResourceManagerSystemException("File '"
0973: + resourcePath + "' does not exist", ERR_SYSTEM,
0974: txId);
0975: }
0976: }
0977:
0978: /*
0979: * --- additional public methods complementing implementation of interfaces ---
0980: *
0981: *
0982: */
0983:
0984: /**
0985: * Resets the store by deleting work <em>and</em> store directory.
0986: */
0987: public synchronized void reset() {
0988: FileHelper.removeRec(new File(storeDir));
0989: FileHelper.removeRec(new File(workDir));
0990: new File(storeDir).mkdirs();
0991: new File(workDir).mkdirs();
0992: }
0993:
0994: /**
0995: * Synchronizes persistent data with caches. Is implemented with an empty
0996: * body, but called by other methods relying on synchronization. Subclasses
0997: * that utilize caching must implement this method reasonably.
0998: *
0999: * @throws ResourceManagerSystemException if anything fatal hapened during synchonization
1000: */
1001: public synchronized void sync()
1002: throws ResourceManagerSystemException {
1003: }
1004:
1005: /**
1006: * Generates a transaction identifier unique to this resource manager. To do so
1007: * it requires this resource manager to be started.
1008: *
1009: * @return generated transaction identifier
1010: * @throws ResourceManagerSystemException if this resource manager has not been started, yet
1011: */
1012: public String generatedUniqueTxId()
1013: throws ResourceManagerSystemException {
1014: assureRMReady();
1015: String txId;
1016: synchronized (globalTransactions) {
1017: do {
1018: txId = Long.toHexString(System.currentTimeMillis())
1019: + "-" + Integer.toHexString(idCnt++);
1020: // XXX busy loop
1021: } while (getContext(txId) != null);
1022: }
1023: return txId;
1024: }
1025:
1026: /*
1027: * --- sane checks ---
1028: *
1029: *
1030: */
1031:
1032: protected void fileInitialSaneCheck(Object txId, Object path)
1033: throws ResourceManagerException {
1034: if (path == null || path.toString().length() == 0) {
1035: throw new ResourceManagerException(ERR_RESOURCEID_INVALID,
1036: txId);
1037: }
1038: }
1039:
1040: protected void assureStarted()
1041: throws ResourceManagerSystemException {
1042: if (operationMode != OPERATION_MODE_STARTED) {
1043: throw new ResourceManagerSystemException(
1044: "Resource Manager Service not started", ERR_SYSTEM,
1045: null);
1046: }
1047: }
1048:
1049: protected void assureRMReady()
1050: throws ResourceManagerSystemException {
1051: if (operationMode != OPERATION_MODE_STARTED
1052: && operationMode != OPERATION_MODE_STOPPING) {
1053: throw new ResourceManagerSystemException(
1054: "Resource Manager Service not ready", ERR_SYSTEM,
1055: null);
1056: }
1057: }
1058:
1059: protected void assureNotMarkedForRollback(TransactionContext context)
1060: throws ResourceManagerException {
1061: if (context.status == STATUS_MARKED_ROLLBACK) {
1062: throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK,
1063: context.txId);
1064: }
1065: }
1066:
1067: protected TransactionContext txInitialSaneCheckForWriting(
1068: Object txId) throws ResourceManagerException {
1069: assureRMReady();
1070: // do not allow any further writing or commit or rollback when db is corrupt
1071: if (dirty) {
1072: throw new ResourceManagerSystemException(
1073: "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
1074: ERR_SYSTEM, txId);
1075: }
1076: return txInitialSaneCheck(txId);
1077: }
1078:
1079: protected TransactionContext txInitialSaneCheck(Object txId)
1080: throws ResourceManagerException {
1081: assureRMReady();
1082: if (txId == null) {
1083: throw new ResourceManagerException(ERR_TXID_INVALID, txId);
1084: }
1085:
1086: TransactionContext context = getContext(txId);
1087:
1088: if (context == null) {
1089: throw new ResourceManagerException(ERR_NO_TX, txId);
1090: }
1091:
1092: return context;
1093: }
1094:
1095: /*
1096: * --- General Helpers ---
1097: *
1098: *
1099: */
1100:
1101: protected TransactionContext getContext(Object txId) {
1102: return (TransactionContext) globalTransactions.get(txId);
1103: }
1104:
1105: protected String assureLeadingSlash(Object pathObject) {
1106: String path = "";
1107: if (pathObject != null) {
1108: if (idMapper != null) {
1109: path = idMapper.getPathForId(pathObject);
1110: } else {
1111: path = pathObject.toString();
1112: }
1113: if (path.length() > 0 && path.charAt(0) != '/'
1114: && path.charAt(0) != '\\') {
1115: path = "/" + path;
1116: }
1117: }
1118: return path;
1119: }
1120:
1121: protected String getMainPath(Object path) {
1122: StringBuffer buf = new StringBuffer(storeDir.length()
1123: + path.toString().length() + 5);
1124: buf.append(storeDir).append(assureLeadingSlash(path));
1125: return buf.toString();
1126: }
1127:
1128: protected String getTransactionBaseDir(Object txId) {
1129: return workDir + '/' + txIdMapper.getPathForId(txId);
1130: }
1131:
1132: protected String getChangePath(Object txId, Object path) {
1133: String txBaseDir = getTransactionBaseDir(txId);
1134: StringBuffer buf = new StringBuffer(txBaseDir.length()
1135: + path.toString().length() + WORK_CHANGE_DIR.length()
1136: + 5);
1137: buf.append(txBaseDir).append('/').append(WORK_CHANGE_DIR)
1138: .append(assureLeadingSlash(path));
1139: return buf.toString();
1140: }
1141:
1142: protected String getDeletePath(Object txId, Object path) {
1143: String txBaseDir = getTransactionBaseDir(txId);
1144: StringBuffer buf = new StringBuffer(txBaseDir.length()
1145: + path.toString().length() + WORK_DELETE_DIR.length()
1146: + 5);
1147: buf.append(txBaseDir).append('/').append(WORK_DELETE_DIR)
1148: .append(assureLeadingSlash(path));
1149: return buf.toString();
1150: }
1151:
1152: protected boolean undoScheduledDelete(Object txId, Object resourceId)
1153: throws ResourceManagerException {
1154: String txDeletePath = getDeletePath(txId, resourceId);
1155: File deleteFile = new File(txDeletePath);
1156: if (deleteFile.exists()) {
1157: if (!deleteFile.delete()) {
1158: throw new ResourceManagerSystemException(
1159: "Failed to undo delete of '" + resourceId + "'",
1160: ERR_SYSTEM, txId);
1161: }
1162: return true;
1163: }
1164: return false;
1165: }
1166:
1167: protected boolean undoScheduledChangeOrCreate(Object txId,
1168: Object resourceId) throws ResourceManagerException {
1169: String txChangePath = getChangePath(txId, resourceId);
1170: File changeFile = new File(txChangePath);
1171: if (changeFile.exists()) {
1172: if (!changeFile.delete()) {
1173: throw new ResourceManagerSystemException(
1174: "Failed to undo change / create of '"
1175: + resourceId + "'", ERR_SYSTEM, txId);
1176: }
1177: return true;
1178: }
1179: return false;
1180: }
1181:
1182: protected String getPathForWrite(Object txId, Object resourceId)
1183: throws ResourceManagerException {
1184: try {
1185: // when we want to write, be sure to write to a local copy
1186: String txChangePath = getChangePath(txId, resourceId);
1187: if (!FileHelper.fileExists(txChangePath)) {
1188: FileHelper.createFile(txChangePath);
1189: }
1190: return txChangePath;
1191: } catch (IOException e) {
1192: throw new ResourceManagerSystemException(
1193: "Can not write to resource at '" + resourceId + "'",
1194: ERR_SYSTEM, txId, e);
1195: }
1196: }
1197:
1198: protected String getPathForRead(Object txId, Object resourceId)
1199: throws ResourceManagerException {
1200:
1201: String mainPath = getMainPath(resourceId);
1202: String txChangePath = getChangePath(txId, resourceId);
1203: String txDeletePath = getDeletePath(txId, resourceId);
1204:
1205: // now, this gets a bit complicated:
1206:
1207: boolean changeExists = FileHelper.fileExists(txChangePath);
1208: boolean deleteExists = FileHelper.fileExists(txDeletePath);
1209: boolean mainExists = FileHelper.fileExists(mainPath);
1210: boolean resourceIsDir = ((mainExists && new File(mainPath)
1211: .isDirectory()) || (changeExists && new File(
1212: txChangePath).isDirectory()));
1213: if (resourceIsDir) {
1214: logger.logWarning("Resource at '" + resourceId
1215: + "' maps to directory");
1216: }
1217:
1218: // first do some sane checks
1219:
1220: // this may never be, two cases are possible, both disallowing to have a delete together with a change
1221: // 1. first there was a change, than a delete -> at least delete file exists (when there is a file in main store)
1222: // 2. first there was a delete, than a change -> only change file exists
1223: if (!resourceIsDir && changeExists && deleteExists) {
1224: throw new ResourceManagerSystemException(
1225: "Inconsistent delete and change combination for resource at '"
1226: + resourceId + "'", ERR_TX_INCONSISTENT,
1227: txId);
1228: }
1229:
1230: // you should not have been allowed to delete a file that does not exist at all
1231: if (deleteExists && !mainExists) {
1232: throw new ResourceManagerSystemException(
1233: "Inconsistent delete for resource at '"
1234: + resourceId + "'", ERR_TX_INCONSISTENT,
1235: txId);
1236: }
1237:
1238: if (changeExists) {
1239: return txChangePath;
1240: } else if (mainExists && !deleteExists) {
1241: return mainPath;
1242: } else {
1243: return null;
1244: }
1245: }
1246:
1247: /*
1248: * --- Locking Helpers ---
1249: *
1250: *
1251: */
1252:
1253: protected int getSharedLockLevel(TransactionContext context)
1254: throws ResourceManagerException {
1255: if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED
1256: || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1257: return LOCK_ACCESS;
1258: } else if (context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ
1259: || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) {
1260: return LOCK_SHARED;
1261: } else {
1262: return LOCK_ACCESS;
1263: }
1264: }
1265:
1266: /*
1267: * --- Resource Management ---
1268: *
1269: *
1270: */
1271:
1272: protected void registerOpenResource(Object openResource) {
1273: if (logger.isFinerEnabled())
1274: logger
1275: .logFiner("Registering open resource "
1276: + openResource);
1277: globalOpenResources.add(openResource);
1278: }
1279:
1280: protected void releaseGlobalOpenResources() {
1281: ArrayList copy;
1282: synchronized (globalOpenResources) {
1283: // XXX need to copy in order to allow removal in releaseOpenResource
1284: copy = new ArrayList(globalOpenResources);
1285: for (Iterator it = copy.iterator(); it.hasNext();) {
1286: Object stream = it.next();
1287: closeOpenResource(stream);
1288: }
1289: }
1290: }
1291:
1292: protected void closeOpenResource(Object openResource) {
1293: if (logger.isFinerEnabled())
1294: logger.logFiner("Releasing resource " + openResource);
1295: globalOpenResources.remove(openResource);
1296: if (openResource instanceof InputStream) {
1297: InputStream is = (InputStream) openResource;
1298: try {
1299: is.close();
1300: } catch (IOException e) {
1301: // do not care, as it might have been closed somewhere else, before
1302: }
1303: } else if (openResource instanceof OutputStream) {
1304: OutputStream os = (OutputStream) openResource;
1305: try {
1306: os.close();
1307: } catch (IOException e) {
1308: // do not care, as it might have been closed somewhere else, before
1309: }
1310: }
1311: }
1312:
1313: /*
1314: * --- Recovery / Shutdown Support ---
1315: *
1316: *
1317: */
1318:
1319: protected boolean rollBackOrForward() {
1320: boolean allCool = true;
1321:
1322: synchronized (globalTransactions) {
1323: ArrayList contexts = new ArrayList(globalTransactions
1324: .values());
1325: for (Iterator it = contexts.iterator(); it.hasNext();) {
1326: TransactionContext context = (TransactionContext) it
1327: .next();
1328: if (context.status == STATUS_COMMITTING) {
1329: // roll forward
1330: logger.logInfo("Rolling forward " + context.txId);
1331:
1332: try {
1333: context.commit();
1334: context.status = STATUS_COMMITTED;
1335: context.saveState();
1336: globalTransactions.remove(context.txId);
1337: context.cleanUp();
1338: } catch (ResourceManagerException e) {
1339: // this is not good, but what can we do now?
1340: allCool = false;
1341: logger.logSevere("Rolling forward of "
1342: + context.txId + " failed", e);
1343: }
1344: } else if (context.status == STATUS_COMMITTED) {
1345: logger.logInfo("Cleaning already commited "
1346: + context.txId);
1347: globalTransactions.remove(context.txId);
1348: try {
1349: context.cleanUp();
1350: } catch (ResourceManagerException e) {
1351: // this is not good, but what can we do now?
1352: allCool = false;
1353: logger.logWarning("Cleaning of " + context.txId
1354: + " failed", e);
1355: }
1356: } else {
1357: // in all other cases roll back and warn when not rollback was explicitely selected for tx
1358: if (context.status != STATUS_ROLLING_BACK
1359: && context.status != STATUS_ROLLEDBACK
1360: && context.status != STATUS_MARKED_ROLLBACK) {
1361: logger.logWarning("Irregularly rolling back "
1362: + context.txId);
1363: } else {
1364: logger.logInfo("Rolling back " + context.txId);
1365: }
1366: try {
1367: context.rollback();
1368: context.status = STATUS_ROLLEDBACK;
1369: context.saveState();
1370: globalTransactions.remove(context.txId);
1371: context.cleanUp();
1372: } catch (ResourceManagerException e) {
1373: logger.logWarning("Rolling back of "
1374: + context.txId + " failed", e);
1375: }
1376: }
1377: }
1378:
1379: }
1380: return allCool;
1381: }
1382:
1383: protected void recoverContexts() {
1384: File dir = new File(workDir);
1385: File[] files = dir.listFiles();
1386: if (files == null)
1387: return;
1388: for (int i = 0; i < files.length; i++) {
1389: File file = files[i];
1390: Object txId = txIdMapper.getIdForPath(file.getName());
1391: // recover all transactions we do not already know
1392: if (!globalTransactions.containsKey(txId)) {
1393:
1394: logger.logInfo("Recovering " + txId);
1395: TransactionContext context;
1396: try {
1397: context = new TransactionContext(txId);
1398: context.recoverState();
1399: globalTransactions.put(txId, context);
1400: } catch (ResourceManagerException e) {
1401: // this is not good, but the best we get, just log as warning
1402: logger.logWarning("Recovering of " + txId
1403: + " failed");
1404: }
1405: }
1406: }
1407: }
1408:
1409: protected boolean waitForAllTxToStop(long timeoutMSecs) {
1410: long startTime = System.currentTimeMillis();
1411:
1412: // be sure not to lock globalTransactions for too long, as we need to give
1413: // txs the chance to complete (otherwise deadlocks are very likely to occur)
1414: // instead iterate over a copy as we can be sure no new txs will be registered
1415: // after operation level has been set to stopping
1416:
1417: Collection transactionsToStop;
1418: synchronized (globalTransactions) {
1419: transactionsToStop = new ArrayList(globalTransactions
1420: .values());
1421: }
1422: for (Iterator it = transactionsToStop.iterator(); it.hasNext();) {
1423: long remainingTimeout = startTime
1424: - System.currentTimeMillis() + timeoutMSecs;
1425:
1426: if (remainingTimeout <= 0) {
1427: return false;
1428: }
1429:
1430: TransactionContext context = (TransactionContext) it.next();
1431: synchronized (context) {
1432: if (!context.finished) {
1433: logger.logInfo("Waiting for tx " + context.txId
1434: + " to finish for " + remainingTimeout
1435: + " milli seconds");
1436: }
1437: while (!context.finished && remainingTimeout > 0) {
1438: try {
1439: context.wait(remainingTimeout);
1440: } catch (InterruptedException e) {
1441: return false;
1442: }
1443: remainingTimeout = startTime
1444: - System.currentTimeMillis() + timeoutMSecs;
1445: }
1446: if (context.finished) {
1447: logger.logInfo("Tx " + context.txId + " finished");
1448: } else {
1449: logger.logWarning("Tx " + context.txId
1450: + " failed to finish in given time");
1451: }
1452: }
1453: }
1454:
1455: return (globalTransactions.size() == 0);
1456: }
1457:
1458: protected boolean shutdown(int mode, long timeoutMSecs) {
1459: switch (mode) {
1460: case SHUTDOWN_MODE_NORMAL:
1461: return waitForAllTxToStop(timeoutMSecs);
1462: case SHUTDOWN_MODE_ROLLBACK:
1463: return rollBackOrForward();
1464: case SHUTDOWN_MODE_KILL:
1465: return true;
1466: default:
1467: return false;
1468: }
1469: }
1470:
1471: protected void setDirty(Object txId, Throwable t) {
1472: logger.logSevere(
1473: "Fatal error during critical commit/rollback of transaction "
1474: + txId + ", setting database to dirty.", t);
1475: dirty = true;
1476: }
1477:
1478: /**
1479: * Inner class to hold the complete context, i.e. all information needed, for a transaction.
1480: *
1481: */
1482: protected class TransactionContext {
1483:
1484: protected Object txId;
1485: protected int status = STATUS_ACTIVE;
1486: protected int isolationLevel = DEFAULT_ISOLATION_LEVEL;
1487: protected long timeoutMSecs = getDefaultTransactionTimeout();
1488: protected long startTime;
1489: protected long commitTime = -1L;
1490: protected boolean isLightWeight = false;
1491: protected boolean readOnly = true;
1492: protected boolean finished = false;
1493:
1494: // list of streams participating in this tx
1495: private List openResources = new ArrayList();
1496:
1497: public TransactionContext(Object txId)
1498: throws ResourceManagerException {
1499: this .txId = txId;
1500: startTime = System.currentTimeMillis();
1501: }
1502:
1503: public long getRemainingTimeout() {
1504: long now = System.currentTimeMillis();
1505: return (startTime - now + timeoutMSecs);
1506: }
1507:
1508: public synchronized void init() throws ResourceManagerException {
1509: String baseDir = getTransactionBaseDir(txId);
1510: String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1511: String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1512:
1513: new File(changeDir).mkdirs();
1514: new File(deleteDir).mkdirs();
1515:
1516: saveState();
1517: }
1518:
1519: public synchronized void rollback()
1520: throws ResourceManagerException {
1521: closeResources();
1522: freeLocks();
1523: }
1524:
1525: public synchronized void commit()
1526: throws ResourceManagerException {
1527: String baseDir = getTransactionBaseDir(txId);
1528: String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1529: String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1530:
1531: closeResources();
1532: upgradeLockToCommit();
1533: try {
1534: applyDeletes(new File(deleteDir), new File(storeDir),
1535: new File(storeDir));
1536: FileHelper.moveRec(new File(changeDir), new File(
1537: storeDir));
1538: } catch (IOException e) {
1539: throw new ResourceManagerSystemException(
1540: "Commit failed", ERR_SYSTEM, txId, e);
1541: }
1542: freeLocks();
1543: commitTime = System.currentTimeMillis();
1544: }
1545:
1546: public synchronized void notifyFinish() {
1547: finished = true;
1548: notifyAll();
1549: }
1550:
1551: public synchronized void cleanUp()
1552: throws ResourceManagerException {
1553: if (!cleanUp)
1554: return; // XXX for debugging only
1555: boolean clean = true;
1556: Exception cleanException = null;
1557: String baseDir = getTransactionBaseDir(txId);
1558: FileHelper.removeRec(new File(baseDir));
1559: if (!clean) {
1560: throw new ResourceManagerSystemException(
1561: "Clean up failed due to unreleasable lock",
1562: ERR_SYSTEM, txId, cleanException);
1563: }
1564: }
1565:
1566: public synchronized void finalCleanUp()
1567: throws ResourceManagerException {
1568: closeResources();
1569: freeLocks();
1570: }
1571:
1572: public synchronized void upgradeLockToCommit()
1573: throws ResourceManagerException {
1574: for (Iterator it = lockManager.getAll(txId).iterator(); it
1575: .hasNext();) {
1576: GenericLock lock = (GenericLock) it.next();
1577: // only upgrade if we had write access
1578: if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) {
1579: try {
1580: // in case of deadlocks, make failure of non-committing tx more likely
1581: if (!lock
1582: .acquire(
1583: txId,
1584: LOCK_COMMIT,
1585: true,
1586: true,
1587: getDefaultTransactionTimeout()
1588: * DEFAULT_COMMIT_TIMEOUT_FACTOR)) {
1589: throw new ResourceManagerException(
1590: "Could not upgrade to commit lock for resource at '"
1591: + lock.getResourceId()
1592: .toString() + "'",
1593: ERR_NO_LOCK, txId);
1594: }
1595: } catch (InterruptedException e) {
1596: throw new ResourceManagerSystemException(
1597: ERR_SYSTEM, txId, e);
1598: }
1599: }
1600:
1601: }
1602: }
1603:
1604: public synchronized void freeLocks() {
1605: lockManager.releaseAll(txId);
1606: }
1607:
1608: public synchronized void closeResources() {
1609: synchronized (globalOpenResources) {
1610: for (Iterator it = openResources.iterator(); it
1611: .hasNext();) {
1612: Object stream = it.next();
1613: closeOpenResource(stream);
1614: }
1615: }
1616: }
1617:
1618: public synchronized void registerResource(Object openResource) {
1619: synchronized (globalOpenResources) {
1620: registerOpenResource(openResource);
1621: openResources.add(openResource);
1622: }
1623: }
1624:
1625: public synchronized void saveState()
1626: throws ResourceManagerException {
1627: String statePath = getTransactionBaseDir(txId) + "/"
1628: + CONTEXT_FILE;
1629: File file = new File(statePath);
1630: BufferedWriter writer = null;
1631: try {
1632: OutputStream os = new FileOutputStream(file);
1633: writer = new BufferedWriter(new OutputStreamWriter(os,
1634: DEFAULT_PARAMETER_ENCODING));
1635: writer.write(toString());
1636: } catch (FileNotFoundException e) {
1637: String msg = "Saving status information to '"
1638: + statePath + "' failed! Could not create file";
1639: logger.logSevere(msg, e);
1640: throw new ResourceManagerSystemException(msg,
1641: ERR_SYSTEM, txId, e);
1642: } catch (IOException e) {
1643: String msg = "Saving status information to '"
1644: + statePath + "' failed";
1645: logger.logSevere(msg, e);
1646: throw new ResourceManagerSystemException(msg,
1647: ERR_SYSTEM, txId, e);
1648: } finally {
1649: if (writer != null) {
1650: try {
1651: writer.close();
1652: } catch (IOException e) {
1653: }
1654:
1655: }
1656: }
1657: }
1658:
1659: public synchronized void recoverState()
1660: throws ResourceManagerException {
1661: String statePath = getTransactionBaseDir(txId) + "/"
1662: + CONTEXT_FILE;
1663: File file = new File(statePath);
1664: BufferedReader reader = null;
1665: try {
1666: InputStream is = new FileInputStream(file);
1667:
1668: reader = new BufferedReader(new InputStreamReader(is,
1669: DEFAULT_PARAMETER_ENCODING));
1670: txId = reader.readLine();
1671: status = Integer.parseInt(reader.readLine());
1672: isolationLevel = Integer.parseInt(reader.readLine());
1673: timeoutMSecs = Long.parseLong(reader.readLine());
1674: startTime = Long.parseLong(reader.readLine());
1675: } catch (FileNotFoundException e) {
1676: String msg = "Recovering status information from '"
1677: + statePath + "' failed! Could not find file";
1678: logger.logSevere(msg, e);
1679: throw new ResourceManagerSystemException(msg,
1680: ERR_SYSTEM, txId);
1681: } catch (IOException e) {
1682: String msg = "Recovering status information from '"
1683: + statePath + "' failed";
1684: logger.logSevere(msg, e);
1685: throw new ResourceManagerSystemException(msg,
1686: ERR_SYSTEM, txId, e);
1687: } catch (Throwable t) {
1688: String msg = "Recovering status information from '"
1689: + statePath + "' failed";
1690: logger.logSevere(msg, t);
1691: throw new ResourceManagerSystemException(msg,
1692: ERR_SYSTEM, txId, t);
1693: } finally {
1694: if (reader != null) {
1695: try {
1696: reader.close();
1697: } catch (IOException e) {
1698: }
1699:
1700: }
1701: }
1702: }
1703:
1704: public synchronized String toString() {
1705: StringBuffer buf = new StringBuffer();
1706: buf.append(txId).append('\n');
1707: buf.append(Integer.toString(status)).append('\n');
1708: buf.append(Integer.toString(isolationLevel)).append('\n');
1709: buf.append(Long.toString(timeoutMSecs)).append('\n');
1710: buf.append(Long.toString(startTime)).append('\n');
1711: if (debug) {
1712: buf.append("----- Lock Debug Info -----\n");
1713:
1714: for (Iterator it = lockManager.getAll(txId).iterator(); it
1715: .hasNext();) {
1716: GenericLock lock = (GenericLock) it.next();
1717: buf.append(lock.toString() + "\n");
1718: }
1719:
1720: }
1721: return buf.toString();
1722: }
1723:
1724: }
1725:
1726: private class InputStreamWrapper extends InputStream {
1727: private InputStream is;
1728: private Object txId;
1729: private Object resourceId;
1730:
1731: public InputStreamWrapper(InputStream is, Object txId,
1732: Object resourceId) {
1733: this .is = is;
1734: this .txId = txId;
1735: this .resourceId = resourceId;
1736: }
1737:
1738: public int read() throws IOException {
1739: return is.read();
1740: }
1741:
1742: public int read(byte b[]) throws IOException {
1743: return is.read(b);
1744: }
1745:
1746: public int read(byte b[], int off, int len) throws IOException {
1747: return is.read(b, off, len);
1748: }
1749:
1750: public int available() throws IOException {
1751: return is.available();
1752: }
1753:
1754: public void close() throws IOException {
1755: try {
1756: is.close();
1757: } finally {
1758: TransactionContext context;
1759: synchronized (globalTransactions) {
1760: context = getContext(txId);
1761: if (context == null) {
1762: return;
1763: }
1764: }
1765: synchronized (context) {
1766: if (context.isLightWeight) {
1767: if (logger.isFinerEnabled())
1768: logger
1769: .logFiner("Upon close of resource removing temporary light weight tx "
1770: + txId);
1771: context.freeLocks();
1772: globalTransactions.remove(txId);
1773: } else {
1774: // release access lock in order to allow other transactions to commit
1775: if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) {
1776: if (logger.isFinerEnabled()) {
1777: logger
1778: .logFiner("Upon close of resource releasing access lock for tx "
1779: + txId
1780: + " on resource at "
1781: + resourceId);
1782: }
1783: lockManager.release(txId, resourceId);
1784: }
1785: }
1786: }
1787: }
1788: }
1789:
1790: public void mark(int readlimit) {
1791: is.mark(readlimit);
1792: }
1793:
1794: public void reset() throws IOException {
1795: is.reset();
1796: }
1797:
1798: public boolean markSupported() {
1799: return is.markSupported();
1800:
1801: }
1802:
1803: }
1804:
1805: }
|