001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.tx;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.object.lockmanager.api.LockFlushCallback;
009: import com.tc.object.lockmanager.api.LockID;
010: import com.tc.object.msg.CompletedTransactionLowWaterMarkMessage;
011: import com.tc.object.net.DSOClientMessageChannel;
012: import com.tc.object.session.SessionID;
013: import com.tc.object.session.SessionManager;
014: import com.tc.properties.TCPropertiesImpl;
015: import com.tc.util.Assert;
016: import com.tc.util.SequenceID;
017: import com.tc.util.State;
018: import com.tc.util.TCAssertionError;
019: import com.tc.util.TCTimerImpl;
020: import com.tc.util.Util;
021:
022: import java.util.ArrayList;
023: import java.util.Arrays;
024: import java.util.Collection;
025: import java.util.Collections;
026: import java.util.HashMap;
027: import java.util.HashSet;
028: import java.util.Iterator;
029: import java.util.LinkedHashSet;
030: import java.util.List;
031: import java.util.Map;
032: import java.util.Set;
033: import java.util.Timer;
034: import java.util.TimerTask;
035: import java.util.Map.Entry;
036:
037: /**
038: * Sends off committed transactions
039: *
040: * @author steve
041: */
042: public class RemoteTransactionManagerImpl implements
043: RemoteTransactionManager {
044:
045: private static final long TIMEOUT = 30000L;
046:
047: private static final int MAX_OUTSTANDING_BATCHES = TCPropertiesImpl
048: .getProperties().getInt(
049: "l1.transactionmanager.maxOutstandingBatchSize");
050: private static final long COMPLETED_ACK_FLUSH_TIMEOUT = TCPropertiesImpl
051: .getProperties().getLong(
052: "l1.transactionmanager.completedAckFlushTimeout");
053:
054: private static final State STARTING = new State("STARTING");
055: private static final State RUNNING = new State("RUNNING");
056: private static final State PAUSED = new State("PAUSED");
057: private static final State STOP_INITIATED = new State(
058: "STOP-INITIATED");
059: private static final State STOPPED = new State("STOPPED");
060:
061: private final Object lock = new Object();
062: private final Map incompleteBatches = new HashMap();
063: private final HashMap lockFlushCallbacks = new HashMap();
064:
065: private int outStandingBatches = 0;
066: private final TCLogger logger;
067: private final TransactionBatchAccounting batchAccounting;
068: private final LockAccounting lockAccounting;
069:
070: private State status;
071: private final SessionManager sessionManager;
072: private final TransactionSequencer sequencer;
073: private final DSOClientMessageChannel channel;
074: private final Timer timer = new TCTimerImpl(
075: "RemoteTransactionManager Flusher", true);
076:
077: public RemoteTransactionManagerImpl(TCLogger logger,
078: final TransactionBatchFactory batchFactory,
079: TransactionBatchAccounting batchAccounting,
080: LockAccounting lockAccounting,
081: SessionManager sessionManager,
082: DSOClientMessageChannel channel) {
083: this .logger = logger;
084: this .batchAccounting = batchAccounting;
085: this .lockAccounting = lockAccounting;
086: this .sessionManager = sessionManager;
087: this .channel = channel;
088: this .status = RUNNING;
089: this .sequencer = new TransactionSequencer(batchFactory);
090: this .timer.schedule(new RemoteTransactionManagerTimerTask(),
091: COMPLETED_ACK_FLUSH_TIMEOUT,
092: COMPLETED_ACK_FLUSH_TIMEOUT);
093: }
094:
095: public void pause() {
096: synchronized (lock) {
097: if (isStoppingOrStopped())
098: return;
099: if (this .status == PAUSED)
100: throw new AssertionError(
101: "Attempt to pause while already paused.");
102: this .status = PAUSED;
103: }
104: }
105:
106: public void starting() {
107: synchronized (lock) {
108: if (isStoppingOrStopped())
109: return;
110: if (this .status != PAUSED)
111: throw new AssertionError(
112: "Attempt to start while not paused.");
113: this .status = STARTING;
114: }
115: }
116:
117: public void unpause() {
118: synchronized (lock) {
119: if (isStoppingOrStopped())
120: return;
121: if (this .status != STARTING)
122: throw new AssertionError(
123: "Attempt to unpause while not in starting.");
124: this .status = RUNNING;
125: lock.notifyAll();
126: }
127: }
128:
129: /**
130: * This is for testing only.
131: */
132: public void clear() {
133: synchronized (lock) {
134: sequencer.clear();
135: incompleteBatches.clear();
136: }
137: }
138:
139: /**
140: * This is for testing only.
141: */
142: public int getMaxOutStandingBatches() {
143: return MAX_OUTSTANDING_BATCHES;
144: }
145:
146: public void stopProcessing() {
147: sequencer.shutdown();
148: channel.close();
149: }
150:
151: public void stop() {
152: final long start = System.currentTimeMillis();
153: logger.debug("stop() is called on "
154: + System.identityHashCode(this ));
155: synchronized (lock) {
156: this .status = STOP_INITIATED;
157:
158: sendBatches(true, "stop()");
159:
160: int count = 10;
161: long t0 = System.currentTimeMillis();
162: if (incompleteBatches.size() != 0) {
163: try {
164: int incompleteBatchesCount = 0;
165: while (status != STOPPED
166: && (t0 + TIMEOUT * count) > System
167: .currentTimeMillis()) {
168: if (incompleteBatchesCount != incompleteBatches
169: .size()) {
170: logger
171: .debug("stop(): incompleteBatches.size() = "
172: + (incompleteBatchesCount = incompleteBatches
173: .size()));
174: }
175: lock.wait(TIMEOUT);
176: }
177: } catch (InterruptedException e) {
178: logger.warn("stop(): Interrupted " + e);
179: }
180: if (status != STOPPED) {
181: logger
182: .error("stop() : There are still UNACKed Transactions! incompleteBatches.size() = "
183: + incompleteBatches.size());
184: }
185: }
186: this .status = STOPPED;
187: }
188: logger.info("stop(): took "
189: + (System.currentTimeMillis() - start)
190: + " millis to complete");
191: }
192:
193: public void flush(LockID lockID) {
194: boolean isInterrupted = false;
195: Collection c;
196: synchronized (lock) {
197: while ((!(c = lockAccounting.getTransactionsFor(lockID))
198: .isEmpty())) {
199: try {
200: long waitTime = 15 * 1000;
201: long t0 = System.currentTimeMillis();
202: lock.wait(waitTime);
203: if ((System.currentTimeMillis() - t0) > waitTime) {
204: logger.info("Flush for " + lockID
205: + " took longer than: " + waitTime
206: + "ms. # Transactions not yet Acked = "
207: + c.size() + "\n");
208: }
209: } catch (InterruptedException e) {
210: isInterrupted = true;
211: }
212: }
213: }
214: Util.selfInterruptIfNeeded(isInterrupted);
215: }
216:
217: /* This does not block unlike flush() */
218: public boolean isTransactionsForLockFlushed(LockID lockID,
219: LockFlushCallback callback) {
220: synchronized (lock) {
221:
222: if ((lockAccounting.getTransactionsFor(lockID)).isEmpty()) {
223: // All transactions are flushed !
224: return true;
225: } else {
226: // register for call back
227: Object prev = lockFlushCallbacks.put(lockID, callback);
228: if (prev != null) {
229: // Will this scenario comeup in server restart scenario ? It should as we check for greediness in the Lock
230: // Manager before making this call
231: throw new TCAssertionError(
232: "There is already a registered call back on Lock Flush for this lock ID - "
233: + lockID);
234: }
235: return false;
236: }
237: }
238: }
239:
240: public void commit(ClientTransaction txn) {
241:
242: if (!txn.hasChangesOrNotifies())
243: throw new AssertionError(
244: "Attempt to commit an empty transaction.");
245:
246: commitInternal(txn);
247: }
248:
249: private void commitInternal(ClientTransaction txn) {
250: TransactionID txID = txn.getTransactionID();
251: if (!txn.isConcurrent()) {
252: lockAccounting
253: .add(txID, Arrays.asList(txn.getAllLockIDs()));
254: }
255:
256: long start = System.currentTimeMillis();
257: sequencer.addTransaction(txn);
258: long diff = System.currentTimeMillis() - start;
259: if (diff > 1000) {
260: logger
261: .info("WARNING ! Took more than 1000ms to add to sequencer : "
262: + diff + " ms");
263: }
264:
265: synchronized (lock) {
266: if (isStoppingOrStopped()) {
267: // Send now if stop is requested
268: sendBatches(true, "commit() : Stop initiated.");
269: }
270: waitUntilRunning();
271: sendBatches(false);
272: }
273: }
274:
275: private void sendBatches(boolean ignoreMax) {
276: sendBatches(ignoreMax, null);
277: }
278:
279: private void sendBatches(boolean ignoreMax, String message) {
280: ClientTransactionBatch batch;
281: while ((ignoreMax || canSendBatch())
282: && (batch = sequencer.getNextBatch()) != null) {
283: if (message != null) {
284: logger.debug(message + " : Sending batch containing "
285: + batch.numberOfTxns() + " Txns.");
286: }
287: sendBatch(batch, true);
288: }
289: }
290:
291: private boolean canSendBatch() {
292: return (outStandingBatches < MAX_OUTSTANDING_BATCHES);
293: }
294:
295: public void resendOutstanding() {
296: synchronized (lock) {
297: if (status != STARTING && !isStoppingOrStopped()) {
298: // formatting
299: throw new AssertionError(
300: this
301: + ": Attempt to resend incomplete batches while not starting. Status="
302: + status);
303: }
304: logger.debug("resendOutstanding()...");
305: outStandingBatches = 0;
306: List toSend = batchAccounting
307: .addIncompleteBatchIDsTo(new ArrayList());
308: if (toSend.size() == 0) {
309: sendBatches(false, " resendOutstanding()");
310: } else {
311: for (Iterator i = toSend.iterator(); i.hasNext();) {
312: TxnBatchID id = (TxnBatchID) i.next();
313: ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
314: .get(id);
315: if (batch == null)
316: throw new AssertionError("Unknown batch: " + id);
317: logger
318: .debug("Resending outstanding batch: "
319: + id
320: + ", "
321: + batch
322: .addTransactionIDsTo(new LinkedHashSet()));
323: sendBatch(batch, false);
324: }
325: }
326: }
327: }
328:
329: public Collection getTransactionSequenceIDs() {
330: synchronized (lock) {
331: HashSet sequenceIDs = new HashSet();
332: if (!isStoppingOrStopped() && (status != STARTING)) {
333: throw new AssertionError(
334: "Attempt to get current transaction sequence while not starting: "
335: + status);
336: } else {
337: // Add list of SequenceIDs that are going to be resent
338: List toSend = batchAccounting
339: .addIncompleteBatchIDsTo(new ArrayList());
340: for (Iterator i = toSend.iterator(); i.hasNext();) {
341: TxnBatchID id = (TxnBatchID) i.next();
342: ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
343: .get(id);
344: if (batch == null)
345: throw new AssertionError("Unknown batch: " + id);
346: batch.addTransactionSequenceIDsTo(sequenceIDs);
347: }
348: // Add Last next
349: SequenceID currentBatchMinSeq = sequencer
350: .getNextSequenceID();
351: Assert.assertFalse(SequenceID.NULL_ID
352: .equals(currentBatchMinSeq));
353: sequenceIDs.add(currentBatchMinSeq);
354: }
355: return sequenceIDs;
356: }
357: }
358:
359: public Collection getResentTransactionIDs() {
360: synchronized (lock) {
361: HashSet txIDs = new HashSet();
362: if (!isStoppingOrStopped() && (status != STARTING)) {
363: throw new AssertionError(
364: "Attempt to get resent transaction IDs while not starting: "
365: + status);
366: } else {
367: // Add list of TransactionIDs that are going to be resent
368: List toSend = batchAccounting
369: .addIncompleteBatchIDsTo(new ArrayList());
370: for (Iterator i = toSend.iterator(); i.hasNext();) {
371: TxnBatchID id = (TxnBatchID) i.next();
372: ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
373: .get(id);
374: if (batch == null)
375: throw new AssertionError("Unknown batch: " + id);
376: batch.addTransactionIDsTo(txIDs);
377: }
378: }
379: return txIDs;
380: }
381: }
382:
383: private boolean isStoppingOrStopped() {
384: return status == STOP_INITIATED || status == STOPPED;
385: }
386:
387: private void sendBatch(ClientTransactionBatch batchToSend,
388: boolean account) {
389: synchronized (lock) {
390: if (account) {
391: if (incompleteBatches.put(batchToSend
392: .getTransactionBatchID(), batchToSend) != null) {
393: // formatting
394: throw new AssertionError(
395: "Batch has already been sent!");
396: }
397: Collection txnIds = batchToSend
398: .addTransactionIDsTo(new HashSet());
399: batchAccounting.addBatch(batchToSend
400: .getTransactionBatchID(), txnIds);
401: }
402: batchToSend.send();
403: outStandingBatches++;
404: }
405: }
406:
407: // XXX:: Currently server always sends NULL BatchID
408: public void receivedBatchAcknowledgement(TxnBatchID txnBatchID) {
409: synchronized (lock) {
410: if (status == STOP_INITIATED) {
411: logger.warn(status + " : Received ACK for batch = "
412: + txnBatchID);
413: lock.notifyAll();
414: return;
415: }
416:
417: waitUntilRunning();
418: outStandingBatches--;
419: sendBatches(false);
420: lock.notifyAll();
421: }
422: }
423:
424: public void receivedAcknowledgement(SessionID sessionID,
425: TransactionID txID) {
426: Map callbacks;
427: synchronized (lock) {
428: // waitUntilRunning();
429: if (!sessionManager.isCurrentSession(sessionID)) {
430: logger.warn("Ignoring Transaction ACK for " + txID
431: + " from previous session = " + sessionID);
432: return;
433: }
434:
435: Set completedLocks = lockAccounting.acknowledge(txID);
436:
437: TxnBatchID container = batchAccounting
438: .getBatchByTransactionID(txID);
439: if (!container.isNull()) {
440: ClientTransactionBatch containingBatch = (ClientTransactionBatch) incompleteBatches
441: .get(container);
442: containingBatch.removeTransaction(txID);
443: TxnBatchID completed = batchAccounting
444: .acknowledge(txID);
445: if (!completed.isNull()) {
446: incompleteBatches.remove(completed);
447: if (status == STOP_INITIATED
448: && incompleteBatches.size() == 0) {
449: logger
450: .debug("Received ACK for the last Transaction. Moving to STOPPED state.");
451: status = STOPPED;
452: }
453: }
454: } else {
455: logger.fatal("No batch found for acknowledgement: "
456: + txID + " The batch accounting is "
457: + batchAccounting);
458: throw new AssertionError(
459: "No batch found for acknowledgement: " + txID);
460: }
461: lock.notifyAll();
462: callbacks = getLockFlushCallbacks(completedLocks);
463: }
464: fireLockFlushCallbacks(callbacks);
465: }
466:
467: private TransactionID getCompletedTransactionIDLowWaterMark() {
468: synchronized (lock) {
469: waitUntilRunning();
470: return batchAccounting.getLowWaterMark();
471: }
472: }
473:
474: /*
475: * Never fire callbacks while holding lock
476: */
477: private void fireLockFlushCallbacks(Map callbacks) {
478: if (callbacks.isEmpty())
479: return;
480: for (Iterator i = callbacks.entrySet().iterator(); i.hasNext();) {
481: Entry e = (Entry) i.next();
482: LockID lid = (LockID) e.getKey();
483: LockFlushCallback callback = (LockFlushCallback) e
484: .getValue();
485: callback.transactionsForLockFlushed(lid);
486: }
487: }
488:
489: private Map getLockFlushCallbacks(Set completedLocks) {
490: Map callbacks = Collections.EMPTY_MAP;
491: if (!completedLocks.isEmpty() && !lockFlushCallbacks.isEmpty()) {
492: for (Iterator i = completedLocks.iterator(); i.hasNext();) {
493: Object lid = i.next();
494: Object callback = lockFlushCallbacks.remove(lid);
495: if (callback != null) {
496: if (callbacks == Collections.EMPTY_MAP) {
497: callbacks = new HashMap();
498: }
499: callbacks.put(lid, callback);
500: }
501: }
502: }
503: return callbacks;
504: }
505:
506: private void waitUntilRunning() {
507: boolean isInterrupted = false;
508: while (status != RUNNING) {
509: try {
510: lock.wait();
511: } catch (InterruptedException e) {
512: isInterrupted = true;
513: }
514: }
515: Util.selfInterruptIfNeeded(isInterrupted);
516: }
517:
518: // This method exists so that both these (resending and unpausing) should happen in
519: // atomically or else there exists a race condition.
520: public void resendOutstandingAndUnpause() {
521: synchronized (lock) {
522: resendOutstanding();
523: unpause();
524: }
525: }
526:
527: private class RemoteTransactionManagerTimerTask extends TimerTask {
528:
529: public void run() {
530: try {
531: TransactionID lwm = getCompletedTransactionIDLowWaterMark();
532: if (lwm.isNull())
533: return;
534: CompletedTransactionLowWaterMarkMessage ctm = channel
535: .getCompletedTransactionLowWaterMarkMessageFactory()
536: .newCompletedTransactionLowWaterMarkMessage();
537: ctm.initialize(lwm);
538: ctm.send();
539: } catch (Exception e) {
540: logger.error("Error sending Low water mark : ", e);
541: throw new AssertionError(e);
542: }
543: }
544: }
545: }
|