001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.objectserver;
006:
007: import com.tc.async.api.AddPredicate;
008: import com.tc.async.api.EventContext;
009: import com.tc.async.impl.OrderedSink;
010: import com.tc.l2.context.StateChangedEvent;
011: import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
012: import com.tc.l2.msg.ObjectSyncResetMessage;
013: import com.tc.l2.msg.ObjectSyncResetMessageFactory;
014: import com.tc.l2.state.StateManager;
015: import com.tc.logging.TCLogger;
016: import com.tc.logging.TCLogging;
017: import com.tc.net.groups.GroupException;
018: import com.tc.net.groups.GroupManager;
019: import com.tc.net.groups.GroupMessage;
020: import com.tc.net.groups.GroupMessageListener;
021: import com.tc.net.groups.NodeID;
022: import com.tc.object.ObjectID;
023: import com.tc.object.dna.api.DNA;
024: import com.tc.object.dna.impl.VersionizedDNAWrapper;
025: import com.tc.object.gtx.GlobalTransactionID;
026: import com.tc.object.tx.ServerTransactionID;
027: import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
028: import com.tc.objectserver.tx.ServerTransaction;
029: import com.tc.objectserver.tx.ServerTransactionManager;
030: import com.tc.util.Assert;
031: import com.tc.util.ObjectIDSet2;
032:
033: import gnu.trove.TLinkable;
034: import gnu.trove.TLinkedList;
035:
036: import java.util.ArrayList;
037: import java.util.Collection;
038: import java.util.Collections;
039: import java.util.HashMap;
040: import java.util.HashSet;
041: import java.util.IdentityHashMap;
042: import java.util.Iterator;
043: import java.util.LinkedHashMap;
044: import java.util.List;
045: import java.util.Map;
046: import java.util.Set;
047: import java.util.TreeMap;
048:
049: public class ReplicatedTransactionManagerImpl implements
050: ReplicatedTransactionManager, GroupMessageListener {
051:
052: private static final TCLogger logger = TCLogging
053: .getLogger(ReplicatedTransactionManagerImpl.class);
054:
055: private final ServerTransactionManager transactionManager;
056: private final GroupManager groupManager;
057: private final OrderedSink objectsSyncSink;
058:
059: private PassiveTransactionManager delegate;
060:
061: private final PassiveUninitializedTransactionManager passiveUninitTxnMgr = new PassiveUninitializedTransactionManager();
062: private final PassiveStandbyTransactionManager passiveStdByTxnMgr = new PassiveStandbyTransactionManager();
063: private final NullPassiveTransactionManager activeTxnMgr = new NullPassiveTransactionManager();
064:
065: private final ServerGlobalTransactionManager gtxm;
066:
067: public ReplicatedTransactionManagerImpl(GroupManager groupManager,
068: OrderedSink objectsSyncSink,
069: ServerTransactionManager transactionManager,
070: ServerGlobalTransactionManager gtxm) {
071: this .groupManager = groupManager;
072: this .objectsSyncSink = objectsSyncSink;
073: this .transactionManager = transactionManager;
074: this .gtxm = gtxm;
075: groupManager.registerForMessages(ObjectSyncResetMessage.class,
076: this );
077: this .delegate = passiveUninitTxnMgr;
078: }
079:
080: public synchronized void init(Set knownObjectIDs) {
081: if (delegate == passiveUninitTxnMgr) {
082: passiveUninitTxnMgr.addKnownObjectIDs(knownObjectIDs);
083: } else {
084: logger
085: .info("Not initing with known Ids since not in UNINITIALIED state : "
086: + knownObjectIDs.size());
087: }
088: }
089:
090: public synchronized void clearTransactionsBelowLowWaterMark(
091: GlobalTransactionID lowGlobalTransactionIDWatermark) {
092: delegate
093: .clearTransactionsBelowLowWaterMark(lowGlobalTransactionIDWatermark);
094: }
095:
096: public synchronized void addCommitedTransactions(NodeID nodeID,
097: Set txnIDs, Collection txns) {
098: delegate.addCommitedTransactions(nodeID, txnIDs, txns);
099: }
100:
101: public synchronized void addObjectSyncTransaction(
102: ServerTransaction txn) {
103: delegate.addObjectSyncTransaction(txn);
104: }
105:
106: public void messageReceived(final NodeID fromNode, GroupMessage msg) {
107: ObjectSyncResetMessage osr = (ObjectSyncResetMessage) msg;
108: Assert
109: .assertTrue(osr.getType() == ObjectSyncResetMessage.REQUEST_RESET);
110: objectsSyncSink.setAddPredicate(new AddPredicate() {
111: public boolean accept(EventContext context) {
112: GroupMessage gp = (GroupMessage) context;
113: return fromNode.equals(gp.messageFrom());
114: }
115: });
116: objectsSyncSink.clear();
117: sendOKResponse(fromNode, osr);
118: }
119:
120: private void validateResponse(NodeID nodeID,
121: ObjectSyncResetMessage msg) {
122: if (msg == null
123: || msg.getType() != ObjectSyncResetMessage.OPERATION_SUCCESS) {
124: String error = "Recd wrong response from : " + nodeID
125: + " : msg = " + msg
126: + " while requesting reset: Killing the node";
127: logger.error(error);
128: groupManager.zapNode(nodeID,
129: L2HAZapNodeRequestProcessor.PROGRAM_ERROR, error
130: + L2HAZapNodeRequestProcessor
131: .getErrorString(new Throwable()));
132: }
133: }
134:
135: private void sendOKResponse(NodeID fromNode,
136: ObjectSyncResetMessage msg) {
137: try {
138: groupManager.sendTo(fromNode, ObjectSyncResetMessageFactory
139: .createOKResponse(msg));
140: } catch (GroupException e) {
141: logger.error("Error handling message : " + msg, e);
142: }
143: }
144:
145: public void publishResetRequest(NodeID nodeID)
146: throws GroupException {
147: ObjectSyncResetMessage osr = (ObjectSyncResetMessage) groupManager
148: .sendToAndWaitForResponse(nodeID,
149: ObjectSyncResetMessageFactory
150: .createObjectSyncResetRequestMessage());
151: validateResponse(nodeID, osr);
152: }
153:
154: public synchronized void l2StateChanged(StateChangedEvent sce) {
155: if (sce.getCurrentState().equals(
156: StateManager.ACTIVE_COORDINATOR)) {
157: passiveUninitTxnMgr.clear(); // Release Memory
158: this .delegate = activeTxnMgr;
159: } else if (sce.getCurrentState().equals(
160: StateManager.PASSIVE_STANDBY)) {
161: passiveUninitTxnMgr.clear(); // Release Memory
162: this .delegate = passiveStdByTxnMgr;
163: }
164: }
165:
166: private void addIncommingTransactions(NodeID nodeID, Set txnIDs,
167: Collection txns) {
168: transactionManager.incomingTransactions(nodeID, txnIDs, txns,
169: false);
170: }
171:
172: private final class NullPassiveTransactionManager implements
173: PassiveTransactionManager {
174:
175: public void addCommitedTransactions(NodeID nodeID, Set txnIDs,
176: Collection txns) {
177: // There could still be some messages in the queue that arrives after the node becomes ACTIVE
178: logger
179: .warn("NullPassiveTransactionManager :: Ignoring commit Txn Messages from "
180: + nodeID);
181: }
182:
183: public void addObjectSyncTransaction(ServerTransaction txn) {
184: throw new AssertionError(
185: "Recd. ObjectSyncTransaction while in ACTIVE state : "
186: + txn);
187: }
188:
189: public void clearTransactionsBelowLowWaterMark(
190: GlobalTransactionID lowGlobalTransactionIDWatermark) {
191: throw new AssertionError(
192: "Recd. LowWaterMark while in ACTIVE state : "
193: + lowGlobalTransactionIDWatermark);
194: }
195: }
196:
197: private final class PassiveStandbyTransactionManager implements
198: PassiveTransactionManager {
199:
200: public void addCommitedTransactions(NodeID nodeID, Set txnIDs,
201: Collection txns) {
202: addIncommingTransactions(nodeID, txnIDs, txns);
203: }
204:
205: public void addObjectSyncTransaction(ServerTransaction txn) {
206: // XXX::NOTE:: This is possible when there are 2 or more passive servers in standby and when the active crashes.
207: // One of them will become passive and it is possible that the one became active has some objects that is missing
208: // from the other guy. So the current active is going to think that the other guy is in passive uninitialized
209: // state and send those objects. This can be ignored as long as all commit transactions are replayed.
210: logger
211: .warn("PassiveStandbyTransactionManager :: Ignoring ObjectSyncTxn Messages since already in PASSIVE-STANDBY"
212: + txn);
213: }
214:
215: public void clearTransactionsBelowLowWaterMark(
216: GlobalTransactionID lowGlobalTransactionIDWatermark) {
217: gtxm
218: .clearCommitedTransactionsBelowLowWaterMark(lowGlobalTransactionIDWatermark);
219: }
220: }
221:
222: private final class PassiveUninitializedTransactionManager
223: implements PassiveTransactionManager {
224:
225: ObjectIDSet2 existingOIDs = new ObjectIDSet2();
226: PendingChangesAccount pca = new PendingChangesAccount();
227:
228: public void addCommitedTransactions(NodeID nodeID, Set txnIDs,
229: Collection txns) {
230: Assert.assertEquals(txnIDs.size(), txns.size());
231: LinkedHashMap prunedTransactionsMap = pruneTransactions(txns);
232: Collection prunedTxns = prunedTransactionsMap.values();
233: addIncommingTransactions(nodeID, prunedTransactionsMap
234: .keySet(), prunedTxns);
235: }
236:
237: public void clearTransactionsBelowLowWaterMark(
238: GlobalTransactionID lowGlobalTransactionIDWatermark) {
239: pca
240: .clearTransactionsBelowLowWaterMark(lowGlobalTransactionIDWatermark);
241: gtxm
242: .clearCommitedTransactionsBelowLowWaterMark(lowGlobalTransactionIDWatermark);
243: }
244:
245: // TODO::Recycle msg after use. Messgaes may have to live longer than Txn acks.
246: private LinkedHashMap pruneTransactions(Collection txns) {
247: LinkedHashMap m = new LinkedHashMap();
248:
249: for (Iterator i = txns.iterator(); i.hasNext();) {
250: ServerTransaction st = (ServerTransaction) i.next();
251: List changes = st.getChanges();
252: List prunedChanges = new ArrayList(changes.size());
253: Set oids = new HashSet(changes.size());
254: Set newOids = new HashSet(changes.size());
255: for (Iterator j = changes.iterator(); j.hasNext();) {
256: DNA dna = (DNA) j.next();
257: ObjectID id = dna.getObjectID();
258: if (!dna.isDelta()) {
259: // New Object
260: if (existingOIDs.add(id)) {
261: prunedChanges.add(dna);
262: oids.add(id);
263: newOids.add(id);
264: } else {
265: // XXX::Note:: We already know about this object, ACTIVE has sent it in Object Sync Transaction
266: logger.warn("Ignoring New Object " + id
267: + "in transaction " + st
268: + " dna = " + dna
269: + " since its already present");
270: }
271: } else if (existingOIDs.contains(id)) {
272: // Already present
273: prunedChanges.add(dna);
274: oids.add(id);
275: } else {
276: // Not present
277: pca.addToPending(st, dna);
278: }
279: }
280: if (prunedChanges.size() == changes.size()) {
281: // The whole transaction could pass thru
282: m.put(st.getServerTransactionID(), st);
283: } else if (!prunedChanges.isEmpty()) {
284: // We have pruned changes
285: m.put(st.getServerTransactionID(),
286: new PrunedServerTransaction(prunedChanges,
287: st, oids, newOids));
288: }
289: }
290:
291: return m;
292: }
293:
294: public void clear() {
295: existingOIDs = new ObjectIDSet2();
296: pca.clear();
297: }
298:
299: public void addKnownObjectIDs(Set knownObjectIDs) {
300: if (existingOIDs.size() < knownObjectIDs.size()) {
301: ObjectIDSet2 old = existingOIDs;
302: existingOIDs = new ObjectIDSet2(knownObjectIDs); // This is optimizeded for ObjectIDSet2
303: existingOIDs.addAll(old);
304: } else {
305: existingOIDs.addAll(knownObjectIDs);
306: }
307: }
308:
309: public void addObjectSyncTransaction(ServerTransaction txn) {
310: Map txns = new LinkedHashMap(1);
311: ServerTransaction newTxn = createCompoundTransactionFrom(txn);
312: if (newTxn != null) {
313: txns.put(txn.getServerTransactionID(), newTxn);
314: addIncommingTransactions(txn.getSourceID(), txns
315: .keySet(), txns.values());
316: } else {
317: logger.warn("Not add Txn " + txn
318: + " to queue since all changes are ignored");
319: }
320: }
321:
322: private ServerTransaction createCompoundTransactionFrom(
323: ServerTransaction txn) {
324: List changes = txn.getChanges();
325: // XXX::NOTE:: Normally even though getChanges() returns a list, you will only find one change for each OID (Look
326: // at ClientTransactionImpl) but here we break that. But hopefully no one is depending on THAT in the system.
327: List compoundChanges = new ArrayList(changes.size() * 2);
328: Set oids = new HashSet(changes.size());
329: boolean modified = false;
330: for (Iterator i = changes.iterator(); i.hasNext();) {
331: DNA dna = (DNA) i.next();
332: ObjectID oid = dna.getObjectID();
333: if (existingOIDs.add(oid)) {
334: compoundChanges.add(dna);
335: oids.add(dna.getObjectID());
336: // Now add if there are more changes pending
337: List moreChanges = pca
338: .getAnyPendingChangesForAndClear(oid);
339: long lastVersion = Long.MIN_VALUE;
340: for (Iterator j = moreChanges.iterator(); j
341: .hasNext();) {
342: PendingRecord pr = (PendingRecord) j.next();
343: long version = pr.getGlobalTransactionID()
344: .toLong();
345: // XXX:: This should be true since we maintain the order in the List.
346: Assert.assertTrue(lastVersion < version);
347: compoundChanges.add(new VersionizedDNAWrapper(
348: pr.getChange(), version));
349: lastVersion = version;
350: modified = true;
351: }
352: } else {
353: // XXX::Note:: This is a possible condition in the 3'rd PASSIVE which is initializing when the ACTIVE crashes.
354: // The new ACTIVE might resend the same Object, but we dont want to pass it thru since technically if we
355: // didint miss any client transaction, we will have the same state. Also if the object sync transaction from
356: // the new passive arrives before the transaction containing a change to this object, then we might apply full
357: // DNA on old object. Coming to think of it, it might happen even in 1 PASSIVE case, since the ACTIVE computes
358: // the diff only after sending a few txns, which migh contain some new objects.
359: logger.warn("Ignoring ObjectSyncTransaction for "
360: + oid + " dna = " + dna
361: + " since its already present");
362: modified = true;
363: List moreChanges = pca
364: .getAnyPendingChangesForAndClear(oid);
365: Assert.assertTrue(moreChanges.isEmpty());
366: }
367: }
368: if (modified) {
369: // This name is little misleading
370: return (compoundChanges.isEmpty() ? null
371: : new PrunedServerTransaction(compoundChanges,
372: txn, oids, oids));
373: } else {
374: return txn;
375: }
376: }
377: }
378:
379: private static final class PendingChangesAccount {
380:
381: HashMap oid2Changes = new HashMap();
382: TreeMap gid2Changes = new TreeMap();
383:
384: public void addToPending(ServerTransaction st, DNA dna) {
385: PendingRecord pr = new PendingRecord(dna, st
386: .getServerTransactionID(), st
387: .getGlobalTransactionID());
388: ObjectID oid = dna.getObjectID();
389: TLinkedList pendingChangesForOid = getOrCreatePendingChangesListFor(oid);
390: pendingChangesForOid.addLast(pr);
391: IdentityHashMap pendingChangesForTxn = getOrCreatePendingChangesSetFor(st
392: .getGlobalTransactionID());
393: pendingChangesForTxn.put(pr, pr);
394: }
395:
396: public void clear() {
397: oid2Changes.clear();
398: gid2Changes.clear();
399: }
400:
401: public void clearTransactionsBelowLowWaterMark(
402: GlobalTransactionID lowWaterMark) {
403: Map lowerThanLWM = gid2Changes.headMap(lowWaterMark);
404: for (Iterator i = lowerThanLWM.values().iterator(); i
405: .hasNext();) {
406: IdentityHashMap pendingChangesForTxn = (IdentityHashMap) i
407: .next();
408: for (Iterator j = pendingChangesForTxn.keySet()
409: .iterator(); j.hasNext();) {
410: PendingRecord pr = (PendingRecord) j.next();
411: TLinkedList pendingChangesForOid = getPendingChangesListFor(pr
412: .getChange().getObjectID());
413: pendingChangesForOid.remove(pr);
414: }
415: i.remove();
416: }
417: }
418:
419: public List getAnyPendingChangesForAndClear(ObjectID oid) {
420: List pendingChangesForOid = removePendingChangesFor(oid);
421: if (pendingChangesForOid != null) {
422: for (Iterator i = pendingChangesForOid.iterator(); i
423: .hasNext();) {
424: PendingRecord pr = (PendingRecord) i.next();
425: IdentityHashMap pendingChangesForTxn = getPendingChangesSetFor(pr
426: .getGlobalTransactionID());
427: pendingChangesForTxn.remove(pr);
428: }
429: return pendingChangesForOid;
430: } else {
431: return Collections.EMPTY_LIST;
432: }
433: }
434:
435: private IdentityHashMap getPendingChangesSetFor(
436: GlobalTransactionID gid) {
437: return (IdentityHashMap) gid2Changes.get(gid);
438: }
439:
440: private TLinkedList getPendingChangesListFor(ObjectID objectID) {
441: return (TLinkedList) oid2Changes.get(objectID);
442: }
443:
444: private TLinkedList removePendingChangesFor(ObjectID oid) {
445: return (TLinkedList) oid2Changes.remove(oid);
446: }
447:
448: private IdentityHashMap getOrCreatePendingChangesSetFor(
449: GlobalTransactionID gid) {
450: IdentityHashMap m = (IdentityHashMap) gid2Changes.get(gid);
451: if (m == null) {
452: m = new IdentityHashMap();
453: gid2Changes.put(gid, m);
454: }
455: return m;
456: }
457:
458: private TLinkedList getOrCreatePendingChangesListFor(
459: ObjectID oid) {
460: TLinkedList l = (TLinkedList) oid2Changes.get(oid);
461: if (l == null) {
462: l = new TLinkedList();
463: oid2Changes.put(oid, l);
464: }
465: return l;
466: }
467:
468: }
469:
470: private static final class PendingRecord implements TLinkable {
471:
472: private TLinkable prev;
473: private TLinkable next;
474:
475: private final DNA dna;
476: private final ServerTransactionID sid;
477: private final GlobalTransactionID gid;
478:
479: public PendingRecord(DNA dna, ServerTransactionID sid,
480: GlobalTransactionID gid) {
481: this .dna = dna;
482: this .sid = sid;
483: this .gid = gid;
484: }
485:
486: public DNA getChange() {
487: return this .dna;
488: }
489:
490: public ServerTransactionID getServerTransactionID() {
491: return this .sid;
492: }
493:
494: public GlobalTransactionID getGlobalTransactionID() {
495: return this .gid;
496: }
497:
498: public TLinkable getNext() {
499: return next;
500: }
501:
502: public TLinkable getPrevious() {
503: return prev;
504: }
505:
506: public void setNext(TLinkable n) {
507: this .next = n;
508: }
509:
510: public void setPrevious(TLinkable p) {
511: this.prev = p;
512: }
513:
514: }
515:
516: }
|