001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
008:
009: import com.tc.exception.ImplementMe;
010: import com.tc.logging.NullTCLogger;
011: import com.tc.net.groups.ClientID;
012: import com.tc.net.protocol.tcm.ChannelID;
013: import com.tc.net.protocol.tcm.MessageChannel;
014: import com.tc.net.protocol.tcm.TestChannelIDProvider;
015: import com.tc.object.dna.api.DNA;
016: import com.tc.object.msg.RequestManagedObjectMessage;
017: import com.tc.object.msg.RequestManagedObjectMessageFactory;
018: import com.tc.object.msg.RequestRootMessage;
019: import com.tc.object.msg.RequestRootMessageFactory;
020: import com.tc.object.session.NullSessionManager;
021: import com.tc.object.session.SessionID;
022: import com.tc.objectserver.core.api.TestDNA;
023: import com.tc.test.TCTestCase;
024: import com.tc.util.concurrent.NoExceptionLinkedQueue;
025: import com.tc.util.concurrent.ThreadUtil;
026:
027: import java.util.ArrayList;
028: import java.util.Collection;
029: import java.util.HashMap;
030: import java.util.HashSet;
031: import java.util.Iterator;
032: import java.util.LinkedList;
033: import java.util.Map;
034: import java.util.Set;
035:
036: public class RemoteObjectManagerImplTest extends TCTestCase {
037:
// System under test; a fresh instance is built in setUp().
RemoteObjectManagerImpl manager;
// Thread group named after this test class; assigned in setUp().
ThreadGroup threadGroup;
// Source of the client ID that verifyRmomInit() compares against the request context.
private ClientIDProvider cidProvider;
// Stub factory handed to the manager; queues each root-request message it hands out.
private TestRequestRootMessageFactory rrmf;
// Stub factory handed to the manager; queues each object-request message it hands out.
private TestRequestManagedObjectMessageFactory rmomf;
// Starts and tracks the background threads that call into the manager's retrieve methods.
private RetrieverThreads rt;
044:
045: protected void setUp() throws Exception {
046: super .setUp();
047: TestChannelIDProvider channelIDProvider = new TestChannelIDProvider();
048: channelIDProvider.channelID = new ChannelID(1);
049: this .cidProvider = new ClientIDProviderImpl(channelIDProvider);
050: this .rmomf = new TestRequestManagedObjectMessageFactory();
051: newRmom();
052: this .rrmf = new TestRequestRootMessageFactory();
053: newRrm();
054:
055: this .threadGroup = new ThreadGroup(getClass().getName());
056: manager = new RemoteObjectManagerImpl(new NullTCLogger(),
057: cidProvider, rrmf, rmomf,
058: new NullObjectRequestMonitor(), 500,
059: new NullSessionManager());
060: rt = new RetrieverThreads(Thread.currentThread()
061: .getThreadGroup(), manager);
062: }
063:
064: public void testDNACacheClearing() {
065: Collection dnas;
066: int dnaCollectionCount = 4;
067: for (int i = 0; i < dnaCollectionCount; i++) {
068: dnas = new ArrayList();
069: dnas.add(new TestDNA(new ObjectID(i)));
070: manager.addAllObjects(new SessionID(i), i, dnas);
071: }
072: assertEquals(dnaCollectionCount, manager.getDNACacheSize());
073: DNA dna = manager.retrieve(new ObjectID(0));
074: assertNotNull(dna);
075: assertEquals(dnaCollectionCount - 1, manager.getDNACacheSize());
076: manager.pause();
077: manager.starting();
078: manager.clear();
079: assertEquals(0, manager.getDNACacheSize());
080: }
081:
082: public void testMissingObjectIDsThrowsError() throws Exception {
083: final CyclicBarrier barrier = new CyclicBarrier(2);
084: Thread thread = new Thread("Test Thread Saro") {
085: public void run() {
086: System.err.println("Doing a bogus lookup");
087: try {
088: manager.retrieve(new ObjectID(Long.MAX_VALUE));
089: System.err
090: .println("Didnt throw assertion : Not calling barrier()");
091: } catch (AssertionError e) {
092: System.err.println("Got assertion as expected : "
093: + e);
094: try {
095: barrier.barrier();
096: } catch (Exception e1) {
097: e1.printStackTrace();
098: }
099: }
100: }
101: };
102: thread.start();
103: ThreadUtil.reallySleep(5000);
104: Set missingSet = new HashSet();
105: missingSet.add(new ObjectID(Long.MAX_VALUE));
106: manager.objectsNotFoundFor(SessionID.NULL_ID, 1, missingSet);
107: barrier.barrier();
108: }
109:
110: public void testRequestOutstandingRequestRootMessages()
111: throws Exception {
112: final Map expectedResent = new HashMap();
113: final Map expectedNotResent = new HashMap();
114: TestRequestRootMessage rrm = newRrm();
115: assertNoMessageSent(rrm);
116: pauseAndStart();
117: manager.requestOutstanding();
118: manager.unpause();
119: assertNoMessageSent(rrm);
120:
121: int count = 100;
122: for (int i = 0; i < count; i++) {
123: newRrm();
124: String rootID = "root" + i;
125: rt.startNewRootRetriever(rootID);
126: Object tmp = rrmf.newMessageQueue.take();
127: assertFalse(tmp == rrm);
128: rrm = (TestRequestRootMessage) tmp;
129: assertTrue(rrmf.newMessageQueue.isEmpty());
130: rrm.sendQueue.take();
131: assertTrue(rrm.sendQueue.isEmpty());
132: if (i % 2 == 0) {
133: expectedResent.put(rootID, rrm);
134: } else {
135: expectedNotResent.put(rootID, rrm);
136: }
137: }
138: log("rt.getAliveCount() = " + rt.getAliveCount()
139: + " expectedResent.size() = " + expectedResent.size()
140: + " expectedNotResent.size() = "
141: + expectedNotResent.size());
142: assertEquals(count, rt.getAliveCount());
143: // respond to some of the requests
144: int objectIDCount = 1;
145: for (Iterator i = expectedNotResent.keySet().iterator(); i
146: .hasNext();) {
147: String rootID = (String) i.next();
148: log("Adding Root = " + rootID);
149: manager.addRoot(rootID, new ObjectID(objectIDCount++));
150: }
151: // the threads waiting for the roots we just added should fall through.
152: rt.waitForLowWatermark(count - expectedNotResent.size());
153: assertEquals(count - expectedResent.size(), rt.getAliveCount());
154:
155: // TEST REQUEST OUTSTANDING
156: pauseAndStart();
157: manager.requestOutstanding();
158: manager.unpause();
159:
160: assertFalse(rrmf.newMessageQueue.isEmpty());
161:
162: // Check the messages we expect to have been resent
163: for (Iterator i = expectedResent.values().iterator(); i
164: .hasNext(); i.next()) {
165: rrm = (TestRequestRootMessage) rrmf.newMessageQueue.take();
166: assertNotNull(rrm.sendQueue.poll(1));
167: }
168:
169: for (Iterator i = expectedNotResent.values().iterator(); i
170: .hasNext();) {
171: rrm = (TestRequestRootMessage) i.next();
172: assertTrue(rrm.sendQueue.isEmpty());
173: }
174:
175: assertTrue(rrmf.newMessageQueue.isEmpty());
176:
177: // respond to the rest of the requests
178: for (Iterator i = expectedResent.keySet().iterator(); i
179: .hasNext();) {
180: String rootID = (String) i.next();
181: log("Adding Root = " + rootID);
182: manager.addRoot(rootID, new ObjectID(objectIDCount++));
183: }
184:
185: // all the threads should now be able to complete.
186: rt.waitForLowWatermark(0);
187:
188: }
189:
190: private static void log(String s) {
191: if (false)
192: System.err.println(Thread.currentThread().getName()
193: + " :: " + s);
194: }
195:
196: private void pauseAndStart() {
197: manager.pause();
198: // manager.clearCache();
199: manager.starting();
200: }
201:
202: public void testRequestOutstandingRequestManagedObjectMessages()
203: throws Exception {
204:
205: final Map expectedResent = new HashMap();
206: final Map secondaryResent = new HashMap();
207: final Map expectedNotResent = new HashMap();
208:
209: TestRequestManagedObjectMessage rmom = newRmom();
210: assertNoMessageSent(rmom);
211: pauseAndStart();
212: manager.requestOutstanding();
213: manager.unpause();
214: assertNoMessageSent(rmom);
215:
216: int count = 50;
217:
218: for (int i = 0; i < count; i++) {
219: newRmom();
220: ObjectID id = new ObjectID(i);
221: assertTrue(rmomf.newMessageQueue.isEmpty());
222: rt.startNewObjectRetriever(id);
223: Object tmp = rmomf.newMessageQueue.take();
224: assertTrue(rmomf.newMessageQueue.isEmpty());
225: // make sure we aren't mistakenly using the same message all the time
226: assertFalse(rmom == tmp);
227: rmom = (TestRequestManagedObjectMessage) tmp;
228: rmom.sendQueue.take();
229: assertEquals(i + 1, rt.getAliveCount());
230: if (i % 2 == 0) {
231: expectedResent.put(id, rmom);
232: } else {
233: expectedNotResent.put(id, rmom);
234: }
235: }
236:
237: // request the same objects again
238: for (int i = 0; i < count; i++) {
239: newRmom();
240: ObjectID id = new ObjectID(i);
241: assertTrue(rmomf.newMessageQueue.isEmpty());
242: rt.startNewObjectRetriever(id);
243: assertTrue(rmomf.newMessageQueue.isEmpty());
244: }
245:
246: assertTrue(rmomf.newMessageQueue.isEmpty());
247:
248: // now go through all of the messages we don't expect to be resent and respond to their requests
249: for (Iterator i = expectedNotResent.keySet().iterator(); i
250: .hasNext();) {
251: newRmom();
252: assertTrue(rmomf.newMessageQueue.isEmpty());
253: manager.addObject(new TestDNA((ObjectID) i.next()));
254: // collect the messages sent for the secondary threads...
255: Object tmp = rmomf.newMessageQueue.take();
256: assertFalse(rmom == tmp);
257: rmom = (TestRequestManagedObjectMessage) tmp;
258: rmom.sendQueue.take();
259: assertTrue(rmom.sendQueue.isEmpty());
260: secondaryResent.put(rmom.objectIDs.iterator().next(), rmom);
261: }
262:
263: // now tell it to resend outstanding
264: pauseAndStart();
265: manager.requestOutstanding();
266: manager.unpause();
267:
268: final Collection c = new LinkedList();
269: c.addAll(expectedResent.values());
270: c.addAll(secondaryResent.values());
271: // now go through all of the messages we DO expect to be resent and make sure that
272: // they WERE resent
273: for (Iterator i = c.iterator(); i.hasNext(); i.next()) {
274: rmom = (TestRequestManagedObjectMessage) rmomf.newMessageQueue
275: .take();
276: assertFalse(rmom.sendQueue.isEmpty());
277: assertNotNull(rmom.sendQueue.poll(1));
278: }
279:
280: c.clear();
281:
282: // go through all of the messages we DON'T expect to be resent and make sure they WEREN'T resent
283:
284: c.addAll(expectedNotResent.values());
285: for (Iterator i = c.iterator(); i.hasNext();) {
286: rmom = (TestRequestManagedObjectMessage) i.next();
287: assertTrue(rmom.sendQueue.isEmpty());
288: }
289: }
290:
291: public void testBasics() throws Exception {
292:
293: final ObjectID id1 = new ObjectID(1);
294: final ObjectID id2 = new ObjectID(2);
295: final ObjectID id200 = new ObjectID(200);
296: final ObjectID id201 = new ObjectID(201);
297: final Set removed = new HashSet();
298: removed.add(id1);
299: removed.add(id200);
300: removed.add(id201);
301: // set up some removed objects.
302: for (Iterator i = removed.iterator(); i.hasNext();) {
303: this .manager.removed((ObjectID) i.next());
304: }
305:
306: TestRequestManagedObjectMessage rmom = this .rmomf.message;
307: assertNoMessageSent(rmom);
308:
309: rt.startNewObjectRetriever(id1);
310:
311: waitForMessageSend(rmom);
312:
313: assertNoMessageSent(rmom);
314:
315: // Check to see that the message was initialized with the expected
316: // values
317: verifyRmomInit(id1, removed, rmom);
318:
319: assertEquals(1, rt.getAliveCount());
320:
321: rmom = newRmom();
322:
323: // now request the same object id with a different thread.
324: rt.startNewObjectRetriever(id1);
325:
326: // but, no message should have been sent.
327: assertTrue(rmom.sendQueue.isEmpty());
328:
329: assertEquals(2, rt.getAliveCount());
330:
331: // now request a different object id on a different thread
332: rt.startNewObjectRetriever(id2);
333:
334: // this thread should send a message with id2, an empty set for the
335: // removed
336: // ids
337: waitForMessageSend(rmom);
338: verifyRmomInit(id2, new HashSet(), rmom);
339:
340: assertEquals(3, rt.getAliveCount());
341:
342: assertNoMessageSent(rmom);
343: rmom = newRmom();
344:
345: // this should allow the first two threads to fall through
346: // XXX: Actually, it doesn't. The way it's implemented, each object
347: // request
348: // will result in its own message send.
349: //
350: // This is sub-optimal, but it works so we're not changing it right now,
351: // especially since we're going to have to optimize this stuff soon
352: // anyway. --Orion 8/24/05
353:
354: manager.addObject(new TestDNA(id1));
355: rt.waitForLowWatermark(2);
356:
357: waitForMessageSend(rmom);
358: verifyRmomInit(id1, new HashSet(), rmom);
359:
360: rmom = newRmom();
361:
362: // the second thread should now create and send a new message
363: manager.addObject(new TestDNA(id1));
364: rt.waitForLowWatermark(1);
365:
366: // now, allow the third thread to fall through
367: manager.addObject(new TestDNA(id2));
368: rt.waitForLowWatermark(0);
369:
370: // no-one should have sent any messages
371: assertNoMessageSent(rmom);
372: }
373:
374: private void assertNoMessageSent(
375: TestRequestManagedObjectMessage rmom) {
376: assertTrue(rmomf.newMessageQueue.isEmpty());
377: assertTrue(rmom.sendQueue.isEmpty());
378: }
379:
380: private void assertNoMessageSent(TestRequestRootMessage rrm) {
381: assertTrue(rrmf.newMessageQueue.isEmpty());
382: assertTrue(rrm.sendQueue.isEmpty());
383: }
384:
385: private void waitForMessageSend(TestRequestManagedObjectMessage rmom) {
386: rmomf.newMessageQueue.take();
387: rmom.sendQueue.take();
388: }
389:
390: /**
391: * Verifies that the object request message initialization was done according to the given arguments.
392: */
393: private void verifyRmomInit(final ObjectID objectID,
394: final Set removed, TestRequestManagedObjectMessage rmom) {
395: Object[] initArgs = (Object[]) rmom.initializeQueue.take();
396: Set oids = new HashSet();
397: oids.add(objectID);
398: assertTrue(rmom.initializeQueue.isEmpty());
399: ObjectRequestContext ctxt = (ObjectRequestContext) initArgs[0];
400: assertEquals(cidProvider.getClientID(), ctxt.getClientID());
401: assertEquals(oids, ctxt.getObjectIDs());
402: // The object id in the request
403: assertEquals(oids, initArgs[1]);
404: // The proper set of removed object ids
405: assertEquals(removed, initArgs[2]);
406: }
407:
408: private TestRequestRootMessage newRrm() {
409: TestRequestRootMessage rv = new TestRequestRootMessage();
410: this .rrmf.message = rv;
411: return rv;
412: }
413:
414: private TestRequestManagedObjectMessage newRmom() {
415: TestRequestManagedObjectMessage rmom;
416: rmom = new TestRequestManagedObjectMessage();
417: this .rmomf.message = rmom;
418: return rmom;
419: }
420:
421: private static class RetrieverThreads {
422: private int threadCount;
423:
424: private final RemoteObjectManager manager;
425:
426: private final Set inProgress = new HashSet();
427:
428: private final ThreadGroup tg;
429:
430: public RetrieverThreads(ThreadGroup tg,
431: RemoteObjectManager manager) {
432: this .manager = manager;
433: this .tg = tg;
434: }
435:
436: public int getAliveCount() {
437: synchronized (inProgress) {
438: return inProgress.size();
439: }
440: }
441:
442: public void waitForLowWatermark(int max)
443: throws InterruptedException {
444: if (getAliveCount() <= max)
445: return;
446: synchronized (inProgress) {
447: while (getAliveCount() > max) {
448: inProgress.wait();
449: }
450: }
451: }
452:
453: public Thread startNewRootRetriever(final String rootID) {
454: Thread t = new Thread(tg, new Runnable() {
455:
456: public void run() {
457: log("Starting .. " + rootID);
458: manager.retrieveRootID(rootID);
459: log("Retrieved rootID.. " + rootID);
460: synchronized (inProgress) {
461: if (!inProgress.remove(Thread.currentThread()))
462: throw new RuntimeException(
463: "Thread not removed!");
464: log("Removed from inProgress .. size = "
465: + inProgress.size());
466: inProgress.notifyAll();
467: }
468: }
469: }, "Root retriever thread " + threadCount++);
470: synchronized (inProgress) {
471: inProgress.add(t);
472: log("Added : inProgress size = " + inProgress.size());
473: }
474: t.start();
475: return t;
476: }
477:
478: public Thread startNewObjectRetriever(final ObjectID id) {
479: Thread t = new Thread(tg, new Runnable() {
480:
481: public void run() {
482: manager.retrieve(id);
483: synchronized (inProgress) {
484: if (!inProgress.remove(Thread.currentThread()))
485: throw new RuntimeException(
486: "Thread not removed!");
487: inProgress.notifyAll();
488: }
489: }
490: }, "Object retriever thread " + threadCount++);
491: synchronized (inProgress) {
492: inProgress.add(t);
493: }
494: t.start();
495: return t;
496: }
497: }
498:
499: private static class TestRequestRootMessageFactory implements
500: RequestRootMessageFactory {
501: public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue();
502: public TestRequestRootMessage message;
503:
504: public RequestRootMessage newRequestRootMessage() {
505: newMessageQueue.put(message);
506: return this .message;
507: }
508:
509: }
510:
511: private static class TestRequestRootMessage implements
512: RequestRootMessage {
513:
514: public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue();
515:
516: public String getRootName() {
517: throw new ImplementMe();
518: }
519:
520: public void initialize(String name) {
521: return;
522: }
523:
524: public void send() {
525: sendQueue.put(new Object());
526: }
527:
528: public ClientID getClientID() {
529: throw new ImplementMe();
530: }
531:
532: public void recycle() {
533: return;
534: }
535:
536: }
537:
538: private static class TestRequestManagedObjectMessageFactory
539: implements RequestManagedObjectMessageFactory {
540:
541: public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue();
542:
543: public TestRequestManagedObjectMessage message;
544:
545: public RequestManagedObjectMessage newRequestManagedObjectMessage() {
546: newMessageQueue.put(message);
547: return message;
548: }
549:
550: }
551:
552: private static class TestRequestManagedObjectMessage implements
553: RequestManagedObjectMessage {
554:
555: public final NoExceptionLinkedQueue initializeQueue = new NoExceptionLinkedQueue();
556: public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue();
557: public Set objectIDs;
558:
559: public ObjectRequestID getRequestID() {
560: throw new ImplementMe();
561: }
562:
563: public Set getObjectIDs() {
564: throw new ImplementMe();
565: }
566:
567: public Set getRemoved() {
568: throw new ImplementMe();
569: }
570:
571: public void initialize(ObjectRequestContext ctxt, Set oids,
572: Set removedIDs) {
573: this .objectIDs = oids;
574: this .initializeQueue.put(new Object[] { ctxt, oids,
575: removedIDs });
576: }
577:
578: public void send() {
579: sendQueue.put(new Object());
580: }
581:
582: public MessageChannel getChannel() {
583: throw new ImplementMe();
584: }
585:
586: public ClientID getClientID() {
587: throw new ImplementMe();
588: }
589:
590: public int getRequestDepth() {
591: return 400;
592: }
593:
594: public void recycle() {
595: return;
596: }
597:
598: public String getRequestingThreadName() {
599: return "TestThreadDummy";
600: }
601:
602: }
603:
604: }
|