001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.demos;
018:
019: import java.io.BufferedReader;
020: import java.io.IOException;
021: import java.io.InputStreamReader;
022: import java.util.StringTokenizer;
023:
024: import org.apache.catalina.tribes.ChannelInterceptor;
025: import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
026: import org.apache.catalina.tribes.Member;
027: import org.apache.catalina.tribes.group.GroupChannel;
028: import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
029: import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
030: import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
031: import org.apache.catalina.tribes.transport.ReceiverBase;
032: import org.apache.catalina.tribes.util.Arrays;
033:
034: public class CoordinationDemo {
035: static int CHANNEL_COUNT = 5;
036: static int SCREEN_WIDTH = 120;
037: static long SLEEP_TIME = 10;
038: static int CLEAR_SCREEN = 30;
039: static boolean MULTI_THREAD = false;
040: static boolean[] VIEW_EVENTS = new boolean[255];
041: StringBuffer statusLine = new StringBuffer();
042: Status[] status = null;
043: BufferedReader reader = null;
044:
045: /**
046: * Construct and show the application.
047: */
048: public CoordinationDemo() {
049: }
050:
051: public void init() {
052: reader = new BufferedReader(new InputStreamReader(System.in));
053: status = new Status[CHANNEL_COUNT];
054: }
055:
056: public void clearScreen() {
057: StringBuffer buf = new StringBuffer(700);
058: for (int i = 0; i < CLEAR_SCREEN; i++)
059: buf.append("\n");
060: System.out.println(buf);
061: }
062:
063: public void printMenuOptions() {
064: System.out.println("Commands:");
065: System.out.println("\tstart [member id]");
066: System.out.println("\tstop [member id]");
067: System.out.println("\tprint (refresh)");
068: System.out.println("\tquit");
069: System.out.print("Enter command:");
070: }
071:
072: public synchronized void printScreen() {
073: clearScreen();
074: System.out.println(" ###." + getHeader());
075: for (int i = 0; i < status.length; i++) {
076: System.out.print(leftfill(String.valueOf(i + 1) + ".", 5,
077: " "));
078: if (status[i] != null)
079: System.out.print(status[i].getStatusLine());
080: }
081: System.out.println("\n\n");
082: System.out.println("Overall status:" + statusLine);
083: printMenuOptions();
084:
085: }
086:
087: public String getHeader() {
088: //member - 30
089: //running- 10
090: //coord - 30
091: //view-id - 24
092: //view count - 8
093:
094: StringBuffer buf = new StringBuffer();
095: buf.append(leftfill("Member", 30, " "));
096: buf.append(leftfill("Running", 10, " "));
097: buf.append(leftfill("Coord", 30, " "));
098: buf.append(leftfill("View-id(short)", 24, " "));
099: buf.append(leftfill("Count", 8, " "));
100: buf.append("\n");
101:
102: buf.append(rightfill("==="
103: + new java.sql.Timestamp(System.currentTimeMillis())
104: .toString(), SCREEN_WIDTH, "="));
105: buf.append("\n");
106: return buf.toString();
107: }
108:
109: public String[] tokenize(String line) {
110: StringTokenizer tz = new StringTokenizer(line, " ");
111: String[] result = new String[tz.countTokens()];
112: for (int i = 0; i < result.length; i++)
113: result[i] = tz.nextToken();
114: return result;
115: }
116:
117: public void waitForInput() throws IOException {
118: for (int i = 0; i < status.length; i++)
119: status[i] = new Status(this );
120: printScreen();
121: String l = reader.readLine();
122: String[] args = tokenize(l);
123: while (args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) {
124: if ("start".equalsIgnoreCase(args[0])) {
125: cmdStart(args);
126: } else if ("stop".equalsIgnoreCase(args[0])) {
127: cmdStop(args);
128:
129: }
130: printScreen();
131: l = reader.readLine();
132: args = tokenize(l);
133: }
134: for (int i = 0; i < status.length; i++)
135: status[i].stop();
136: }
137:
138: private void cmdStop(String[] args) {
139: if (args.length == 1) {
140: setSystemStatus("System shutting down...");
141: Thread[] t = new Thread[CHANNEL_COUNT];
142: for (int i = 0; i < status.length; i++) {
143: final int j = i;
144: t[j] = new Thread() {
145: public void run() {
146: status[j].stop();
147: }
148: };
149: }
150: for (int i = 0; i < status.length; i++)
151: if (MULTI_THREAD)
152: t[i].start();
153: else
154: t[i].run();
155: setSystemStatus("System stopped.");
156: } else {
157: int index = -1;
158: try {
159: index = Integer.parseInt(args[1]) - 1;
160: } catch (Exception x) {
161: setSystemStatus("Invalid index:" + args[1]);
162: }
163: if (index >= 0) {
164: setSystemStatus("Stopping member:" + (index + 1));
165: status[index].stop();
166: setSystemStatus("Member stopped:" + (index + 1));
167: }
168: }
169: }
170:
171: private void cmdStart(String[] args) {
172: if (args.length == 1) {
173: setSystemStatus("System starting up...");
174: Thread[] t = new Thread[CHANNEL_COUNT];
175: for (int i = 0; i < status.length; i++) {
176: final int j = i;
177: t[j] = new Thread() {
178: public void run() {
179: status[j].start();
180: }
181: };
182: }
183: for (int i = 0; i < status.length; i++)
184: if (MULTI_THREAD)
185: t[i].start();
186: else
187: t[i].run();
188: setSystemStatus("System started.");
189: } else {
190: int index = -1;
191: try {
192: index = Integer.parseInt(args[1]) - 1;
193: } catch (Exception x) {
194: setSystemStatus("Invalid index:" + args[1]);
195: }
196: if (index >= 0) {
197: setSystemStatus("Starting member:" + (index + 1));
198: status[index].start();
199: setSystemStatus("Member started:" + (index + 1));
200: }
201: }
202: }
203:
204: public void setSystemStatus(String status) {
205: statusLine.delete(0, statusLine.length());
206: statusLine.append(status);
207: }
208:
209: public static void setEvents(String events) {
210: java.util.Arrays.fill(VIEW_EVENTS, false);
211: StringTokenizer t = new StringTokenizer(events, ",");
212: while (t.hasMoreTokens()) {
213: int idx = Integer.parseInt(t.nextToken());
214: VIEW_EVENTS[idx] = true;
215: }
216: }
217:
218: public static void run(String[] args, CoordinationDemo demo)
219: throws Exception {
220: usage();
221: java.util.Arrays.fill(VIEW_EVENTS, true);
222:
223: for (int i = 0; i < args.length; i++) {
224: if ("-c".equals(args[i]))
225: CHANNEL_COUNT = Integer.parseInt(args[++i]);
226: else if ("-t".equals(args[i]))
227: MULTI_THREAD = Boolean.parseBoolean(args[++i]);
228: else if ("-s".equals(args[i]))
229: SLEEP_TIME = Long.parseLong(args[++i]);
230: else if ("-sc".equals(args[i]))
231: CLEAR_SCREEN = Integer.parseInt(args[++i]);
232: else if ("-p".equals(args[i]))
233: setEvents(args[++i]);
234: else if ("-h".equals(args[i]))
235: System.exit(0);
236: }
237: demo.init();
238: demo.waitForInput();
239: }
240:
241: private static void usage() {
242: System.out.println("Usage:");
243: System.out
244: .println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
245: System.out.println("Example:");
246: System.out
247: .println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
248: System.out
249: .println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
250: System.out
251: .println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
252: System.out
253: .println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
254: System.out.println();
255: }
256:
257: public static void main(String[] args) throws Exception {
258: CoordinationDemo demo = new CoordinationDemo();
259: run(args, demo);
260: }
261:
262: public static String leftfill(String value, int length, String ch) {
263: return fill(value, length, ch, true);
264: }
265:
266: public static String rightfill(String value, int length, String ch) {
267: return fill(value, length, ch, false);
268: }
269:
270: public static String fill(String value, int length, String ch,
271: boolean left) {
272: StringBuffer buf = new StringBuffer();
273: if (!left)
274: buf.append(value.trim());
275: for (int i = value.trim().length(); i < length; i++)
276: buf.append(ch);
277: if (left)
278: buf.append(value.trim());
279: return buf.toString();
280: }
281:
282: public static class Status {
283: public CoordinationDemo parent;
284: public GroupChannel channel;
285: NonBlockingCoordinator interceptor = null;
286: public String status;
287: public Exception error;
288: public String startstatus = "new";
289:
290: public Status(CoordinationDemo parent) {
291: this .parent = parent;
292: }
293:
294: public String getStatusLine() {
295: //member - 30
296: //running- 10
297: //coord - 30
298: //view-id - 24
299: //view count - 8
300: StringBuffer buf = new StringBuffer();
301: String local = "";
302: String coord = "";
303: String viewId = "";
304: String count = "0";
305: if (channel != null) {
306: Member lm = channel.getLocalMember(false);
307: local = lm != null ? lm.getName() : "";
308: coord = interceptor != null
309: && interceptor.getCoordinator() != null ? interceptor
310: .getCoordinator().getName()
311: : "";
312: viewId = getByteString(interceptor.getViewId() != null ? interceptor
313: .getViewId().getBytes()
314: : new byte[0]);
315: count = String.valueOf(interceptor.getView().length);
316: }
317: buf.append(leftfill(local, 30, " "));
318: buf.append(leftfill(startstatus, 10, " "));
319: buf.append(leftfill(coord, 30, " "));
320: buf.append(leftfill(viewId, 24, " "));
321: buf.append(leftfill(count, 8, " "));
322: buf.append("\n");
323: buf.append("Status:" + status);
324: buf.append("\n");
325: return buf.toString();
326: }
327:
328: public String getByteString(byte[] b) {
329: if (b == null)
330: return "{}";
331: return Arrays.toString(b, 0, Math.min(b.length, 4));
332: }
333:
334: public void start() {
335: try {
336: if (channel == null) {
337: channel = createChannel();
338: startstatus = "starting";
339: channel.start(channel.DEFAULT);
340: startstatus = "running";
341: } else {
342: status = "Channel already started.";
343: }
344: } catch (Exception x) {
345: synchronized (System.err) {
346: System.err.println("Start failed:");
347: StackTraceElement[] els = x.getStackTrace();
348: for (int i = 0; i < els.length; i++)
349: System.err.println(els[i].toString());
350: }
351: status = "Start failed:" + x.getMessage();
352: error = x;
353: startstatus = "failed";
354: try {
355: channel.stop(GroupChannel.DEFAULT);
356: } catch (Exception ignore) {
357: }
358: channel = null;
359: interceptor = null;
360: }
361: }
362:
363: public void stop() {
364: try {
365: if (channel != null) {
366: channel.stop(channel.DEFAULT);
367: status = "Channel Stopped";
368: } else {
369: status = "Channel Already Stopped";
370: }
371: } catch (Exception x) {
372: synchronized (System.err) {
373: System.err.println("Stop failed:");
374: StackTraceElement[] els = x.getStackTrace();
375: for (int i = 0; i < els.length; i++)
376: System.err.println(els[i].toString());
377: }
378:
379: status = "Stop failed:" + x.getMessage();
380: error = x;
381: } finally {
382: startstatus = "stopped";
383: channel = null;
384: interceptor = null;
385: }
386: }
387:
388: public GroupChannel createChannel() {
389: channel = new GroupChannel();
390: ((ReceiverBase) channel.getChannelReceiver())
391: .setAutoBind(100);
392: interceptor = new NonBlockingCoordinator() {
393: public void fireInterceptorEvent(InterceptorEvent event) {
394: status = event.getEventTypeDesc();
395: int type = event.getEventType();
396: boolean display = VIEW_EVENTS[type];
397: if (display)
398: parent.printScreen();
399: try {
400: Thread.sleep(SLEEP_TIME);
401: } catch (Exception x) {
402: }
403: }
404: };
405: channel.addInterceptor(interceptor);
406: channel.addInterceptor(new TcpFailureDetector());
407: channel.addInterceptor(new MessageDispatch15Interceptor());
408: return channel;
409: }
410: }
411: }
|