001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.logging.TCLogging;
011: import com.tc.net.groups.NodeID;
012: import com.tc.object.ObjectID;
013: import com.tc.object.dna.api.DNA;
014: import com.tc.object.dna.impl.VersionizedDNAWrapper;
015: import com.tc.object.gtx.GlobalTransactionID;
016: import com.tc.object.gtx.GlobalTransactionManager;
017: import com.tc.object.net.ChannelStats;
018: import com.tc.object.tx.ServerTransactionID;
019: import com.tc.object.tx.TransactionID;
020: import com.tc.objectserver.api.ObjectInstanceMonitor;
021: import com.tc.objectserver.api.ObjectManager;
022: import com.tc.objectserver.core.api.ManagedObject;
023: import com.tc.objectserver.gtx.GlobalTransactionIDLowWaterMarkProvider;
024: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
025: import com.tc.objectserver.l1.api.ClientStateManager;
026: import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction;
027: import com.tc.objectserver.lockmanager.api.LockManager;
028: import com.tc.objectserver.managedobject.BackReferences;
029: import com.tc.objectserver.persistence.api.PersistenceTransaction;
030: import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
031: import com.tc.objectserver.persistence.api.TransactionStore;
032: import com.tc.stats.counter.Counter;
033: import com.tc.util.Assert;
034: import com.tc.util.State;
035:
036: import java.util.ArrayList;
037: import java.util.Collection;
038: import java.util.Collections;
039: import java.util.HashMap;
040: import java.util.HashSet;
041: import java.util.Iterator;
042: import java.util.List;
043: import java.util.Map;
044: import java.util.Set;
045: import java.util.Map.Entry;
046:
047: public class ServerTransactionManagerImpl implements
048: ServerTransactionManager, ServerTransactionManagerMBean,
049: GlobalTransactionManager {
050:
051: private static final TCLogger logger = TCLogging
052: .getLogger(ServerTransactionManager.class);
053:
054: private static final State PASSIVE_MODE = new State("PASSIVE-MODE");
055: private static final State ACTIVE_MODE = new State("ACTIVE-MODE");
056:
057: // TODO::FIXME::Change this to concurrent hashmap with top level txn accounting
058: private final Map transactionAccounts = Collections
059: .synchronizedMap(new HashMap());
060: private final ClientStateManager stateManager;
061: private final ObjectManager objectManager;
062: private final ResentTransactionSequencer resentTxnSequencer;
063: private final TransactionAcknowledgeAction action;
064: private final LockManager lockManager;
065: private final List rootEventListeners = new CopyOnWriteArrayList();
066: private final List txnEventListeners = new CopyOnWriteArrayList();
067: private final GlobalTransactionIDLowWaterMarkProvider lwmProvider;
068:
069: private final Counter transactionRateCounter;
070:
071: private final ChannelStats channelStats;
072:
073: private final ServerGlobalTransactionManager gtxm;
074:
075: private final ServerTransactionLogger txnLogger;
076:
077: private volatile State state = PASSIVE_MODE;
078:
/**
 * Builds a transaction manager around the supplied collaborators.
 *
 * The manager starts in PASSIVE-MODE (see the <code>state</code> field). The
 * <code>transactionStore</code> argument is accepted but not referenced by this constructor.
 * If <code>config.isLoggingEnabled()</code> is true, the transaction logger is registered as a
 * transaction listener right away.
 *
 * @param gtxm global transaction manager, also handed to the resent-transaction sequencer and
 *        the low-water-mark provider
 * @param transactionStore unused here
 * @param lockManager used to clear locks of disconnected nodes
 * @param stateManager client state manager
 * @param objectManager used to create roots and release objects
 * @param txnObjectManager passed on to the resent-transaction sequencer
 * @param action invoked to acknowledge completed transactions while active
 * @param transactionRateCounter incremented once per applied transaction
 * @param channelStats notified per applied transaction while active
 * @param config supplies the logging configuration
 */
public ServerTransactionManagerImpl(ServerGlobalTransactionManager gtxm, TransactionStore transactionStore,
        LockManager lockManager, ClientStateManager stateManager, ObjectManager objectManager,
        TransactionalObjectManager txnObjectManager, TransactionAcknowledgeAction action,
        Counter transactionRateCounter, ChannelStats channelStats, ServerTransactionManagerConfig config) {
    // Plain collaborator references first ...
    this.gtxm = gtxm;
    this.lockManager = lockManager;
    this.objectManager = objectManager;
    this.stateManager = stateManager;
    this.action = action;
    this.transactionRateCounter = transactionRateCounter;
    this.channelStats = channelStats;

    // ... then the helpers this manager owns (two of them receive a reference to this manager).
    this.resentTxnSequencer = new ResentTransactionSequencer(this, gtxm, txnObjectManager);
    this.lwmProvider = new GlobalTransactionIDLowWaterMarkProvider(this, gtxm);
    this.txnLogger = new ServerTransactionLogger(logger, config);

    if (config.isLoggingEnabled()) {
        enableTransactionLogger();
    }
}
104:
105: public void enableTransactionLogger() {
106: synchronized (txnLogger) {
107: removeTransactionListener(txnLogger);
108: addTransactionListener(txnLogger);
109: }
110: }
111:
112: public void disableTransactionLogger() {
113: synchronized (txnLogger) {
114: removeTransactionListener(txnLogger);
115: }
116: }
117:
118: public void dump() {
119: StringBuffer buf = new StringBuffer(
120: "ServerTransactionManager\n");
121: buf.append("transactionAccounts: " + transactionAccounts);
122: buf.append("\n/ServerTransactionManager");
123: System.err.println(buf.toString());
124: }
125:
126: /**
127: * Shutdown clients are not cleared immediately. Only on completing of all txns this is processed.
128: */
129: public void shutdownNode(final NodeID deadNodeID) {
130: boolean callBackAdded = false;
131: synchronized (transactionAccounts) {
132: TransactionAccount deadClientTA = (TransactionAccount) transactionAccounts
133: .get(deadNodeID);
134: if (deadClientTA != null) {
135: deadClientTA
136: .nodeDead(new TransactionAccount.CallBackOnComplete() {
137: public void onComplete(NodeID dead) {
138: synchronized (ServerTransactionManagerImpl.this .transactionAccounts) {
139: transactionAccounts
140: .remove(deadNodeID);
141: }
142: stateManager.shutdownNode(deadNodeID);
143: lockManager
144: .clearAllLocksFor(deadNodeID);
145: gtxm.shutdownNode(deadNodeID);
146: fireClientDisconnectedEvent(deadNodeID);
147: }
148: });
149: callBackAdded = true;
150: }
151:
152: TransactionAccount tas[] = (TransactionAccount[]) transactionAccounts
153: .values().toArray(
154: new TransactionAccount[transactionAccounts
155: .size()]);
156: for (int i = 0; i < tas.length; i++) {
157: TransactionAccount client = tas[i];
158: if (client == deadClientTA)
159: continue;
160: for (Iterator it = client.requestersWaitingFor(
161: deadNodeID).iterator(); it.hasNext();) {
162: TransactionID reqID = (TransactionID) it.next();
163: acknowledgement(client.getNodeID(), reqID,
164: deadNodeID);
165: }
166: }
167: }
168:
169: if (!callBackAdded) {
170: stateManager.shutdownNode(deadNodeID);
171: lockManager.clearAllLocksFor(deadNodeID);
172: gtxm.shutdownNode(deadNodeID);
173: fireClientDisconnectedEvent(deadNodeID);
174: }
175: }
176:
177: public void start(Set cids) {
178: synchronized (transactionAccounts) {
179: int sizeB4 = transactionAccounts.size();
180: transactionAccounts.keySet().retainAll(cids);
181: int sizeAfter = transactionAccounts.size();
182: if (sizeB4 != sizeAfter) {
183: logger.warn("Cleaned up Transaction Accounts for : "
184: + (sizeB4 - sizeAfter) + " clients");
185: }
186: }
187: // XXX:: The server could have crashed right after a client crash/disconnect before it had a chance to remove
188: // transactions from the DB. If we dont do this, then these will stick around for ever and cause low-water mark to
189: // remain the same for ever and ever.
190: // For Network enabled Active/Passive, when a passive becomes active, this will be called and the passive (now
191: // active) will correct itself.
192: gtxm.shutdownAllClientsExcept(cids);
193: fireTransactionManagerStartedEvent(cids);
194: }
195:
196: public void goToActiveMode() {
197: state = ACTIVE_MODE;
198: resentTxnSequencer.goToActiveMode();
199: lwmProvider.goToActiveMode();
200: }
201:
202: public GlobalTransactionID getLowGlobalTransactionIDWatermark() {
203: return lwmProvider.getLowGlobalTransactionIDWatermark();
204: }
205:
206: public void addWaitingForAcknowledgement(NodeID waiter,
207: TransactionID txnID, NodeID waitee) {
208: TransactionAccount ci = getTransactionAccount(waiter);
209: if (ci != null) {
210: ci.addWaitee(waitee, txnID);
211: } else {
212: logger
213: .warn("Not adding to Wating for Ack since Waiter not found in the states map: "
214: + waiter);
215: }
216: }
217:
218: // For testing
219: public boolean isWaiting(NodeID waiter, TransactionID txnID) {
220: TransactionAccount c = getTransactionAccount(waiter);
221: return c != null && c.hasWaitees(txnID);
222: }
223:
224: private void acknowledge(NodeID waiter, TransactionID txnID) {
225: final ServerTransactionID serverTxnID = new ServerTransactionID(
226: waiter, txnID);
227: fireTransactionCompleteEvent(serverTxnID);
228: if (isActive()) {
229: action.acknowledgeTransaction(serverTxnID);
230: }
231: }
232:
233: public void acknowledgement(NodeID waiter, TransactionID txnID,
234: NodeID waitee) {
235:
236: TransactionAccount transactionAccount = getTransactionAccount(waiter);
237: if (transactionAccount == null) {
238: // This can happen if an ack makes it into the system and the server crashed
239: // leading to a removed state;
240: logger
241: .warn("Waiter not found in the states map: "
242: + waiter);
243: return;
244: }
245:
246: if (transactionAccount.removeWaitee(waitee, txnID)) {
247: acknowledge(waiter, txnID);
248: }
249: }
250:
251: public void apply(ServerTransaction txn, Map objects,
252: BackReferences includeIDs,
253: ObjectInstanceMonitor instanceMonitor) {
254:
255: final ServerTransactionID stxnID = txn.getServerTransactionID();
256: final NodeID sourceID = txn.getSourceID();
257: final TransactionID txnID = txn.getTransactionID();
258: final List changes = txn.getChanges();
259:
260: GlobalTransactionID gtxID = txn.getGlobalTransactionID();
261:
262: boolean active = isActive();
263:
264: for (Iterator i = changes.iterator(); i.hasNext();) {
265: DNA orgDNA = (DNA) i.next();
266: long version = orgDNA.getVersion();
267: if (version == DNA.NULL_VERSION) {
268: Assert.assertFalse(gtxID.isNull());
269: version = gtxID.toLong();
270: }
271: DNA change = new VersionizedDNAWrapper(orgDNA, version,
272: true);
273: ManagedObject mo = (ManagedObject) objects.get(change
274: .getObjectID());
275: mo.apply(change, txnID, includeIDs, instanceMonitor,
276: !active);
277: if (active && !change.isDelta()) {
278: // Only New objects reference are added here
279: stateManager
280: .addReference(txn.getSourceID(), mo.getID());
281: }
282: }
283:
284: Map newRoots = txn.getNewRoots();
285:
286: if (newRoots.size() > 0) {
287: for (Iterator i = newRoots.entrySet().iterator(); i
288: .hasNext();) {
289: Entry entry = (Entry) i.next();
290: String rootName = (String) entry.getKey();
291: ObjectID newID = (ObjectID) entry.getValue();
292: objectManager.createRoot(rootName, newID);
293: }
294: }
295: if (active) {
296: channelStats.notifyTransaction(sourceID);
297: }
298: transactionRateCounter.increment();
299:
300: fireTransactionAppliedEvent(stxnID);
301: }
302:
303: public void skipApplyAndCommit(ServerTransaction txn) {
304: final NodeID nodeID = txn.getSourceID();
305: final TransactionID txnID = txn.getTransactionID();
306: TransactionAccount ci = getTransactionAccount(nodeID);
307: fireTransactionAppliedEvent(txn.getServerTransactionID());
308: if (ci.skipApplyAndCommit(txnID)) {
309: acknowledge(nodeID, txnID);
310: }
311: }
312:
313: public void commit(PersistenceTransactionProvider ptxp,
314: Collection objects, Map newRoots,
315: Collection appliedServerTransactionIDs) {
316: PersistenceTransaction ptx = ptxp.newTransaction();
317: release(ptx, objects, newRoots);
318: gtxm.commitAll(ptx, appliedServerTransactionIDs);
319: ptx.commit();
320: committed(appliedServerTransactionIDs);
321: }
322:
323: private void release(PersistenceTransaction ptx,
324: Collection objects, Map newRoots) {
325: // change done so now we can release the objects
326: objectManager.releaseAll(ptx, objects);
327:
328: // NOTE: important to have released all objects in the TXN before
329: // calling this event as the listeners tries to lookup for the object and blocks
330: for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) {
331: Map.Entry entry = (Entry) i.next();
332: fireRootCreatedEvent((String) entry.getKey(),
333: (ObjectID) entry.getValue());
334: }
335: }
336:
337: public void incomingTransactions(NodeID source, Set txnIDs,
338: Collection txns, boolean relayed) {
339: final boolean active = isActive();
340: TransactionAccount ci = getOrCreateTransactionAccount(source);
341: ci.incommingTransactions(txnIDs);
342: for (Iterator i = txns.iterator(); i.hasNext();) {
343: final ServerTransaction txn = (ServerTransaction) i.next();
344: final ServerTransactionID stxnID = txn
345: .getServerTransactionID();
346: final TransactionID txnID = stxnID.getClientTransactionID();
347: if (active && !relayed) {
348: ci.relayTransactionComplete(txnID);
349: } else if (!active) {
350: gtxm.createGlobalTransactionDescIfNeeded(stxnID, txn
351: .getGlobalTransactionID());
352: }
353: }
354: fireIncomingTransactionsEvent(source, txnIDs);
355: resentTxnSequencer.addTransactions(txns);
356: }
357:
358: private boolean isActive() {
359: return (state == ACTIVE_MODE);
360: }
361:
362: public void transactionsRelayed(NodeID node, Set serverTxnIDs) {
363: TransactionAccount ci = getTransactionAccount(node);
364: if (ci == null) {
365: logger
366: .warn("transactionsRelayed(): TransactionAccount not found for "
367: + node);
368: return;
369: }
370: for (Iterator i = serverTxnIDs.iterator(); i.hasNext();) {
371: final ServerTransactionID txnId = (ServerTransactionID) i
372: .next();
373: final TransactionID txnID = txnId.getClientTransactionID();
374: if (ci.relayTransactionComplete(txnID)) {
375: acknowledge(node, txnID);
376: }
377: }
378: }
379:
380: private void committed(Collection txnsIds) {
381: for (Iterator i = txnsIds.iterator(); i.hasNext();) {
382: final ServerTransactionID txnId = (ServerTransactionID) i
383: .next();
384: final NodeID waiter = txnId.getSourceID();
385: final TransactionID txnID = txnId.getClientTransactionID();
386:
387: TransactionAccount ci = getTransactionAccount(waiter);
388: if (ci != null && ci.applyCommitted(txnID)) {
389: acknowledge(waiter, txnID);
390: }
391: }
392: }
393:
394: public void broadcasted(NodeID waiter, TransactionID txnID) {
395: TransactionAccount ci = getTransactionAccount(waiter);
396:
397: if (ci != null && ci.broadcastCompleted(txnID)) {
398: acknowledge(waiter, txnID);
399: }
400: }
401:
402: private TransactionAccount getOrCreateTransactionAccount(
403: NodeID source) {
404: synchronized (transactionAccounts) {
405: TransactionAccount ta = (TransactionAccount) transactionAccounts
406: .get(source);
407: if (state == ACTIVE_MODE) {
408: if ((ta == null)
409: || (ta instanceof PassiveTransactionAccount)) {
410: Object old = transactionAccounts.put(source,
411: (ta = new TransactionAccountImpl(source)));
412: if (old != null) {
413: logger
414: .info("Transaction Account changed from : "
415: + old + " to " + ta);
416: }
417: }
418: } else {
419: if ((ta == null)
420: || (ta instanceof TransactionAccountImpl)) {
421: Object old = transactionAccounts
422: .put(
423: source,
424: (ta = new PassiveTransactionAccount(
425: source)));
426: if (old != null) {
427: logger
428: .info("Transaction Account changed from : "
429: + old + " to " + ta);
430: }
431: }
432: }
433: return ta;
434: }
435: }
436:
437: private TransactionAccount getTransactionAccount(NodeID node) {
438: return (TransactionAccount) transactionAccounts.get(node);
439: }
440:
441: public void addRootListener(
442: ServerTransactionManagerEventListener listener) {
443: if (listener == null) {
444: throw new IllegalArgumentException(
445: "listener cannot be null");
446: }
447: this .rootEventListeners.add(listener);
448: }
449:
450: private void fireRootCreatedEvent(String rootName, ObjectID id) {
451: for (Iterator iter = rootEventListeners.iterator(); iter
452: .hasNext();) {
453: try {
454: ServerTransactionManagerEventListener listener = (ServerTransactionManagerEventListener) iter
455: .next();
456: listener.rootCreated(rootName, id);
457: } catch (Exception e) {
458: if (logger.isDebugEnabled()) {
459: logger.debug(e);
460: } else {
461: logger
462: .warn("Exception in rootCreated event callback: "
463: + e.getMessage());
464: }
465: }
466: }
467: }
468:
469: public void addTransactionListener(
470: ServerTransactionListener listener) {
471: if (listener == null) {
472: throw new IllegalArgumentException(
473: "listener cannot be null");
474: }
475: this .txnEventListeners.add(listener);
476: }
477:
478: public void removeTransactionListener(
479: ServerTransactionListener listener) {
480: if (listener == null) {
481: throw new IllegalArgumentException(
482: "listener cannot be null");
483: }
484: this .txnEventListeners.remove(listener);
485: }
486:
487: public void callBackOnTxnsInSystemCompletion(
488: TxnsInSystemCompletionLister l) {
489: boolean callback = false;
490: synchronized (transactionAccounts) {
491: HashSet txnsInSystem = new HashSet();
492: for (Iterator i = transactionAccounts.entrySet().iterator(); i
493: .hasNext();) {
494: Entry entry = (Entry) i.next();
495: TransactionAccount client = (TransactionAccount) entry
496: .getValue();
497: client
498: .addAllPendingServerTransactionIDsTo(txnsInSystem);
499: }
500: if (txnsInSystem.isEmpty()) {
501: callback = true;
502: } else {
503: addTransactionListener(new TxnsInSystemCompletionListenerCallback(
504: l, txnsInSystem));
505: }
506: }
507: if (callback) {
508: l.onCompletion();
509: }
510: }
511:
512: private void fireIncomingTransactionsEvent(NodeID nodeID,
513: Set serverTxnIDs) {
514: for (Iterator iter = txnEventListeners.iterator(); iter
515: .hasNext();) {
516: try {
517: ServerTransactionListener listener = (ServerTransactionListener) iter
518: .next();
519: listener.incomingTransactions(nodeID, serverTxnIDs);
520: } catch (Exception e) {
521: logger
522: .error(
523: "Exception in Txn listener event callback: ",
524: e);
525: throw new AssertionError(e);
526: }
527: }
528: }
529:
530: private void fireTransactionCompleteEvent(ServerTransactionID stxID) {
531: for (Iterator iter = txnEventListeners.iterator(); iter
532: .hasNext();) {
533: try {
534: ServerTransactionListener listener = (ServerTransactionListener) iter
535: .next();
536: listener.transactionCompleted(stxID);
537: } catch (Exception e) {
538: logger
539: .error(
540: "Exception in Txn listener event callback: ",
541: e);
542: throw new AssertionError(e);
543: }
544: }
545: }
546:
547: private void fireTransactionAppliedEvent(ServerTransactionID stxID) {
548: for (Iterator iter = txnEventListeners.iterator(); iter
549: .hasNext();) {
550: try {
551: ServerTransactionListener listener = (ServerTransactionListener) iter
552: .next();
553: listener.transactionApplied(stxID);
554: } catch (Exception e) {
555: logger
556: .error(
557: "Exception in Txn listener event callback: ",
558: e);
559: throw new AssertionError(e);
560: }
561: }
562: }
563:
564: private void fireTransactionManagerStartedEvent(Set cids) {
565: for (Iterator iter = txnEventListeners.iterator(); iter
566: .hasNext();) {
567: try {
568: ServerTransactionListener listener = (ServerTransactionListener) iter
569: .next();
570: listener.transactionManagerStarted(cids);
571: } catch (Exception e) {
572: logger
573: .error(
574: "Exception in Txn listener event callback: ",
575: e);
576: throw new AssertionError(e);
577: }
578: }
579: }
580:
581: public void setResentTransactionIDs(NodeID source,
582: Collection transactionIDs) {
583: if (transactionIDs.isEmpty())
584: return;
585: Collection stxIDs = new ArrayList();
586: for (Iterator iter = transactionIDs.iterator(); iter.hasNext();) {
587: TransactionID txn = (TransactionID) iter.next();
588: stxIDs.add(new ServerTransactionID(source, txn));
589: }
590: fireAddResentTransactionIDsEvent(stxIDs);
591: }
592:
593: private void fireAddResentTransactionIDsEvent(Collection stxIDs) {
594: for (Iterator iter = txnEventListeners.iterator(); iter
595: .hasNext();) {
596: try {
597: ServerTransactionListener listener = (ServerTransactionListener) iter
598: .next();
599: listener.addResentServerTransactionIDs(stxIDs);
600: } catch (Exception e) {
601: logger
602: .error(
603: "Exception in Txn listener event callback: ",
604: e);
605: throw new AssertionError(e);
606: }
607: }
608: }
609:
610: private void fireClientDisconnectedEvent(NodeID deadNodeID) {
611: for (Iterator iter = txnEventListeners.iterator(); iter
612: .hasNext();) {
613: try {
614: ServerTransactionListener listener = (ServerTransactionListener) iter
615: .next();
616: listener.clearAllTransactionsFor(deadNodeID);
617: } catch (Exception e) {
618: logger
619: .error(
620: "Exception in Txn listener event callback: ",
621: e);
622: throw new AssertionError(e);
623: }
624: }
625: }
626:
627: private final class TxnsInSystemCompletionListenerCallback
628: implements ServerTransactionListener {
629:
630: private final TxnsInSystemCompletionLister callback;
631: private final HashSet txnsInSystem;
632:
633: public int count = 0;
634:
635: public TxnsInSystemCompletionListenerCallback(
636: TxnsInSystemCompletionLister callback,
637: HashSet txnsInSystem) {
638: this .callback = callback;
639: this .txnsInSystem = txnsInSystem;
640: }
641:
642: public void addResentServerTransactionIDs(Collection stxIDs) {
643: // NOP
644: }
645:
646: public void clearAllTransactionsFor(NodeID deadNode) {
647: // NOP
648: }
649:
650: public void incomingTransactions(NodeID source, Set serverTxnIDs) {
651: // NOP
652: }
653:
654: public void transactionApplied(ServerTransactionID stxID) {
655: // NOP
656: }
657:
658: public void transactionCompleted(ServerTransactionID stxID) {
659: if (txnsInSystem.remove(stxID)) {
660: if (txnsInSystem.isEmpty()) {
661: ServerTransactionManagerImpl.this
662: .removeTransactionListener(this );
663: callback.onCompletion();
664: }
665: }
666: if (count++ % 100 == 0) {
667: logger
668: .warn("TxnsInSystemCompletionLister :: Still waiting for completion of "
669: + txnsInSystem.size()
670: + " txns to call callback "
671: + callback
672: + " count = " + count);
673: }
674: }
675:
676: public void transactionManagerStarted(Set cids) {
677: // NOP
678: }
679:
680: }
681:
682: }
|