001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.core;
005:
006: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
007:
008: import com.tc.bytes.TCByteBuffer;
009: import com.tc.bytes.TCByteBufferFactory;
010: import com.tc.net.TCSocketAddress;
011: import com.tc.net.protocol.GenericNetworkMessage;
012: import com.tc.net.protocol.GenericNetworkMessageSink;
013: import com.tc.net.protocol.GenericProtocolAdaptor;
014: import com.tc.util.Assert;
015: import com.tc.util.concurrent.SetOnceFlag;
016: import com.tc.util.concurrent.ThreadUtil;
017:
018: import java.util.HashMap;
019: import java.util.Iterator;
020: import java.util.Random;
021:
022: /**
023: * @author teck
024: */
025: public class VerifierClient implements Runnable {
026: private static final int TIMEOUT = 60000 * 2;
027: private static int clientCounter = 0;
028:
029: private final TCConnectionManager connMgr;
030: private final TCSocketAddress addr;
031: private final int clientNum;
032: private final int dataSize;
033: private final int numToSend;
034: private final int maxDelay;
035: private final int minDelay;
036: private final SynchronizedRef error = new SynchronizedRef(null);
037: private final Random random;
038: private final Verifier verifier;
039: private final Verifier sendVerifier;
040: private int sendSequence = 0;
041: private int sendCounter = 0;
042: private int numToRecv;
043:
044: public VerifierClient(TCConnectionManager connMgr,
045: TCSocketAddress addr, int dataSize, int numToSend,
046: int minDelay, int maxDelay) {
047:
048: if ((maxDelay < 0) || (minDelay < 0)) {
049: // make formatter sane
050: throw new IllegalArgumentException(
051: "delay values must be greater than or equal to zero");
052: }
053:
054: if (maxDelay < minDelay) {
055: throw new IllegalArgumentException(
056: "max cannot be less than min");
057: }
058:
059: this .clientNum = getNextClientNum();
060: this .connMgr = connMgr;
061: this .addr = addr;
062: this .dataSize = dataSize;
063: this .minDelay = minDelay;
064: this .maxDelay = maxDelay;
065: this .numToSend = numToSend;
066: this .random = new Random();
067: this .verifier = new Verifier(this .clientNum);
068: this .sendVerifier = new Verifier(this .clientNum);
069: this .numToRecv = numToSend;
070: }
071:
072: private static synchronized int getNextClientNum() {
073: return ++clientCounter;
074: }
075:
076: private void delay() {
077: final int range = minDelay - maxDelay;
078: if (range > 0) {
079: final long sleepFor = minDelay
080: + random.nextInt(maxDelay - minDelay);
081: ThreadUtil.reallySleep(sleepFor);
082: }
083: }
084:
085: private class Sink implements GenericNetworkMessageSink {
086: public void putMessage(GenericNetworkMessage msg) {
087: try {
088: verifier.putMessage(msg);
089: } catch (Throwable t) {
090: setError(t);
091: } finally {
092: msgRecv();
093: }
094: }
095: }
096:
097: private void msgRecv() {
098: synchronized (this ) {
099: numToRecv--;
100: notify();
101: }
102: }
103:
104: private void setError(Throwable t) {
105: t.printStackTrace();
106: error.set(t);
107: }
108:
109: public void run() {
110: try {
111: run0();
112: } catch (Throwable t) {
113: setError(t);
114: } finally {
115: checkForError();
116: }
117: }
118:
119: public void run0() throws Throwable {
120: final HashMap sentCallbacks = new HashMap();
121: final TCConnection conn = connMgr
122: .createConnection(new GenericProtocolAdaptor(new Sink()));
123: conn.connect(addr, TIMEOUT);
124:
125: for (int i = 0; i < numToSend; i++) {
126: checkForError();
127: delay();
128:
129: final GenericNetworkMessage msg = makeNextMessage(conn);
130: sendVerifier.putMessage(msg);
131:
132: synchronized (sentCallbacks) {
133: sentCallbacks.put(msg, new SetOnceFlag());
134: }
135:
136: msg.setSentCallback(new Runnable() {
137: public void run() {
138: synchronized (sentCallbacks) {
139: ((SetOnceFlag) sentCallbacks.get(msg)).set();
140: }
141: }
142: });
143:
144: conn.putMessage(msg);
145: }
146:
147: checkForError();
148:
149: synchronized (this ) {
150: while (numToRecv > 0) {
151: wait();
152: }
153: }
154:
155: checkForError();
156:
157: conn.close(TIMEOUT);
158:
159: // make sure that the sent callback was called once and only once for each message
160: for (final Iterator iter = sentCallbacks.values().iterator(); iter
161: .hasNext();) {
162: SetOnceFlag sent = (SetOnceFlag) iter.next();
163: Assert.eval(sent.isSet());
164: iter.remove();
165: }
166:
167: checkForError();
168: }
169:
170: private GenericNetworkMessage makeNextMessage(TCConnection conn) {
171: // must use a multiple of 8 for the data in this message. Data is <id><counter><id><counter>....where id and
172: // counter are both 4 byte ints
173: int extra = 8 + (8 * random.nextInt(13));
174: TCByteBuffer data[] = TCByteBufferFactory
175: .getFixedSizedInstancesForLength(false, 4096 * dataSize
176: + extra);
177:
178: if (this .dataSize == 0) {
179: Assert.assertEquals(1, data.length);
180: }
181:
182: for (int d = 0; d < data.length; d++) {
183: TCByteBuffer buf = data[d];
184: Assert.eval((buf.limit() % 8) == 0);
185:
186: while (buf.hasRemaining()) {
187: buf.putInt(clientNum);
188: buf.putInt(sendCounter++);
189: }
190:
191: buf.flip();
192: }
193:
194: GenericNetworkMessage msg = new GenericNetworkMessage(conn,
195: data);
196: msg.setSequence(sendSequence++);
197: msg.setClientNum(this .clientNum);
198: return msg;
199: }
200:
201: private void checkForError() {
202: final Throwable t = (Throwable) error.get();
203: if (t != null) {
204: throw new RuntimeException(t);
205: }
206: }
207: }
|