001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.applicator;
006:
007: import com.tc.exception.TCRuntimeException;
008: import com.tc.logging.TCLogger;
009: import com.tc.logging.TCLogging;
010: import com.tc.object.ClientObjectManager;
011: import com.tc.object.ObjectID;
012: import com.tc.object.SerializationUtil;
013: import com.tc.object.TCObject;
014: import com.tc.object.TraversedReferences;
015: import com.tc.object.bytecode.ByteCodeUtil;
016: import com.tc.object.bytecode.Manageable;
017: import com.tc.object.dna.api.DNA;
018: import com.tc.object.dna.api.DNACursor;
019: import com.tc.object.dna.api.DNAWriter;
020: import com.tc.object.dna.api.DNAEncoding;
021: import com.tc.object.dna.api.LogicalAction;
022: import com.tc.object.dna.api.PhysicalAction;
023: import com.tc.object.tx.optimistic.OptimisticTransactionManager;
024: import com.tc.object.tx.optimistic.TCObjectClone;
025: import com.tc.util.Assert;
026: import com.tc.util.FieldUtils;
027:
028: import java.io.IOException;
029: import java.lang.reflect.Field;
030: import java.lang.reflect.InvocationTargetException;
031: import java.lang.reflect.Method;
032: import java.util.IdentityHashMap;
033: import java.util.Iterator;
034: import java.util.Map;
035: import java.util.Queue;
036: import java.util.concurrent.LinkedBlockingQueue;
037:
038: public class LinkedBlockingQueueApplicator extends BaseApplicator {
039: private static final TCLogger logger = TCLogging
040: .getLogger(ListApplicator.class);
041: private static final String LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX = LinkedBlockingQueue.class
042: .getName()
043: + ".";
044: private static final String TAKE_LOCK_FIELD_NAME = "takeLock";
045: private static final String PUT_LOCK_FIELD_NAME = "putLock";
046: private static final String CAPACITY_FIELD_NAME = "capacity";
047: private static final String INIT_METHOD_NAME = "init";
048: private static final String TC_TAKE_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX
049: + "take";
050: private static final String TC_PUT_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX
051: + "put";
052:
053: private static final Field TAKE_LOCK_FIELD;
054: private static final Field PUT_LOCK_FIELD;
055: private static final Field CAPACITY_FIELD;
056: private static final Method INIT_METHOD;
057: private static final Method TC_TAKE_METHOD;
058: private static final Method TC_PUT_METHOD;
059:
060: static {
061: try {
062: TAKE_LOCK_FIELD = LinkedBlockingQueue.class
063: .getDeclaredField(TAKE_LOCK_FIELD_NAME);
064: TAKE_LOCK_FIELD.setAccessible(true);
065:
066: PUT_LOCK_FIELD = LinkedBlockingQueue.class
067: .getDeclaredField(PUT_LOCK_FIELD_NAME);
068: PUT_LOCK_FIELD.setAccessible(true);
069:
070: CAPACITY_FIELD = LinkedBlockingQueue.class
071: .getDeclaredField(CAPACITY_FIELD_NAME);
072: CAPACITY_FIELD.setAccessible(true);
073:
074: INIT_METHOD = LinkedBlockingQueue.class.getDeclaredMethod(
075: INIT_METHOD_NAME, new Class[0]);
076: INIT_METHOD.setAccessible(true);
077:
078: TC_TAKE_METHOD = LinkedBlockingQueue.class
079: .getDeclaredMethod(TC_TAKE_METHOD_NAME,
080: new Class[0]);
081: TC_TAKE_METHOD.setAccessible(true);
082:
083: TC_PUT_METHOD = LinkedBlockingQueue.class
084: .getDeclaredMethod(TC_PUT_METHOD_NAME,
085: new Class[] { Object.class });
086: TC_PUT_METHOD.setAccessible(true);
087: } catch (Exception e) {
088: throw new RuntimeException(e);
089: }
090: }
091:
092: public LinkedBlockingQueueApplicator(DNAEncoding encoding) {
093: super (encoding);
094: }
095:
096: public TraversedReferences getPortableObjects(Object pojo,
097: TraversedReferences addTo) {
098: getPhysicalPortableObjects(pojo, addTo);
099: getLogicalPortableObjects(pojo, addTo);
100: return addTo;
101: }
102:
103: private void getLogicalPortableObjects(Object pojo,
104: TraversedReferences addTo) {
105: for (Iterator i = ((Queue) pojo).iterator(); i.hasNext();) {
106: Object o = i.next();
107: filterPortableObject(o, addTo);
108: }
109: }
110:
111: private void getPhysicalPortableObjects(Object pojo,
112: TraversedReferences addTo) {
113: try {
114: filterPortableObject(TAKE_LOCK_FIELD.get(pojo), addTo);
115: filterPortableObject(PUT_LOCK_FIELD.get(pojo), addTo);
116: filterPortableObject(CAPACITY_FIELD.get(pojo), addTo);
117: } catch (IllegalAccessException e) {
118: throw new TCRuntimeException(e);
119: }
120: }
121:
122: private void filterPortableObject(Object value,
123: TraversedReferences addTo) {
124: if (value != null && isPortableReference(value.getClass())) {
125: addTo.addAnonymousReference(value);
126: }
127: }
128:
129: public void hydrate(ClientObjectManager objectManager,
130: TCObject tcObject, DNA dna, Object po) throws IOException,
131: ClassNotFoundException {
132: LinkedBlockingQueue queue = (LinkedBlockingQueue) po;
133: DNACursor cursor = dna.getCursor();
134: boolean hasPhysicalAction = false;
135:
136: Object takeLock = null;
137: Object putLock = null;
138: Object capacity = null;
139: while (cursor.next(encoding)) {
140: Object action = cursor.getAction();
141: if (action instanceof LogicalAction) {
142:
143: LogicalAction logicalAction = (LogicalAction) action;
144: int method = logicalAction.getMethod();
145: Object[] params = logicalAction.getParameters();
146:
147: // Since LinkedBlockingQueue supports partial collection, params is not inspected for containing object ids
148:
149: try {
150: apply(queue, method, params);
151: } catch (IndexOutOfBoundsException ioobe) {
152: logger.error("Error applying update to " + po,
153: ioobe);
154: }
155: } else if (action instanceof PhysicalAction) {
156: if (!hasPhysicalAction) {
157: hasPhysicalAction = true;
158: }
159: PhysicalAction physicalAction = (PhysicalAction) action;
160: Assert.eval(physicalAction.isTruePhysical());
161: String fieldName = physicalAction.getFieldName();
162: Object value = physicalAction.getObject();
163:
164: if (fieldName
165: .equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
166: + TAKE_LOCK_FIELD_NAME)) {
167: takeLock = objectManager
168: .lookupObject((ObjectID) value);
169: } else if (fieldName
170: .equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
171: + PUT_LOCK_FIELD_NAME)) {
172: putLock = objectManager
173: .lookupObject((ObjectID) value);
174: } else if (fieldName
175: .equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
176: + CAPACITY_FIELD_NAME)) {
177: capacity = value;
178: }
179: }
180: }
181:
182: // The setting of these physical field can only happen after the logical actions are
183: // applied.
184: if (!dna.isDelta()) {
185: Assert.assertTrue(hasPhysicalAction);
186: try {
187: FieldUtils.tcSet(po, takeLock, TAKE_LOCK_FIELD);
188: FieldUtils.tcSet(po, putLock, PUT_LOCK_FIELD);
189: FieldUtils.tcSet(po, capacity, CAPACITY_FIELD);
190: } catch (IllegalAccessException e) {
191: throw new TCRuntimeException(e);
192: }
193: invokeInitMethod(po);
194: } else {
195: Assert.assertFalse(hasPhysicalAction);
196: }
197:
198: }
199:
200: private void invokeInitMethod(Object po) {
201: try {
202: INIT_METHOD.invoke(po, new Object[0]);
203: } catch (InvocationTargetException e) {
204: throw new TCRuntimeException(e);
205: } catch (IllegalAccessException e) {
206: throw new TCRuntimeException(e);
207: }
208: }
209:
210: private void apply(LinkedBlockingQueue queue, int method,
211: Object[] params) {
212: switch (method) {
213: case SerializationUtil.PUT:
214: try {
215: TC_PUT_METHOD.invoke(queue, new Object[] { params[0] });
216: } catch (InvocationTargetException e) {
217: throw new TCRuntimeException(e);
218: } catch (IllegalAccessException e) {
219: throw new TCRuntimeException(e);
220: }
221: break;
222: case SerializationUtil.TAKE:
223: try {
224: Object o = TC_TAKE_METHOD.invoke(queue, new Object[0]);
225: } catch (InvocationTargetException e) {
226: throw new TCRuntimeException(e);
227: } catch (IllegalAccessException e) {
228: throw new TCRuntimeException(e);
229: }
230: break;
231: case SerializationUtil.REMOVE_FIRST_N:
232: // This is caused by drainTo(), which requires a full lock.
233: int count = ((Integer) params[0]).intValue();
234: for (int i = 0; i < count; i++) {
235: try {
236: TC_TAKE_METHOD.invoke(queue, new Object[0]);
237: } catch (InvocationTargetException e) {
238: throw new TCRuntimeException(e);
239: } catch (IllegalAccessException e) {
240: throw new TCRuntimeException(e);
241: }
242: }
243: break;
244: case SerializationUtil.REMOVE_AT:
245: int index = ((Integer) params[0]).intValue();
246: Assert.assertTrue(queue.size() > index);
247: int j = 0;
248: for (Iterator i = queue.iterator(); i.hasNext();) {
249: i.next();
250: if (j == index) {
251: i.remove();
252: break;
253: }
254: j++;
255: }
256: break;
257: case SerializationUtil.CLEAR:
258: queue.clear();
259: break;
260: default:
261: throw new AssertionError("Invalid method:" + method
262: + " state:" + this );
263: }
264: }
265:
266: public void dehydrate(ClientObjectManager objectManager,
267: TCObject tcObject, DNAWriter writer, Object pojo) {
268: dehydrateFields(objectManager, tcObject, writer, pojo);
269: dehydrateMembers(objectManager, tcObject, writer, pojo);
270: }
271:
272: private void dehydrateFields(ClientObjectManager objectManager,
273: TCObject tcObject, DNAWriter writer, Object pojo) {
274: try {
275: Object takeLock = TAKE_LOCK_FIELD.get(pojo);
276: takeLock = getDehydratableObject(takeLock, objectManager);
277: writer.addPhysicalAction(
278: LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
279: + TAKE_LOCK_FIELD_NAME, takeLock);
280:
281: Object putLock = PUT_LOCK_FIELD.get(pojo);
282: putLock = getDehydratableObject(putLock, objectManager);
283: writer.addPhysicalAction(
284: LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
285: + PUT_LOCK_FIELD_NAME, putLock);
286:
287: Object capacity = CAPACITY_FIELD.get(pojo);
288: capacity = getDehydratableObject(capacity, objectManager);
289: writer.addPhysicalAction(
290: LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX
291: + CAPACITY_FIELD_NAME, capacity);
292: } catch (IllegalAccessException e) {
293: throw new TCRuntimeException(e);
294: }
295: }
296:
297: public void dehydrateMembers(ClientObjectManager objectManager,
298: TCObject tcObject, DNAWriter writer, Object pojo) {
299: Queue queue = (Queue) pojo;
300:
301: for (Iterator i = queue.iterator(); i.hasNext();) {
302: Object value = i.next();
303: if (!(value instanceof ObjectID)) {
304: if (!objectManager.isPortableInstance(value)) {
305: continue;
306: }
307: value = getDehydratableObject(value, objectManager);
308: }
309: if (value == null) {
310: continue;
311: }
312: writer.addLogicalAction(SerializationUtil.PUT,
313: new Object[] { value });
314: }
315: }
316:
317: public Object getNewInstance(ClientObjectManager objectManager,
318: DNA dna) {
319: throw new UnsupportedOperationException();
320: }
321:
322: @SuppressWarnings("unchecked")
323: public Map connectedCopy(Object source, Object dest, Map visited,
324: ClientObjectManager objectManager,
325: OptimisticTransactionManager txManager) {
326: Map cloned = new IdentityHashMap();
327:
328: Manageable sourceManageable = (Manageable) source;
329: Manageable destManaged = (Manageable) dest;
330:
331: Queue sourceQueue = (Queue) source;
332: Queue destQueue = (Queue) dest;
333:
334: for (Iterator i = sourceQueue.iterator(); i.hasNext();) {
335: Object v = i.next();
336: Object copyValue = null;
337:
338: copyValue = createCopyIfNecessary(objectManager, visited,
339: cloned, v);
340: destQueue.add(copyValue);
341: }
342:
343: destManaged.__tc_managed(new TCObjectClone(sourceManageable
344: .__tc_managed(), txManager));
345: return cloned;
346: }
347: }
|