001: /*
002: * Copyright (c) 2003-2006 Terracotta, Inc. All rights reserved.
003: */
004: package com.tc.objectserver.tx;
005:
006: import com.tc.logging.TCLogger;
007: import com.tc.logging.TCLogging;
008: import com.tc.util.Assert;
009:
010: import java.util.Arrays;
011: import java.util.Collection;
012: import java.util.HashSet;
013: import java.util.Iterator;
014: import java.util.LinkedList;
015: import java.util.List;
016: import java.util.Set;
017:
018: public class TransactionSequencer {
019:
020: private static final TCLogger logger = TCLogging
021: .getLogger(TransactionSequencer.class);
022:
023: private final Set pendingTxns = new HashSet();
024:
025: private final LinkedList txnQ = new LinkedList();
026: private final LinkedList blockedQ = new LinkedList();
027:
028: private final BlockedSet locks = new BlockedSet();
029: private final BlockedSet objects = new BlockedSet();
030:
031: private int txnsCount;
032: private boolean reconcile = false;
033:
034: public synchronized void addTransactions(Collection txns) {
035: if (false)
036: log_incoming(txns);
037: txnQ.addAll(txns);
038: txnsCount += txns.size();
039: }
040:
041: private void log_incoming(Collection txns) {
042: for (Iterator i = txns.iterator(); i.hasNext();) {
043: ServerTransaction txn = (ServerTransaction) i.next();
044: logger.info("Incoming : " + txn);
045: }
046: }
047:
048: public synchronized ServerTransaction getNextTxnToProcess() {
049: reconcileIfNeeded();
050: while (!txnQ.isEmpty()) {
051:
052: ServerTransaction txn = (ServerTransaction) txnQ
053: .removeFirst();
054: if (isBlocked(txn)) {
055: addBlocked(txn);
056: } else {
057: if (false)
058: log_outgoing(txn);
059: txnsCount--;
060: return txn;
061: }
062: }
063: if (false)
064: log_no_txns_to_process();
065: return null;
066: }
067:
068: private void reconcileIfNeeded() {
069: if (reconcile) {
070: // Add to begining
071: txnQ.addAll(0, blockedQ);
072: blockedQ.clear();
073: locks.clearBlocked();
074: objects.clearBlocked();
075: reconcile = false;
076: }
077: }
078:
079: private void addBlocked(ServerTransaction txn) {
080: locks.addBlocked(Arrays.asList(txn.getLockIDs()));
081: objects.addBlocked(txn.getObjectIDs());
082: blockedQ.add(txn);
083: }
084:
085: private void log_no_txns_to_process() {
086: if (txnsCount != 0) {
087: int psize = pendingTxns.size();
088: logger
089: .info("No More Txns that can be processed : txnCount = "
090: + txnsCount
091: + " and pending txns = "
092: + psize);
093: }
094: }
095:
096: private void log_outgoing(ServerTransaction txn) {
097: logger.info("Outgoing : " + txn);
098: }
099:
100: private boolean isBlocked(ServerTransaction txn) {
101: return locks.isBlocked(Arrays.asList(txn.getLockIDs()))
102: || objects.isBlocked(txn.getObjectIDs());
103: }
104:
105: public synchronized void makePending(ServerTransaction txn) {
106: locks.makePending(Arrays.asList(txn.getLockIDs()));
107: objects.makePending(txn.getObjectIDs());
108: Assert
109: .assertTrue(pendingTxns.add(txn
110: .getServerTransactionID()));
111: if (false)
112: logger.info("Make Pending : " + txn);
113: }
114:
115: public synchronized void makeUnpending(ServerTransaction txn) {
116: Assert.assertTrue(pendingTxns.remove(txn
117: .getServerTransactionID()));
118: locks.makeUnpending(Arrays.asList(txn.getLockIDs()));
119: objects.makeUnpending(txn.getObjectIDs());
120: reconcile = true;
121: if (false)
122: logger.info("Processed Pending : " + txn);
123: }
124:
125: /*
126: * Used for testing
127: */
128: boolean isPending(List txns) {
129: for (Iterator i = txns.iterator(); i.hasNext();) {
130: ServerTransaction st = (ServerTransaction) i.next();
131: if (pendingTxns.contains(st.getServerTransactionID()))
132: return true;
133: }
134: return false;
135: }
136:
137: private static final class BlockedSet {
138:
139: Set cause = new HashSet();
140: Set effect = new HashSet();
141:
142: public boolean isBlocked(Collection keys) {
143: for (Iterator i = keys.iterator(); i.hasNext();) {
144: Object o = i.next();
145: if (cause.contains(o) || effect.contains(o)) {
146: return true;
147: }
148: }
149: return false;
150: }
151:
152: public void makePending(Collection keys) {
153: cause.addAll(keys);
154: }
155:
156: public void makeUnpending(Collection keys) {
157: cause.removeAll(keys);
158: }
159:
160: public void addBlocked(Collection keys) {
161: effect.addAll(keys);
162: }
163:
164: public void clearBlocked() {
165: effect.clear();
166: }
167: }
168: }
|