001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.tx;
006:
007: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
008:
009: import com.tc.exception.TCRuntimeException;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.properties.TCPropertiesImpl;
013: import com.tc.util.SequenceGenerator;
014: import com.tc.util.SequenceID;
015: import com.tc.util.Util;
016:
017: public class TransactionSequencer {
018:
019: private static final TCLogger logger = TCLogging
020: .getLogger(TransactionSequencer.class);
021:
022: private static final boolean LOGGING_ENABLED;
023: private static final int MAX_BYTE_SIZE_FOR_BATCH;
024: private static final int MAX_PENDING_BATCHES;
025: private static final long MAX_SLEEP_TIME_BEFORE_HALT;
026:
027: static {
028: // Set the values from the properties here.
029: LOGGING_ENABLED = TCPropertiesImpl.getProperties().getBoolean(
030: "l1.transactionmanager.logging.enabled");
031: MAX_BYTE_SIZE_FOR_BATCH = TCPropertiesImpl
032: .getProperties()
033: .getInt("l1.transactionmanager.maxBatchSizeInKiloBytes") * 1024;
034: MAX_PENDING_BATCHES = TCPropertiesImpl.getProperties().getInt(
035: "l1.transactionmanager.maxPendingBatches");
036: MAX_SLEEP_TIME_BEFORE_HALT = TCPropertiesImpl
037: .getProperties()
038: .getLong("l1.transactionmanager.maxSleepTimeBeforeHalt");
039: }
040:
041: private final SequenceGenerator sequence = new SequenceGenerator(1);
042: private final TransactionBatchFactory batchFactory;
043: private final BoundedLinkedQueue pendingBatches = new BoundedLinkedQueue(
044: MAX_PENDING_BATCHES);
045:
046: private ClientTransactionBatch currentBatch;
047: private int pending_size = 0;
048:
049: private int slowDownStartsAt;
050: private double sleepTimeIncrements;
051: private int txnsPerBatch = 0;
052: private boolean shutdown = false;
053:
054: public TransactionSequencer(TransactionBatchFactory batchFactory) {
055: this .batchFactory = batchFactory;
056: currentBatch = createNewBatch();
057: this .slowDownStartsAt = (int) (MAX_PENDING_BATCHES * 0.66);
058: this .sleepTimeIncrements = MAX_SLEEP_TIME_BEFORE_HALT
059: / (MAX_PENDING_BATCHES - slowDownStartsAt);
060: if (LOGGING_ENABLED)
061: log_settings();
062: }
063:
064: private void log_settings() {
065: logger.info("Max Byte Size for Batches = "
066: + MAX_BYTE_SIZE_FOR_BATCH + " Max Pending Batches = "
067: + MAX_PENDING_BATCHES);
068: logger.info("Max Sleep time = " + MAX_SLEEP_TIME_BEFORE_HALT
069: + " Slow down starts at = " + slowDownStartsAt
070: + " sleep time increments = " + sleepTimeIncrements);
071: }
072:
073: private ClientTransactionBatch createNewBatch() {
074: return batchFactory.nextBatch();
075: }
076:
077: private void addTransactionToBatch(ClientTransaction txn,
078: ClientTransactionBatch batch) {
079: batch.addTransaction(txn);
080: }
081:
082: public synchronized void addTransaction(ClientTransaction txn) {
083: if (shutdown) {
084: logger.error("Sequencer shutdown. Not committing " + txn);
085: return;
086: }
087:
088: try {
089: addTxnInternal(txn);
090: } catch (Throwable t) {
091: // logging of exceptions is done at a higher level
092: shutdown = true;
093: if (t instanceof Error) {
094: throw (Error) t;
095: }
096: if (t instanceof RuntimeException) {
097: throw (RuntimeException) t;
098: }
099: throw new RuntimeException(t);
100: }
101: }
102:
103: public synchronized void shutdown() {
104: shutdown = true;
105: }
106:
107: /**
108: * XXX::Note : There is automatic throttling built in by adding to a BoundedLinkedQueue from within a synch block
109: */
110: private void addTxnInternal(ClientTransaction txn) {
111: SequenceID sequenceID = new SequenceID(sequence
112: .getNextSequence());
113: txn.setSequenceID(sequenceID);
114: txnsPerBatch++;
115:
116: addTransactionToBatch(txn, currentBatch);
117: if (currentBatch.byteSize() > MAX_BYTE_SIZE_FOR_BATCH) {
118: put(currentBatch);
119: reconcilePendingSize();
120: if (LOGGING_ENABLED)
121: log_stats();
122: currentBatch = createNewBatch();
123: txnsPerBatch = 0;
124: }
125: throttle();
126: }
127:
128: private void throttle() {
129: int diff = pending_size - slowDownStartsAt;
130: if (diff >= 0) {
131: long sleepTime = (long) (1 + diff * sleepTimeIncrements);
132: try {
133: wait(sleepTime);
134: } catch (InterruptedException e) {
135: throw new TCRuntimeException(e);
136: }
137: }
138: }
139:
140: private void reconcilePendingSize() {
141: pending_size = pendingBatches.size();
142: }
143:
144: private void put(ClientTransactionBatch batch) {
145: try {
146: pendingBatches.put(batch);
147: } catch (InterruptedException e) {
148: throw new TCRuntimeException(e);
149: }
150: }
151:
152: private void log_stats() {
153: int size = pending_size;
154: if (size == MAX_PENDING_BATCHES) {
155: logger
156: .info("Max pending size reached !!! : Pending Batches size = "
157: + size + " TxnsInBatch = " + txnsPerBatch);
158: } else if (size % 5 == 0) {
159: logger.info("Pending Batch Size : " + size
160: + " TxnsInBatch = " + txnsPerBatch);
161: }
162: }
163:
164: private ClientTransactionBatch get() {
165: boolean isInterrupted = false;
166: ClientTransactionBatch returnValue = null;
167: while (true) {
168: try {
169: returnValue = (ClientTransactionBatch) pendingBatches
170: .poll(0);
171: break;
172: } catch (InterruptedException e) {
173: isInterrupted = true;
174: if (returnValue != null) {
175: break;
176: }
177: }
178: }
179: Util.selfInterruptIfNeeded(isInterrupted);
180: return returnValue;
181: }
182:
183: private ClientTransactionBatch peek() {
184: return (ClientTransactionBatch) pendingBatches.peek();
185: }
186:
187: public ClientTransactionBatch getNextBatch() {
188: ClientTransactionBatch batch = get();
189: if (batch != null)
190: return batch;
191: synchronized (this ) {
192: // Check again to avoid sending the txn in the wrong order
193: batch = get();
194: reconcilePendingSize();
195: notifyAll();
196: if (batch != null)
197: return batch;
198: if (!currentBatch.isEmpty()) {
199: batch = currentBatch;
200: currentBatch = createNewBatch();
201: return batch;
202: }
203: return null;
204: }
205: }
206:
207: /**
208: * Used only for testing
209: */
210: public synchronized void clear() {
211: while (get() != null) {
212: // remove all pending
213: }
214: currentBatch = createNewBatch();
215: }
216:
217: public SequenceID getNextSequenceID() {
218: ClientTransactionBatch batch = peek();
219: if (batch != null)
220: return batch.getMinTransactionSequence();
221: synchronized (this ) {
222: batch = peek();
223: if (batch != null)
224: return batch.getMinTransactionSequence();
225: if (!currentBatch.isEmpty())
226: return currentBatch.getMinTransactionSequence();
227: SequenceID currentSequenceID = new SequenceID(sequence
228: .getCurrentSequence());
229: return currentSequenceID.next();
230: }
231: }
232:
233: }
|