001: package org.jgroups.tests;
002:
003: import org.jgroups.Message;
004: import org.jgroups.conf.ClassConfigurator;
005: import org.jgroups.protocols.*;
006: import org.jgroups.stack.IpAddress;
007: import org.jgroups.util.List;
008: import org.jgroups.util.Buffer;
009: import org.jgroups.util.ExposedByteArrayOutputStream;
010:
011: import java.io.*;
012: import java.util.Enumeration;
013: import java.util.LinkedList;
014:
015: /**
016: * @author Bela Ban Feb 12, 2004
017: * @version $Id: MessageSerializationTest2.java,v 1.14 2005/07/15 09:35:00 belaban Exp $
018: */
019: public class MessageSerializationTest2 {
020: Message msg;
021: Buffer buf;
022: long start, stop, total;
023: double msgs_per_sec, time_per_msg;
024: List my_list = new List();
025: int num = 50000;
026: ObjectOutputStream out;
027: ExposedByteArrayOutputStream output;
028: ByteArrayInputStream input;
029: ObjectInputStream in;
030: DataOutputStream dos;
031: DataInputStream dis;
032: int msgs_read = 0;
033: List l2 = new List();
034:
035: public void start(int num, boolean use_serialization,
036: boolean use_streamable, boolean use_additional_data,
037: boolean add_headers) throws Exception {
038: IpAddress dest = new IpAddress("228.8.8.8", 7500);
039: IpAddress src = new IpAddress("127.0.0.1", 5555);
040: if (use_additional_data)
041: src.setAdditionalData("bela".getBytes());
042:
043: ClassConfigurator.getInstance(true);
044:
045: this .num = num;
046: System.out.println("-- starting to create " + num + " msgs");
047: start = System.currentTimeMillis();
048: for (int i = 1; i <= num; i++) {
049: msg = new Message(dest, src,
050: ("Hello world from message #" + i).getBytes());
051: if (add_headers) {
052: addHeaders(msg);
053: }
054: my_list.add(msg);
055: }
056: stop = System.currentTimeMillis();
057: total = stop - start;
058: msgs_per_sec = num / (total / 1000.0);
059: time_per_msg = total / (double) num;
060: System.out.println("\n-- total time for creating " + num
061: + " msgs = " + total + "ms \n(" + msgs_per_sec
062: + " msgs/sec, time_per_msg=" + time_per_msg + " ms)");
063:
064: LinkedList l_ser = null, l_stream = null;
065:
066: if (use_streamable)
067: l_stream = marshalMessages();
068:
069: if (use_serialization)
070: l_ser = serializeMessage();
071:
072: if (l_ser != null && l_stream != null)
073: printDiffs(l_ser, l_stream);
074: }
075:
076: void addHeaders(Message msg) {
077: msg.putHeader("UDP", new UdpHeader("MyGroup"));
078: msg.putHeader("PING", new PingHeader(PingHeader.GET_MBRS_REQ,
079: null));
080: msg.putHeader("FD_SOCK", new FD_SOCK.FdHeader());
081: msg.putHeader("VERIFY_SUSPECT",
082: new VERIFY_SUSPECT.VerifyHeader());
083: msg.putHeader("STABLE",
084: new org.jgroups.protocols.pbcast.STABLE.StableHeader());
085: msg.putHeader("NAKACK",
086: new org.jgroups.protocols.pbcast.NakAckHeader());
087: msg.putHeader("UNICAST", new UNICAST.UnicastHeader());
088: msg.putHeader("FRAG", new FragHeader());
089: msg.putHeader("GMS",
090: new org.jgroups.protocols.pbcast.GMS.GmsHeader());
091: }
092:
093: private void printDiffs(LinkedList l_ser, LinkedList l_stream) {
094: int size_ser, size_stream;
095: long write_ser, write_stream, read_ser, read_stream;
096:
097: size_ser = ((Integer) l_ser.get(0)).intValue();
098: size_stream = ((Integer) l_stream.get(0)).intValue();
099: write_ser = ((Long) l_ser.get(1)).longValue();
100: read_ser = ((Long) l_ser.get(2)).longValue();
101: write_stream = ((Long) l_stream.get(1)).longValue();
102: read_stream = ((Long) l_stream.get(2)).longValue();
103: System.out.println("\n\nserialized size=" + size_ser
104: + ", streamable size=" + size_stream
105: + ", streamable is "
106: + (100.0 / size_stream * size_ser - 100)
107: + " percent smaller");
108: System.out.println("serialized write=" + write_ser
109: + ", streamable write=" + write_stream
110: + ", streamable write is "
111: + (100.0 / write_stream * write_ser - 100)
112: + " percent faster");
113: System.out.println("serialized read=" + read_ser
114: + ", streamable read=" + read_stream
115: + ", streamable read is "
116: + (100.0 / read_stream * read_ser - 100)
117: + " percent faster");
118: }
119:
120: /**
121: *
122: * @return LinkedList with 3 elements: size of serialized buffer, write time, read time
123: * @throws IOException
124: */
125: LinkedList serializeMessage() throws IOException {
126: LinkedList retval = new LinkedList();
127: System.out.println("-- starting to serialize " + num + " msgs");
128: start = System.currentTimeMillis();
129: output = new ExposedByteArrayOutputStream(65000);
130: out = new ObjectOutputStream(output);
131: my_list.writeExternal(out);
132: out.close();
133: stop = System.currentTimeMillis();
134: buf = new Buffer(output.getRawBuffer(), 0, output.size());
135: System.out.println("** serialized buffer size="
136: + buf.getLength() + " bytes");
137: retval.add(new Integer(buf.getLength()));
138:
139: total = stop - start;
140: retval.add(new Long(total));
141: msgs_per_sec = num / (total / 1000.0);
142: time_per_msg = total / (double) num;
143: System.out.println("\n-- total time for serializing " + num
144: + " msgs = " + total + "ms \n(" + msgs_per_sec
145: + " msgs/sec, time_per_msg=" + time_per_msg + " ms)");
146:
147: System.out.println("-- starting to unserialize msgs");
148: start = System.currentTimeMillis();
149: ByteArrayInputStream input2 = new ByteArrayInputStream(buf
150: .getBuf(), buf.getOffset(), buf.getLength());
151: ObjectInputStream in2 = new ObjectInputStream(input2);
152:
153: try {
154: l2.readExternal(in2);
155: } catch (ClassNotFoundException e) {
156: e.printStackTrace();
157: }
158: stop = System.currentTimeMillis();
159: total = stop - start;
160: retval.add(new Long(total));
161: msgs_read = l2.size();
162: msgs_per_sec = msgs_read / (total / 1000.0);
163: time_per_msg = total / (double) msgs_read;
164: System.out.println("\n-- total time for reading " + msgs_read
165: + " msgs = " + total + "ms \n(" + msgs_per_sec
166: + " msgs/sec, time_per_msg=" + time_per_msg + ')');
167: l2.removeAll();
168: return retval;
169: }
170:
171: LinkedList marshalMessages() throws IOException,
172: IllegalAccessException, InstantiationException {
173: LinkedList retval = new LinkedList();
174: System.out.println("\n\n-- starting to marshal " + num
175: + " msgs (using Streamable)");
176: start = System.currentTimeMillis();
177: output = new ExposedByteArrayOutputStream(65000);
178: dos = new DataOutputStream(output);
179: dos.writeInt(my_list.size());
180: for (Enumeration en = my_list.elements(); en.hasMoreElements();) {
181: Message tmp = (Message) en.nextElement();
182: tmp.writeTo(dos);
183: }
184:
185: dos.close();
186: stop = System.currentTimeMillis();
187: buf = new Buffer(output.getRawBuffer(), 0, output.size());
188: System.out.println("** marshalled buffer size="
189: + buf.getLength() + " bytes");
190: retval.add(new Integer(buf.getLength()));
191:
192: total = stop - start;
193: retval.add(new Long(total));
194: msgs_per_sec = num / (total / 1000.0);
195: time_per_msg = total / (double) num;
196: System.out.println("\n-- total time for marshaling " + num
197: + " msgs = " + total + "ms \n(" + msgs_per_sec
198: + " msgs/sec, time_per_msg=" + time_per_msg + " ms)");
199:
200: System.out
201: .println("-- starting to unmarshal msgs (using Streamable)");
202: start = System.currentTimeMillis();
203: input = new ByteArrayInputStream(buf.getBuf(), buf.getOffset(),
204: buf.getLength());
205: dis = new DataInputStream(input);
206: msgs_read = 0;
207:
208: int b = dis.readInt();
209: Message tmp;
210: for (int i = 0; i < b; i++) {
211: tmp = new Message(false);
212: tmp.readFrom(dis);
213: l2.add(tmp);
214: }
215:
216: stop = System.currentTimeMillis();
217: total = stop - start;
218: retval.add(new Long(total));
219: msgs_read = l2.size();
220: msgs_per_sec = msgs_read / (total / 1000.0);
221: time_per_msg = total / (double) msgs_read;
222: System.out.println("\n-- total time for reading " + msgs_read
223: + " msgs = " + total + "ms \n(" + msgs_per_sec
224: + " msgs/sec, time_per_msg=" + time_per_msg + ')');
225: return retval;
226: }
227:
228: public static void main(String[] args) {
229: int num = 50000;
230: boolean use_serialization = true, use_streamable = true, use_additional_data = false, add_headers = true;
231:
232: for (int i = 0; i < args.length; i++) {
233: if (args[i].equals("-num")) {
234: num = Integer.parseInt(args[++i]);
235: continue;
236: }
237: if (args[i].equals("-use_serialization")) {
238: use_serialization = new Boolean(args[++i])
239: .booleanValue();
240: continue;
241: }
242: if (args[i].equals("-use_streamable")) {
243: use_streamable = new Boolean(args[++i]).booleanValue();
244: continue;
245: }
246: if (args[i].equals("-use_additional_data")) {
247: use_additional_data = new Boolean(args[++i])
248: .booleanValue();
249: continue;
250: }
251: if (args[i].equals("-add_headers")) {
252: add_headers = new Boolean(args[++i]).booleanValue();
253: continue;
254: }
255: help();
256: return;
257: }
258:
259: try {
260: new MessageSerializationTest2().start(num,
261: use_serialization, use_streamable,
262: use_additional_data, add_headers);
263: } catch (Exception e) {
264: e.printStackTrace();
265: }
266: }
267:
268: static void help() {
269: System.out
270: .println("MessageSerializationTest2 [-help] [-num <number>] "
271: + "[-use_serialization <true|false>] [-use_streamable <true|false>] "
272: + "[-use_additional_data <true|false>] [-add_headers <true|false>]");
273: }
274: }
|