001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.demos;
018:
019: import java.io.Serializable;
020:
021: import org.apache.catalina.tribes.Member;
022: import org.apache.catalina.tribes.group.RpcCallback;
023: import org.apache.catalina.tribes.Channel;
024: import org.apache.catalina.tribes.ManagedChannel;
025: import org.apache.catalina.tribes.group.RpcChannel;
026: import org.apache.catalina.tribes.group.Response;
027:
028: /**
029: * <p>Title: </p>
030: *
031: * <p>Description: </p>
032: *
033: * <p>Company: </p>
034: *
035: * @author not attributable
036: * @version 1.0
037: */
038: public class EchoRpcTest implements RpcCallback, Runnable {
039:
040: Channel channel;
041: int count;
042: String message;
043: long pause;
044: RpcChannel rpc;
045: int options;
046: long timeout;
047: String name;
048:
049: public EchoRpcTest(Channel channel, String name, int count,
050: String message, long pause, int options, long timeout) {
051: this .channel = channel;
052: this .count = count;
053: this .message = message;
054: this .pause = pause;
055: this .options = options;
056: this .rpc = new RpcChannel(name.getBytes(), channel, this );
057: this .timeout = timeout;
058: this .name = name;
059: }
060:
061: /**
062: * If the reply has already been sent to the requesting thread, the rpc
063: * callback can handle any data that comes in after the fact.
064: *
065: * @param msg Serializable
066: * @param sender Member
067: * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
068: * method
069: */
070: public void leftOver(Serializable msg, Member sender) {
071: System.out.println("Received a left over message from ["
072: + sender.getName() + "] with data [" + msg + "]");
073: }
074:
075: /**
076: *
077: * @param msg Serializable
078: * @param sender Member
079: * @return Serializable - null if no reply should be sent
080: * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
081: * method
082: */
083: public Serializable replyRequest(Serializable msg, Member sender) {
084: System.out.println("Received a reply request message from ["
085: + sender.getName() + "] with data [" + msg + "]");
086: return "Reply(" + name + "):" + msg;
087: }
088:
089: public void run() {
090: long counter = 0;
091: while (counter < count) {
092: String msg = message + " cnt=" + (++counter);
093: try {
094: System.out.println("Sending [" + msg + "]");
095: long start = System.currentTimeMillis();
096: Response[] resp = rpc.send(channel.getMembers(),
097: (Serializable) msg, options,
098: Channel.SEND_OPTIONS_DEFAULT, timeout);
099: System.out
100: .println("Send of [" + msg
101: + "] completed. Nr of responses="
102: + resp.length + " Time:"
103: + (System.currentTimeMillis() - start)
104: + " ms.");
105: for (int i = 0; i < resp.length; i++) {
106: System.out
107: .println("Received a response message from ["
108: + resp[i].getSource().getName()
109: + "] with data ["
110: + resp[i].getMessage() + "]");
111: }
112: Thread.sleep(pause);
113: } catch (Exception x) {
114: }
115: }
116: }
117:
118: public static void usage() {
119: System.out.println("Tribes RPC tester.");
120: System.out
121: .println("Usage:\n\t"
122: + "java EchoRpcTest [options]\n\t"
123: + "Options:\n\t\t"
124: + "[-mode all|first|majority] \n\t\t"
125: + "[-debug] \n\t\t"
126: + "[-count messagecount] \n\t\t"
127: + "[-timeout timeoutinms] \n\t\t"
128: + "[-stats statinterval] \n\t\t"
129: + "[-pause nrofsecondstopausebetweensends] \n\t\t"
130: + "[-message message] \n\t\t"
131: + "[-name rpcname] \n\t\t"
132: + "[-break (halts execution on exception)]\n"
133: + "\tChannel options:"
134: + ChannelCreator.usage()
135: + "\n\n"
136: + "Example:\n\t"
137: + "java EchoRpcTest -port 4004\n\t"
138: + "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"
139: + "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
140: }
141:
142: public static void main(String[] args) throws Exception {
143: boolean send = true;
144: boolean debug = false;
145: long pause = 3000;
146: int count = 1000000;
147: int stats = 10000;
148: String name = "EchoRpcId";
149: boolean breakOnEx = false;
150: int threads = 1;
151: int options = RpcChannel.ALL_REPLY;
152: long timeout = 15000;
153: String message = "EchoRpcMessage";
154: if (args.length == 0) {
155: args = new String[] { "-help" };
156: }
157: for (int i = 0; i < args.length; i++) {
158: if ("-threads".equals(args[i])) {
159: threads = Integer.parseInt(args[++i]);
160: } else if ("-count".equals(args[i])) {
161: count = Integer.parseInt(args[++i]);
162: System.out.println("Sending " + count + " messages.");
163: } else if ("-pause".equals(args[i])) {
164: pause = Long.parseLong(args[++i]) * 1000;
165: } else if ("-break".equals(args[i])) {
166: breakOnEx = true;
167: } else if ("-stats".equals(args[i])) {
168: stats = Integer.parseInt(args[++i]);
169: System.out.println("Stats every " + stats + " message");
170: } else if ("-timeout".equals(args[i])) {
171: timeout = Long.parseLong(args[++i]);
172: } else if ("-message".equals(args[i])) {
173: message = args[++i];
174: } else if ("-name".equals(args[i])) {
175: name = args[++i];
176: } else if ("-mode".equals(args[i])) {
177: if ("all".equals(args[++i]))
178: options = RpcChannel.ALL_REPLY;
179: else if ("first".equals(args[i]))
180: options = RpcChannel.FIRST_REPLY;
181: else if ("majority".equals(args[i]))
182: options = RpcChannel.MAJORITY_REPLY;
183: } else if ("-debug".equals(args[i])) {
184: debug = true;
185: } else if ("-help".equals(args[i])) {
186: usage();
187: System.exit(1);
188: }
189: }
190:
191: ManagedChannel channel = (ManagedChannel) ChannelCreator
192: .createChannel(args);
193: EchoRpcTest test = new EchoRpcTest(channel, name, count,
194: message, pause, options, timeout);
195: channel.start(channel.DEFAULT);
196: Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
197: test.run();
198:
199: System.out
200: .println("System test complete, sleeping to let threads finish.");
201: Thread.sleep(60 * 1000 * 60);
202: }
203:
204: public static class Shutdown extends Thread {
205: ManagedChannel channel = null;
206:
207: public Shutdown(ManagedChannel channel) {
208: this .channel = channel;
209: }
210:
211: public void run() {
212: System.out.println("Shutting down...");
213: SystemExit exit = new SystemExit(5000);
214: exit.setDaemon(true);
215: exit.start();
216: try {
217: channel.stop(channel.DEFAULT);
218:
219: } catch (Exception x) {
220: x.printStackTrace();
221: }
222: System.out.println("Channel stopped.");
223: }
224: }
225:
226: public static class SystemExit extends Thread {
227: private long delay;
228:
229: public SystemExit(long delay) {
230: this .delay = delay;
231: }
232:
233: public void run() {
234: try {
235: Thread.sleep(delay);
236: } catch (Exception x) {
237: x.printStackTrace();
238: }
239: System.exit(0);
240:
241: }
242: }
243: }
|