001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.tx;
006:
007: import com.tc.bytes.TCByteBuffer;
008: import com.tc.io.TCByteBufferOutputStream;
009: import com.tc.lang.Recyclable;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.object.ObjectID;
013: import com.tc.object.change.TCChangeBuffer;
014: import com.tc.object.dmi.DmiDescriptor;
015: import com.tc.object.dna.api.DNAEncoding;
016: import com.tc.object.dna.impl.ObjectStringSerializer;
017: import com.tc.object.lockmanager.api.LockID;
018: import com.tc.object.lockmanager.api.Notify;
019: import com.tc.object.msg.CommitTransactionMessage;
020: import com.tc.object.msg.CommitTransactionMessageFactory;
021: import com.tc.util.DebugUtil;
022: import com.tc.util.SequenceID;
023:
024: import java.util.ArrayList;
025: import java.util.Collection;
026: import java.util.Iterator;
027: import java.util.LinkedHashMap;
028: import java.util.LinkedList;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.Map.Entry;
032:
033: public class TransactionBatchWriter implements ClientTransactionBatch {
034: private final static TCLogger logger = TCLogging
035: .getLogger(TransactionBatchWriter.class);
036:
037: private final CommitTransactionMessageFactory commitTransactionMessageFactory;
038: private final TxnBatchID batchID;
039: private final LinkedHashMap transactionData = new LinkedHashMap();
040: private final ObjectStringSerializer serializer;
041: private final DNAEncoding encoding;
042: private int txns2Serialize = 0;
043: private final List batchDataOutputStreams = new ArrayList();
044: private short outstandingWriteCount = 0;
045: private int bytesWritten = 0;
046:
047: public TransactionBatchWriter(
048: TxnBatchID batchID,
049: ObjectStringSerializer serializer,
050: DNAEncoding encoding,
051: CommitTransactionMessageFactory commitTransactionMessageFactory) {
052: this .batchID = batchID;
053: this .encoding = encoding;
054: this .commitTransactionMessageFactory = commitTransactionMessageFactory;
055: this .serializer = serializer;
056: }
057:
058: public String toString() {
059: return super .toString() + "[" + this .batchID + ", isEmpty="
060: + isEmpty() + ", size=" + numberOfTxns()
061: + ", txns2Serialize =" + txns2Serialize + "]";
062: }
063:
064: public TxnBatchID getTransactionBatchID() {
065: return this .batchID;
066: }
067:
068: public synchronized boolean isEmpty() {
069: return transactionData.isEmpty();
070: }
071:
072: public synchronized int numberOfTxns() {
073: return transactionData.size();
074: }
075:
076: public synchronized int byteSize() {
077: return bytesWritten;
078: }
079:
080: public boolean isNull() {
081: return false;
082: }
083:
084: public synchronized void removeTransaction(TransactionID txID) {
085: TransactionDescriptor removed = (TransactionDescriptor) transactionData
086: .remove(txID);
087: if (removed == null)
088: throw new AssertionError(
089: "Attempt to remove a transaction that doesn't exist");
090: // if we get some acks from the previous instance of the server after we resend this
091: // transaction, but before we write to the network, then we dont recycle. We lose those
092: // buffers. But since it is a rare scenario we dont lose much, but this check avoid writting
093: // corrupt buffers.
094: if (outstandingWriteCount == 0)
095: removed.recycle();
096: }
097:
098: public synchronized void addTransaction(ClientTransaction txn) {
099: SequenceID sequenceID = txn.getSequenceID();
100: TCByteBufferOutputStream out = newOutputStream();
101:
102: // /////////////////////////////////////////////////////////////////////////////////////////
103: // If you're modifying this format, you'll need to update
104: // TransactionBatchReader as well //
105: // /////////////////////////////////////////////////////////////////////////////////////////
106:
107: out.writeLong(txn.getTransactionID().toLong());
108: out.writeByte(txn.getTransactionType().getType());
109: SequenceID sid = txn.getSequenceID();
110: if (sid.isNull())
111: throw new AssertionError("SequenceID is null: " + txn);
112: out.writeLong(sid.toLong());
113:
114: LockID[] locks = txn.getAllLockIDs();
115: out.writeInt(locks.length);
116: for (int i = 0, n = locks.length; i < n; i++) {
117: out.writeString(locks[i].asString());
118: }
119:
120: Map newRoots = txn.getNewRoots();
121: out.writeInt(newRoots.size());
122: for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) {
123: Entry entry = (Entry) i.next();
124: String name = (String) entry.getKey();
125: ObjectID id = (ObjectID) entry.getValue();
126: out.writeString(name);
127: out.writeLong(id.toLong());
128: }
129:
130: List notifies = txn.addNotifiesTo(new LinkedList());
131: out.writeInt(notifies.size());
132: for (Iterator i = notifies.iterator(); i.hasNext();) {
133: Notify n = (Notify) i.next();
134: n.serializeTo(out);
135: }
136:
137: List dmis = txn.getDmiDescriptors();
138: out.writeInt(dmis.size());
139: for (Iterator i = dmis.iterator(); i.hasNext();) {
140: DmiDescriptor dd = (DmiDescriptor) i.next();
141: dd.serializeTo(out);
142: }
143:
144: Map changes = txn.getChangeBuffers();
145: out.writeInt(changes.size());
146: for (Iterator i = changes.values().iterator(); i.hasNext();) {
147: TCChangeBuffer buffer = (TCChangeBuffer) i.next();
148: buffer.writeTo(out, serializer, encoding);
149: }
150:
151: bytesWritten += out.getBytesWritten();
152: transactionData.put(txn.getTransactionID(),
153: new TransactionDescriptor(sequenceID, out.toArray(),
154: txn.getReferencesOfObjectsInTxn()));
155: if (DebugUtil.DEBUG) {
156: logger.info("Add transaction " + txn.getTransactionID()
157: + " sequenceID: " + sequenceID + " bytes written: "
158: + out.getBytesWritten()
159: + " aggregate bytes written: " + bytesWritten);
160: }
161: }
162:
163: // Called from CommitTransactionMessageImpl
164: public synchronized TCByteBuffer[] getData() {
165: outstandingWriteCount++;
166: TCByteBufferOutputStream out = newOutputStream();
167: writeHeader(out);
168: for (Iterator i = transactionData.values().iterator(); i
169: .hasNext();) {
170: TransactionDescriptor td = ((TransactionDescriptor) i
171: .next());
172: TCByteBuffer[] data = td.getData();
173: out.write(data);
174: }
175: batchDataOutputStreams.add(out);
176:
177: // System.err.println("Batch size: " + out.getBytesWritten() + ", # TXNs = " + numberOfTxns());
178:
179: return out.toArray();
180: }
181:
182: private void writeHeader(TCByteBufferOutputStream out) {
183: out.writeLong(this .batchID.toLong());
184: out.writeInt(numberOfTxns());
185: }
186:
187: private TCByteBufferOutputStream newOutputStream() {
188: TCByteBufferOutputStream out = new TCByteBufferOutputStream(32,
189: 4096, false);
190: return out;
191: }
192:
193: public synchronized void send() {
194: CommitTransactionMessage msg = this .commitTransactionMessageFactory
195: .newCommitTransactionMessage();
196: msg.setBatch(this , serializer);
197: msg.send();
198: }
199:
200: public synchronized Collection addTransactionIDsTo(Collection c) {
201: c.addAll(transactionData.keySet());
202: return c;
203: }
204:
205: public synchronized SequenceID getMinTransactionSequence() {
206: return transactionData.isEmpty() ? SequenceID.NULL_ID
207: : ((TransactionDescriptor) transactionData.values()
208: .iterator().next()).getSequenceID();
209: }
210:
211: public Collection addTransactionSequenceIDsTo(Collection sequenceIDs) {
212: for (Iterator i = transactionData.values().iterator(); i
213: .hasNext();) {
214: TransactionDescriptor td = ((TransactionDescriptor) i
215: .next());
216: sequenceIDs.add(td.getSequenceID());
217: }
218: return sequenceIDs;
219: }
220:
221: // Called from CommitTransactionMessageImpl recycle on write.
222: public synchronized void recycle() {
223: for (Iterator iter = batchDataOutputStreams.iterator(); iter
224: .hasNext();) {
225: TCByteBufferOutputStream buffer = (TCByteBufferOutputStream) iter
226: .next();
227: buffer.recycle();
228: }
229: batchDataOutputStreams.clear();
230: outstandingWriteCount--;
231: }
232:
233: public synchronized String dump() {
234: StringBuffer sb = new StringBuffer(
235: "TransactionBatchWriter = { \n");
236: for (Iterator i = transactionData.entrySet().iterator(); i
237: .hasNext();) {
238: Map.Entry entry = (Entry) i.next();
239: sb.append(entry.getKey()).append(" = ");
240: sb
241: .append(((TransactionDescriptor) entry.getValue())
242: .dump());
243: sb.append("\n");
244: }
245: return sb.append(" } ").toString();
246: }
247:
248: /**
249: * This is for testing only.
250: */
251: public synchronized void wait4AllTxns2Serialize() {
252: while (txns2Serialize != 0) {
253: try {
254: wait(2000);
255: } catch (InterruptedException e) {
256: throw new AssertionError(e);
257: }
258: }
259: }
260:
261: private static final class TransactionDescriptor implements
262: Recyclable {
263:
264: final SequenceID sequenceID;
265: final TCByteBuffer[] data;
266: // Maintaining hard references so that it doesnt get GCed on us
267: private final Collection references;
268:
269: TransactionDescriptor(SequenceID sequenceID,
270: TCByteBuffer[] data, Collection references) {
271: this .sequenceID = sequenceID;
272: this .data = data;
273: this .references = references;
274: }
275:
276: public String dump() {
277: return " { " + sequenceID + " , Objects in Txn = "
278: + references.size() + " }";
279: }
280:
281: SequenceID getSequenceID() {
282: return this .sequenceID;
283: }
284:
285: TCByteBuffer[] getData() {
286: return data;
287: }
288:
289: public void recycle() {
290: for (int i = 0; i < data.length; i++) {
291: data[i].recycle();
292: }
293: }
294: }
295:
296: }
|