001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.msg;
006:
007: import com.tc.bytes.TCByteBuffer;
008: import com.tc.io.TCByteBufferInput;
009: import com.tc.io.TCByteBufferOutput;
010: import com.tc.io.TCSerializable;
011: import com.tc.net.groups.NodeID;
012: import com.tc.net.groups.NodeIDSerializer;
013: import com.tc.net.protocol.tcm.MessageChannel;
014: import com.tc.net.protocol.tcm.MessageMonitor;
015: import com.tc.net.protocol.tcm.TCMessageHeader;
016: import com.tc.net.protocol.tcm.TCMessageType;
017: import com.tc.object.ObjectID;
018: import com.tc.object.dmi.DmiDescriptor;
019: import com.tc.object.dna.api.DNA;
020: import com.tc.object.dna.impl.DNAImpl;
021: import com.tc.object.dna.impl.ObjectStringSerializer;
022: import com.tc.object.dna.impl.VersionizedDNAWrapper;
023: import com.tc.object.gtx.GlobalTransactionID;
024: import com.tc.object.lockmanager.api.LockContext;
025: import com.tc.object.lockmanager.api.LockID;
026: import com.tc.object.session.SessionID;
027: import com.tc.object.tx.TransactionID;
028: import com.tc.object.tx.TxnType;
029: import com.tc.util.Assert;
030:
031: import java.io.IOException;
032: import java.util.ArrayList;
033: import java.util.Arrays;
034: import java.util.Collection;
035: import java.util.HashMap;
036: import java.util.HashSet;
037: import java.util.Iterator;
038: import java.util.LinkedList;
039: import java.util.List;
040: import java.util.Map;
041: import java.util.Set;
042:
043: /**
044: * @author steve
045: */
046: public class BroadcastTransactionMessageImpl extends DSOMessageBase
047: implements BroadcastTransactionMessage {
048: private final static byte DNA_ID = 1;
049: private final static byte LOCK_ID = 2;
050: private final static byte CHANGE_ID = 3;
051: private final static byte TRANSACTION_ID = 4;
052: private final static byte COMMITTER_ID = 5;
053: private final static byte TRANSACTION_TYPE_ID = 6;
054: private final static byte GLOBAL_TRANSACTION_ID = 7;
055: private final static byte LOW_WATERMARK = 8;
056: private final static byte SERIALIZER_ID = 9;
057: private final static byte NOTIFIED = 10;
058: private final static byte LOOKUP_OBJECT_IDS = 11;
059: private final static byte ROOT_NAME_ID_PAIR = 12;
060: private final static byte DMI_ID = 13;
061:
062: private List changes = new LinkedList();
063: private List dmis = new LinkedList();
064: private Set lookupObjectIDs = new HashSet();
065: private Collection notifies = new LinkedList();
066: private Map newRoots = new HashMap();
067: private List lockIDs;
068:
069: private long changeID;
070: private TransactionID transactionID;
071: private NodeID committerID;
072: private TxnType transactionType;
073: private GlobalTransactionID globalTransactionID;
074: private GlobalTransactionID lowWatermark;
075: private ObjectStringSerializer serializer;
076:
077: public BroadcastTransactionMessageImpl(SessionID sessionID,
078: MessageMonitor monitor, TCByteBufferOutput out,
079: MessageChannel channel, TCMessageType type) {
080: super (sessionID, monitor, out, channel, type);
081: }
082:
083: public BroadcastTransactionMessageImpl(SessionID sessionID,
084: MessageMonitor monitor, MessageChannel channel,
085: TCMessageHeader header, TCByteBuffer[] data) {
086: super (sessionID, monitor, channel, header, data);
087: }
088:
089: protected void dehydrateValues() {
090: putNVPair(TRANSACTION_TYPE_ID, transactionType.getType());
091: for (Iterator i = lockIDs.iterator(); i.hasNext();) {
092: LockID lockID = (LockID) i.next();
093: putNVPair(LOCK_ID, lockID.asString());
094: }
095:
096: for (Iterator i = notifies.iterator(); i.hasNext();) {
097: LockContext notified = (LockContext) i.next();
098: putNVPair(NOTIFIED, notified);
099: }
100:
101: putNVPair(SERIALIZER_ID, serializer);
102:
103: putNVPair(CHANGE_ID, changeID);
104: putNVPair(TRANSACTION_ID, transactionID.toLong());
105: putNVPair(COMMITTER_ID, new NodeIDSerializer(committerID));
106: putNVPair(GLOBAL_TRANSACTION_ID, globalTransactionID.toLong());
107: putNVPair(LOW_WATERMARK, lowWatermark.toLong());
108:
109: for (Iterator i = changes.iterator(); i.hasNext();) {
110: DNAImpl dna = (DNAImpl) i.next();
111: putNVPair(DNA_ID, dna);
112: }
113: for (Iterator i = lookupObjectIDs.iterator(); i.hasNext();) {
114: ObjectID oid = (ObjectID) i.next();
115: putNVPair(LOOKUP_OBJECT_IDS, oid.toLong());
116: }
117: for (Iterator i = newRoots.keySet().iterator(); i.hasNext();) {
118: String key = (String) i.next();
119: ObjectID value = (ObjectID) newRoots.get(key);
120: putNVPair(ROOT_NAME_ID_PAIR, new RootIDPair(key, value));
121: }
122: for (Iterator i = dmis.iterator(); i.hasNext();) {
123: DmiDescriptor dd = (DmiDescriptor) i.next();
124: putNVPair(DMI_ID, dd);
125: }
126: }
127:
128: protected boolean hydrateValue(byte name) throws IOException {
129: switch (name) {
130: case TRANSACTION_TYPE_ID:
131: this .transactionType = TxnType.typeFor(getByteValue());
132: return true;
133: case DNA_ID:
134: this .changes.add(getObject(new DNAImpl(serializer, false)));
135: return true;
136: case SERIALIZER_ID:
137: this .serializer = (ObjectStringSerializer) getObject(new ObjectStringSerializer());
138: return true;
139: case LOCK_ID:
140: if (lockIDs == null) {
141: lockIDs = new LinkedList();
142: }
143: this .lockIDs.add(new LockID(getStringValue()));
144: return true;
145: case NOTIFIED:
146: this .notifies.add(this .getObject(new LockContext()));
147: return true;
148: case CHANGE_ID:
149: this .changeID = getLongValue();
150: return true;
151: case TRANSACTION_ID:
152: this .transactionID = new TransactionID(getLongValue());
153: return true;
154: case COMMITTER_ID:
155: this .committerID = ((NodeIDSerializer) getObject(new NodeIDSerializer()))
156: .getNodeID();
157: return true;
158: case GLOBAL_TRANSACTION_ID:
159: this .globalTransactionID = new GlobalTransactionID(
160: getLongValue());
161: return true;
162: case LOW_WATERMARK:
163: this .lowWatermark = new GlobalTransactionID(getLongValue());
164: return true;
165: case LOOKUP_OBJECT_IDS:
166: this .lookupObjectIDs.add(new ObjectID(getLongValue()));
167: return true;
168: case ROOT_NAME_ID_PAIR:
169: RootIDPair rootIDPair = (RootIDPair) getObject(new RootIDPair());
170: this .newRoots.put(rootIDPair.getRootName(), rootIDPair
171: .getRootID());
172: return true;
173: case DMI_ID:
174: DmiDescriptor dd = (DmiDescriptor) getObject(new DmiDescriptor());
175: dmis.add(dd);
176: return true;
177: default:
178: return false;
179: }
180: }
181:
182: public void initialize(List chges, Set objectIDs,
183: ObjectStringSerializer aSerializer, LockID[] lids,
184: long cid, TransactionID txID, NodeID client,
185: GlobalTransactionID gtx, TxnType txnType,
186: GlobalTransactionID lowGlobalTransactionIDWatermark,
187: Collection theNotifies, Map roots, DmiDescriptor[] dmiDescs) {
188: Assert.eval(lids.length > 0);
189: Assert.assertNotNull(txnType);
190:
191: this .changes = chges;
192: this .lockIDs = Arrays.asList(lids);
193: this .changeID = cid;
194: this .transactionID = txID;
195: this .committerID = client;
196: this .transactionType = txnType;
197: this .globalTransactionID = gtx;
198: this .lowWatermark = lowGlobalTransactionIDWatermark;
199: this .serializer = aSerializer;
200: this .notifies.addAll(theNotifies);
201: this .lookupObjectIDs.addAll(objectIDs);
202: this .newRoots.putAll(roots);
203: for (int i = 0; i < dmiDescs.length; i++) {
204: this .dmis.add(dmiDescs[i]);
205: }
206: }
207:
208: public List getLockIDs() {
209: return lockIDs;
210: }
211:
212: public TxnType getTransactionType() {
213: return transactionType;
214: }
215:
216: public Collection getObjectChanges() {
217: Collection versionizedChanges = new ArrayList(changes.size());
218: for (Iterator iter = changes.iterator(); iter.hasNext();) {
219: versionizedChanges.add(new VersionizedDNAWrapper((DNA) iter
220: .next(), globalTransactionID.toLong()));
221:
222: }
223: return versionizedChanges;
224: }
225:
226: public Set getLookupObjectIDs() {
227: return lookupObjectIDs;
228: }
229:
230: public long getChangeID() {
231: return changeID;
232: }
233:
234: public TransactionID getTransactionID() {
235: return transactionID;
236: }
237:
238: public NodeID getCommitterID() {
239: return committerID;
240: }
241:
242: public GlobalTransactionID getGlobalTransactionID() {
243: return this .globalTransactionID;
244: }
245:
246: public GlobalTransactionID getLowGlobalTransactionIDWatermark() {
247: Assert.assertNotNull(this .lowWatermark);
248: return this .lowWatermark;
249: }
250:
251: public Collection addNotifiesTo(List c) {
252: c.addAll(notifies);
253: return c;
254: }
255:
256: public void doRecycleOnRead() {
257: // dont recycle yet
258: }
259:
260: protected boolean isOutputStreamRecycled() {
261: return true;
262: }
263:
264: public void doRecycleOnWrite() {
265: // recycle only those buffers created for this message
266: recycleOutputStream();
267: }
268:
269: public Map getNewRoots() {
270: return this .newRoots;
271: }
272:
273: private static class RootIDPair implements TCSerializable {
274: private String rootName;
275: private ObjectID rootID;
276:
277: public RootIDPair() {
278: super ();
279: }
280:
281: public RootIDPair(String rootName, ObjectID rootID) {
282: this .rootName = rootName;
283: this .rootID = rootID;
284: }
285:
286: public void serializeTo(TCByteBufferOutput serialOutput) {
287: serialOutput.writeString(rootName);
288: serialOutput.writeLong(rootID.toLong());
289:
290: }
291:
292: public Object deserializeFrom(TCByteBufferInput serialInput)
293: throws IOException {
294: this .rootName = serialInput.readString();
295: this .rootID = new ObjectID(serialInput.readLong());
296: return this ;
297: }
298:
299: public ObjectID getRootID() {
300: return rootID;
301: }
302:
303: public String getRootName() {
304: return rootName;
305: }
306: }
307:
308: public List getDmiDescriptors() {
309: return dmis;
310: }
311:
312: }
|