001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.msg;
006:
007: import com.tc.async.api.OrderedEventContext;
008: import com.tc.bytes.TCByteBuffer;
009: import com.tc.net.groups.AbstractGroupMessage;
010: import com.tc.net.groups.NodeID;
011: import com.tc.net.groups.NodeIDSerializer;
012: import com.tc.object.dna.impl.ObjectStringSerializer;
013: import com.tc.object.gtx.GlobalTransactionID;
014: import com.tc.object.gtx.GlobalTransactionIDGenerator;
015: import com.tc.object.tx.ServerTransactionID;
016: import com.tc.object.tx.TransactionID;
017: import com.tc.util.Assert;
018:
019: import java.io.IOException;
020: import java.io.ObjectInput;
021: import java.io.ObjectOutput;
022: import java.util.HashMap;
023: import java.util.Iterator;
024: import java.util.Map;
025: import java.util.Map.Entry;
026:
027: public class RelayedCommitTransactionMessage extends
028: AbstractGroupMessage implements OrderedEventContext,
029: GlobalTransactionIDGenerator {
030:
031: public static final int RELAYED_COMMIT_TXN_MSG_TYPE = 0;
032:
033: private TCByteBuffer[] batchData;
034: private ObjectStringSerializer serializer;
035: private Map sid2gid;
036: private NodeID nodeID;
037: private long sequenceID;
038:
039: private GlobalTransactionID lowWaterMark;
040:
041: // To make serialization happy
042: public RelayedCommitTransactionMessage() {
043: super (-1);
044: }
045:
046: public RelayedCommitTransactionMessage(NodeID nodeID,
047: TCByteBuffer[] batchData,
048: ObjectStringSerializer serializer, Map sid2gid, long seqID,
049: GlobalTransactionID lowWaterMark) {
050: super (RELAYED_COMMIT_TXN_MSG_TYPE);
051: this .nodeID = nodeID;
052: this .batchData = batchData;
053: this .serializer = serializer;
054: this .sid2gid = sid2gid;
055: this .sequenceID = seqID;
056: this .lowWaterMark = lowWaterMark;
057: }
058:
059: public NodeID getClientID() {
060: return nodeID;
061: }
062:
063: public TCByteBuffer[] getBatchData() {
064: return batchData;
065: }
066:
067: public ObjectStringSerializer getSerializer() {
068: return serializer;
069: }
070:
071: protected void basicReadExternal(int msgType, ObjectInput in)
072: throws IOException, ClassNotFoundException {
073: Assert.assertEquals(RELAYED_COMMIT_TXN_MSG_TYPE, msgType);
074: this .nodeID = NodeIDSerializer.readNodeID(in);
075: this .serializer = readObjectStringSerializer(in);
076: this .batchData = readByteBuffers(in);
077: this .sid2gid = readServerTxnIDglobalTxnIDMapping(in);
078: this .sequenceID = in.readLong();
079: this .lowWaterMark = new GlobalTransactionID(in.readLong());
080: }
081:
082: private Map readServerTxnIDglobalTxnIDMapping(ObjectInput in)
083: throws IOException {
084: int size = in.readInt();
085: Map mapping = new HashMap();
086: NodeID cid = nodeID;
087: for (int i = 0; i < size; i++) {
088: TransactionID txnid = new TransactionID(in.readLong());
089: GlobalTransactionID gid = new GlobalTransactionID(in
090: .readLong());
091: mapping.put(new ServerTransactionID(cid, txnid), gid);
092: }
093: return mapping;
094: }
095:
096: protected void basicWriteExternal(int msgType, ObjectOutput out)
097: throws IOException {
098: Assert.assertEquals(RELAYED_COMMIT_TXN_MSG_TYPE, msgType);
099: NodeIDSerializer.writeNodeID(nodeID, out);
100: writeObjectStringSerializer(out, serializer);
101: writeByteBuffers(out, batchData);
102: writeServerTxnIDGlobalTxnIDMapping(out);
103: out.writeLong(this .sequenceID);
104: out.writeLong(this .lowWaterMark.toLong());
105: }
106:
107: private void writeServerTxnIDGlobalTxnIDMapping(ObjectOutput out)
108: throws IOException {
109: out.writeInt(sid2gid.size());
110: NodeID cid = nodeID;
111: for (Iterator i = sid2gid.entrySet().iterator(); i.hasNext();) {
112: Entry e = (Entry) i.next();
113: ServerTransactionID sid = (ServerTransactionID) e.getKey();
114: Assert.assertEquals(cid, sid.getSourceID());
115: out.writeLong(sid.getClientTransactionID().toLong());
116: GlobalTransactionID gid = (GlobalTransactionID) e
117: .getValue();
118: out.writeLong(gid.toLong());
119: }
120: }
121:
122: public GlobalTransactionID getOrCreateGlobalTransactionID(
123: ServerTransactionID serverTransactionID) {
124: GlobalTransactionID gid = (GlobalTransactionID) this .sid2gid
125: .get(serverTransactionID);
126: if (gid == null) {
127: throw new AssertionError("no Mapping found for : "
128: + serverTransactionID);
129: }
130: return gid;
131: }
132:
133: public GlobalTransactionID getLowGlobalTransactionIDWatermark() {
134: return this .lowWaterMark;
135: }
136:
137: public long getSequenceID() {
138: return sequenceID;
139: }
140:
141: }
|