001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.managedobject;
006:
007: import com.tc.object.ObjectID;
008: import com.tc.object.SerializationUtil;
009: import com.tc.object.dna.api.DNACursor;
010: import com.tc.object.dna.api.DNAWriter;
011: import com.tc.object.dna.api.LogicalAction;
012: import com.tc.object.dna.api.PhysicalAction;
013: import com.tc.objectserver.mgmt.LogicalManagedObjectFacade;
014: import com.tc.objectserver.mgmt.ManagedObjectFacade;
015: import com.tc.util.Assert;
016:
017: import java.io.IOException;
018: import java.io.ObjectInput;
019: import java.io.ObjectOutput;
020: import java.util.Iterator;
021: import java.util.LinkedList;
022: import java.util.List;
023: import java.util.Set;
024:
025: /**
026: * Server representation of a list
027: */
028: public class QueueManagedObjectState extends LogicalManagedObjectState {
029: private static final String TAKE_LOCK_FIELD_NAME = "java.util.concurrent.LinkedBlockingQueue.takeLock";
030: private static final String PUT_LOCK_FIELD_NAME = "java.util.concurrent.LinkedBlockingQueue.putLock";
031: private static final String CAPACITY_FIELD_NAME = "java.util.concurrent.LinkedBlockingQueue.capacity";
032:
033: private ObjectID takeLockField;
034: private ObjectID putLockField;
035: private Object capacityField;
036:
037: private List references;
038:
039: QueueManagedObjectState(ObjectInput in) throws IOException {
040: super (in);
041: }
042:
043: protected QueueManagedObjectState(long classID) {
044: super (classID);
045: references = new LinkedList();
046: }
047:
048: public void apply(ObjectID objectID, DNACursor cursor,
049: BackReferences includeIDs) throws IOException {
050: while (cursor.next()) {
051: Object action = cursor.getAction();
052: if (action instanceof LogicalAction) {
053: LogicalAction logicalAction = (LogicalAction) action;
054: int method = logicalAction.getMethod();
055: Object[] params = logicalAction.getParameters();
056: applyMethod(objectID, includeIDs, method, params);
057: } else if (action instanceof PhysicalAction) {
058: PhysicalAction physicalAction = (PhysicalAction) action;
059: updateReference(objectID,
060: physicalAction.getFieldName(), physicalAction
061: .getObject(), includeIDs);
062: }
063: }
064: }
065:
066: private void updateReference(ObjectID objectID, String fieldName,
067: Object value, BackReferences includeIDs) {
068: if (TAKE_LOCK_FIELD_NAME.equals(fieldName)) {
069: takeLockField = (ObjectID) value;
070: getListener().changed(objectID, null, takeLockField);
071: includeIDs.addBackReference(takeLockField, objectID);
072: } else if (PUT_LOCK_FIELD_NAME.equals(fieldName)) {
073: putLockField = (ObjectID) value;
074: getListener().changed(objectID, null, putLockField);
075: includeIDs.addBackReference(putLockField, objectID);
076: } else if (CAPACITY_FIELD_NAME.equals(fieldName)) {
077: capacityField = value;
078: }
079: }
080:
081: public void applyMethod(ObjectID objectID,
082: BackReferences includeIDs, int method, Object[] params) {
083: switch (method) {
084: case SerializationUtil.PUT:
085: addChangeToCollector(objectID, params[0], includeIDs);
086: references.add(params[0]);
087: break;
088: case SerializationUtil.TAKE:
089: references.remove(0);
090: break;
091: case SerializationUtil.CLEAR:
092: references.clear();
093: break;
094: case SerializationUtil.REMOVE_FIRST_N:
095: int n = ((Integer) params[0]).intValue();
096: for (int i = 0; i < n; i++) {
097: references.remove(0);
098: }
099: break;
100: case SerializationUtil.REMOVE_AT:
101: int i = ((Integer) params[0]).intValue();
102: Assert.assertTrue(references.size() > i);
103: references.remove(i);
104: break;
105: default:
106: throw new AssertionError("Invalid method:" + method
107: + " state:" + this );
108: }
109: }
110:
111: // Since LinkedBlockingQueue supports partial collection, we are not adding it to back references
112: private void addChangeToCollector(ObjectID objectID,
113: Object newValue, BackReferences includeIDs) {
114: if (newValue instanceof ObjectID) {
115: getListener().changed(objectID, null, (ObjectID) newValue);
116: }
117: }
118:
119: public void addObjectReferencesTo(ManagedObjectTraverser traverser) {
120: traverser.addReachableObjectIDs(getObjectReferences());
121: }
122:
123: protected void addAllObjectReferencesTo(Set refs) {
124: addAllObjectReferencesFromIteratorTo(references.iterator(),
125: refs);
126: if (takeLockField != null) {
127: refs.add(takeLockField);
128: }
129: if (putLockField != null) {
130: refs.add(putLockField);
131: }
132: }
133:
134: public void dehydrate(ObjectID objectID, DNAWriter writer) {
135: dehydrateFields(objectID, writer);
136: dehydrateMembers(objectID, writer);
137: }
138:
139: private void dehydrateFields(ObjectID objectId, DNAWriter writer) {
140: writer.addPhysicalAction(TAKE_LOCK_FIELD_NAME, takeLockField);
141: writer.addPhysicalAction(PUT_LOCK_FIELD_NAME, putLockField);
142: writer.addPhysicalAction(CAPACITY_FIELD_NAME, capacityField);
143: }
144:
145: private void dehydrateMembers(ObjectID objectID, DNAWriter writer) {
146: for (Iterator i = references.iterator(); i.hasNext();) {
147: Object o = i.next();
148: writer.addLogicalAction(SerializationUtil.PUT,
149: new Object[] { o });
150: }
151: }
152:
153: public String toString() {
154: return "QueueManagedStateObject(" + references + ")";
155: }
156:
157: public ManagedObjectFacade createFacade(ObjectID objectID,
158: String className, int limit) {
159: final int size = references.size();
160:
161: if (limit < 0) {
162: limit = size;
163: } else {
164: limit = Math.min(limit, size);
165: }
166:
167: Object[] data = new Object[limit];
168:
169: int index = 0;
170: for (Iterator iter = references.iterator(); iter.hasNext()
171: && index < limit; index++) {
172: data[index] = iter.next();
173: }
174:
175: return LogicalManagedObjectFacade.createListInstance(objectID,
176: className, data, size);
177: }
178:
179: public byte getType() {
180: return QUEUE_TYPE;
181: }
182:
183: private void writeField(ObjectOutput out, String fieldName,
184: Object fieldValue) throws IOException {
185: out.writeUTF(fieldName);
186: if (fieldValue == null) {
187: out.writeBoolean(false);
188: } else {
189: out.writeBoolean(true);
190: if (fieldValue instanceof ObjectID) {
191: out.writeLong(((ObjectID) fieldValue).toLong());
192: } else {
193: out.writeObject(fieldValue);
194: }
195: }
196: }
197:
198: protected void basicWriteTo(ObjectOutput out) throws IOException {
199: writeField(out, TAKE_LOCK_FIELD_NAME, takeLockField);
200: writeField(out, PUT_LOCK_FIELD_NAME, putLockField);
201: writeField(out, CAPACITY_FIELD_NAME, capacityField);
202:
203: out.writeInt(references.size());
204: for (Iterator i = references.iterator(); i.hasNext();) {
205: out.writeObject(i.next());
206: }
207: }
208:
209: protected boolean basicEquals(LogicalManagedObjectState o) {
210: QueueManagedObjectState mo = (QueueManagedObjectState) o;
211: return ((takeLockField == mo.takeLockField) || (takeLockField != null && takeLockField
212: .equals(mo.takeLockField)))
213: && ((putLockField == mo.putLockField) || (putLockField != null && putLockField
214: .equals(mo.putLockField)))
215: && ((capacityField == mo.capacityField) || (capacityField != null && capacityField
216: .equals(mo.capacityField)))
217: && references.equals(mo.references);
218: }
219:
220: private static void readField(ObjectInput in,
221: QueueManagedObjectState mo) throws ClassNotFoundException,
222: IOException {
223: String fieldName = in.readUTF();
224: boolean fieldExist = in.readBoolean();
225: if (fieldExist) {
226: if (fieldName.equals(TAKE_LOCK_FIELD_NAME)) {
227: mo.takeLockField = new ObjectID(in.readLong());
228: } else if (fieldName.equals(PUT_LOCK_FIELD_NAME)) {
229: mo.putLockField = new ObjectID(in.readLong());
230: } else if (fieldName.equals(CAPACITY_FIELD_NAME)) {
231: mo.capacityField = in.readObject();
232: } else {
233: throw new AssertionError(
234: "Field not recognized in QueueManagedObjectState.readFrom().");
235: }
236: }
237: }
238:
239: static QueueManagedObjectState readFrom(ObjectInput in)
240: throws IOException, ClassNotFoundException {
241: QueueManagedObjectState mo = new QueueManagedObjectState(in);
242: readField(in, mo);
243: readField(in, mo);
244: readField(in, mo);
245: int size = in.readInt();
246: LinkedList list = new LinkedList();
247: for (int i = 0; i < size; i++) {
248: list.add(in.readObject());
249: }
250: mo.references = list;
251: return mo;
252: }
253: }
|