001: // $Id: LargeState.java,v 1.27 2006/10/11 14:28:25 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.*;
006: import org.jgroups.jmx.JmxConfigurator;
007: import org.jgroups.util.Util;
008: import org.jgroups.util.Promise;
009:
010: import java.io.OutputStream;
011: import java.io.InputStream;
012: import java.io.IOException;
013:
014: import javax.management.MBeanServer;
015:
016: /**
017: * Tests transfer of large states. Start first instance with -provider flag and -size flag (default = 1MB).
018: * The start second instance without these flags: it should acquire the state from the first instance. Possibly
019: * tracing should be turned on for FRAG to see the fragmentation taking place, e.g.:
020: * <pre>
021: * trace1=FRAG DEBUG STDOUT
022: * </pre><br>
023: * Note that because fragmentation might generate a lot of small fragments at basically the same time (e.g. size1MB,
024: * FRAG.frag-size=4096 generates a lot of fragments), the send buffer of the unicast socket in UDP might be overloaded,
025: * causing it to drop some packets (default size is 8096 bytes). Therefore the send (and receive) buffers for the unicast
026: * socket have been increased (see ucast_send_buf_size and ucast_recv_buf_size below).<p>
027: * If we didn't do this, we would have some retransmission, slowing the state transfer down.
028: *
029: * @author Bela Ban Dec 13 2001
030: */
031: public class LargeState extends ExtendedReceiverAdapter {
032: Channel channel;
033: byte[] state = null;
034: Thread getter = null;
035: boolean rc = false;
036: String props;
037: long start, stop;
038: boolean provider = true;
039: int size = 100000;
040: int total_received = 0;
041: final Promise state_promise = new Promise();
042: final int STREAMING_CHUNK_SIZE = 10000;
043:
044: public void start(boolean provider, int size, String props,
045: boolean jmx) throws Exception {
046: this .provider = provider;
047: channel = new JChannel(props);
048: channel.setReceiver(this );
049: channel.connect("TestChannel");
050: if (jmx) {
051: MBeanServer server = Util.getMBeanServer();
052: if (server == null)
053: throw new Exception(
054: "No MBeanServers found;"
055: + "\nLargeState needs to be run with an MBeanServer present, or inside JDK 5");
056: JmxConfigurator.registerChannel((JChannel) channel, server,
057: "jgroups", channel.getClusterName(), true);
058: }
059: System.out.println("-- connected to channel");
060:
061: if (provider) {
062: this .size = size;
063: // System.out.println("Creating state of " + size + " bytes");
064: // state=createLargeState(size);
065: System.out
066: .println("Waiting for other members to join and fetch large state");
067:
068: // System.out.println("sending a few messages");
069: // for(int i=0; i < 100; i++) {
070: // channel.send(null, null, "hello world " + i);
071: // }
072: } else {
073: System.out.println("Getting state");
074: start = System.currentTimeMillis();
075: // total_received=0;
076: state_promise.reset();
077: rc = channel.getState(null, 0);
078: System.out.println("getState(), rc=" + rc);
079: if (rc)
080: state_promise.getResult(10000);
081: }
082:
083: // mainLoop();
084: if (!provider) {
085: channel.close();
086: } else {
087: for (;;) {
088: Util.sleep(10000);
089: }
090: }
091: }
092:
093: byte[] createLargeState(int size) {
094: return new byte[size];
095: }
096:
097: public void receive(Message msg) {
098: System.out.println("-- received msg " + msg.getObject()
099: + " from " + msg.getSrc());
100: }
101:
102: public void viewAccepted(View new_view) {
103: if (provider)
104: System.out.println("-- view: " + new_view);
105: }
106:
107: public byte[] getState() {
108: if (state == null) {
109: System.out.println("creating state of " + size + " bytes");
110: state = createLargeState(size);
111: }
112: System.out.println("--> returning state: " + state.length
113: + " bytes");
114: return state;
115: }
116:
117: public void setState(byte[] state) {
118: stop = System.currentTimeMillis();
119: if (state != null) {
120: this .state = state;
121: System.out
122: .println("<-- Received state, size ="
123: + state.length + " (took " + (stop - start)
124: + "ms)");
125: }
126: state_promise.setResult(Boolean.TRUE);
127: }
128:
129: public byte[] getState(String state_id) {
130: if (state_id == null)
131: return getState();
132: throw new UnsupportedOperationException("not yet implemented");
133: }
134:
135: public void setState(String state_id, byte[] state) {
136: if (state_id == null) {
137: setState(state);
138: state_promise.setResult(Boolean.TRUE);
139: return;
140: }
141: throw new UnsupportedOperationException("not yet implemented");
142: }
143:
144: public void getState(String state_id, OutputStream ostream) {
145: throw new UnsupportedOperationException("not yet implemented");
146: }
147:
148: public void setState(InputStream istream) {
149: try {
150: total_received = 0;
151: int received = 0;
152: while (true) {
153: byte[] buf = new byte[10000];
154: try {
155: received = istream.read(buf);
156: if (received < 0)
157: break;
158: // System.out.println("received " + received + " bytes");
159: total_received += received;
160: } catch (IOException e) {
161: e.printStackTrace();
162: break;
163: }
164: }
165:
166: stop = System.currentTimeMillis();
167: System.out.println("<-- Received state, size="
168: + total_received + " (took " + (stop - start)
169: + "ms)");
170: state_promise.setResult(Boolean.TRUE);
171: } finally {
172: Util.close(istream);
173: }
174: }
175:
176: public void setState(String state_id, InputStream istream) {
177: throw new UnsupportedOperationException("not yet implemented");
178: }
179:
180: public void getState(OutputStream ostream) {
181: try {
182: int frag_size = size / 10;
183: for (int i = 0; i < 10; i++) {
184: byte[] buf = new byte[frag_size];
185: try {
186: ostream.write(buf);
187: } catch (IOException e) {
188: e.printStackTrace();
189: break;
190: }
191: }
192: int remaining = size - (10 * frag_size);
193: if (remaining > 0) {
194: byte[] buf = new byte[remaining];
195: try {
196: ostream.write(buf);
197: } catch (IOException e) {
198: e.printStackTrace();
199: }
200: }
201: } finally {
202: Util.close(ostream);
203: }
204: }
205:
206: public static void main(String[] args) {
207: boolean provider = false;
208: boolean jmx = false;
209: int size = 1024 * 1024;
210: String props = null;
211:
212: for (int i = 0; i < args.length; i++) {
213: if ("-help".equals(args[i])) {
214: help();
215: return;
216: }
217: if ("-provider".equals(args[i])) {
218: provider = true;
219: continue;
220: }
221: if ("-jmx".equals(args[i])) {
222: jmx = true;
223: continue;
224: }
225: if ("-size".equals(args[i])) {
226: size = Integer.parseInt(args[++i]);
227: continue;
228: }
229: if ("-props".equals(args[i])) {
230: props = args[++i];
231: }
232: }
233:
234: try {
235: new LargeState().start(provider, size, props, jmx);
236: } catch (Exception e) {
237: e.printStackTrace();
238: }
239: }
240:
241: static void help() {
242: System.out
243: .println("LargeState [-help] [-size <size of state in bytes] [-provider] [-props <properties>] [-jmx]");
244: }
245:
246: }
|