001: // $Id: ProtocolStack.java,v 1.27.6.1 2007/04/27 06:26:38 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.jgroups.*;
006: import org.jgroups.conf.ClassConfigurator;
007: import org.jgroups.util.Promise;
008: import org.jgroups.util.TimeScheduler;
009:
010: import java.util.*;
011:
012: /**
013: * A ProtocolStack manages a number of protocols layered above each other. It creates all
014: * protocol classes, initializes them and, when ready, starts all of them, beginning with the
015: * bottom most protocol. It also dispatches messages received from the stack to registered
016: * objects (e.g. channel, GMP) and sends messages sent by those objects down the stack.<p>
017: * The ProtocolStack makes use of the Configurator to setup and initialize stacks, and to
018: * destroy them again when not needed anymore
019: * @author Bela Ban
020: */
021: public class ProtocolStack extends Protocol implements Transport {
022: private Protocol top_prot = null;
023: private Protocol bottom_prot = null;
024: private final Configurator conf = new Configurator();
025: private String setup_string;
026: private JChannel channel = null;
027: private boolean stopped = true;
028: public TimeScheduler timer = new TimeScheduler();
029:
030: /** Used to sync on START/START_OK events for start()*/
031: Promise start_promise = null;
032:
033: /** used to sync on STOP/STOP_OK events for stop() */
034: Promise stop_promise = null;
035:
036: public static final int ABOVE = 1; // used by insertProtocol()
037: public static final int BELOW = 2; // used by insertProtocol()
038:
039: public ProtocolStack(JChannel channel, String setup_string)
040: throws ChannelException {
041: this .setup_string = setup_string;
042: this .channel = channel;
043: ClassConfigurator.getInstance(true); // will create the singleton
044: }
045:
046: /** Only used by Simulator; don't use */
047: public ProtocolStack() {
048:
049: }
050:
051: public Channel getChannel() {
052: return channel;
053: }
054:
055: /** Returns all protocols in a list, from top to bottom. <em>These are not copies of protocols,
056: so modifications will affect the actual instances !</em> */
057: public Vector getProtocols() {
058: Protocol p;
059: Vector v = new Vector();
060:
061: p = top_prot;
062: while (p != null) {
063: v.addElement(p);
064: p = p.getDownProtocol();
065: }
066: return v;
067: }
068:
069: /**
070: *
071: * @return Map<String,Map<key,val>>
072: */
073: public Map dumpStats() {
074: Protocol p;
075: Map retval = new HashMap(), tmp;
076: String prot_name;
077:
078: p = top_prot;
079: while (p != null) {
080: prot_name = p.getName();
081: tmp = p.dumpStats();
082: if (prot_name != null && tmp != null)
083: retval.put(prot_name, tmp);
084: p = p.getDownProtocol();
085: }
086: return retval;
087: }
088:
089: public String dumpTimerQueue() {
090: return timer != null ? timer.dumpTaskQueue() : "";
091: }
092:
093: /**
094: * Prints the names of the protocols, from the bottom to top. If include_properties is true,
095: * the properties for each protocol will also be printed.
096: */
097: public String printProtocolSpec(boolean include_properties) {
098: StringBuffer sb = new StringBuffer();
099: Protocol prot = top_prot;
100: Properties tmpProps;
101: String name;
102: Map.Entry entry;
103:
104: while (prot != null) {
105: name = prot.getName();
106: if (name != null) {
107: if ("ProtocolStack".equals(name))
108: break;
109: sb.append(name);
110: if (include_properties) {
111: tmpProps = prot.getProperties();
112: if (tmpProps != null) {
113: sb.append('\n');
114: for (Iterator it = tmpProps.entrySet()
115: .iterator(); it.hasNext();) {
116: entry = (Map.Entry) it.next();
117: sb.append(entry).append("\n");
118: }
119: }
120: }
121: sb.append('\n');
122:
123: prot = prot.getDownProtocol();
124: }
125: }
126:
127: return sb.toString();
128: }
129:
130: public String printProtocolSpecAsXML() {
131: StringBuffer sb = new StringBuffer();
132: Protocol prot = bottom_prot;
133: Properties tmpProps;
134: String name;
135: Map.Entry entry;
136: int len, max_len = 30;
137:
138: sb.append("<config>\n");
139: while (prot != null) {
140: name = prot.getName();
141: if (name != null) {
142: if ("ProtocolStack".equals(name))
143: break;
144: sb.append(" <").append(name).append(" ");
145: tmpProps = prot.getProperties();
146: if (tmpProps != null) {
147: len = name.length();
148: String s;
149: for (Iterator it = tmpProps.entrySet().iterator(); it
150: .hasNext();) {
151: entry = (Map.Entry) it.next();
152: s = entry.getKey() + "=\"" + entry.getValue()
153: + "\" ";
154: if (len + s.length() > max_len) {
155: sb.append("\n ");
156: len = 8;
157: }
158: sb.append(s);
159: len += s.length();
160: }
161: }
162: sb.append("/>\n");
163: prot = prot.getUpProtocol();
164: }
165: }
166: sb.append("</config>");
167:
168: return sb.toString();
169: }
170:
171: public void setup() throws Exception {
172: if (timer == null) {
173: timer = new TimeScheduler();
174: timer.start();
175: }
176: if (top_prot == null) {
177: top_prot = conf.setupProtocolStack(setup_string, this );
178: if (top_prot == null)
179: throw new Exception("couldn't create protocol stack");
180: top_prot.setUpProtocol(this );
181: bottom_prot = conf.getBottommostProtocol(top_prot);
182: conf.initProtocolStack(bottom_prot); // calls init() on each protocol, from bottom to top
183: conf.startProtocolStack(bottom_prot); // sets up queues and threads
184: }
185: }
186:
187: /**
188: * Creates a new protocol given the protocol specification.
189: * @param prot_spec The specification of the protocol. Same convention as for specifying a protocol stack.
190: * An exception will be thrown if the class cannot be created. Example:
191: * <pre>"VERIFY_SUSPECT(timeout=1500)"</pre> Note that no colons (:) have to be
192: * specified
193: * @return Protocol The newly created protocol
194: * @exception Exception Will be thrown when the new protocol cannot be created
195: */
196: public Protocol createProtocol(String prot_spec) throws Exception {
197: return conf.createProtocol(prot_spec, this );
198: }
199:
200: /**
201: * Inserts an already created (and initialized) protocol into the protocol list. Sets the links
202: * to the protocols above and below correctly and adjusts the linked list of protocols accordingly.
203: * Note that this method may change the value of top_prot or bottom_prot.
204: * @param prot The protocol to be inserted. Before insertion, a sanity check will ensure that none
205: * of the existing protocols have the same name as the new protocol.
206: * @param position Where to place the protocol with respect to the neighbor_prot (ABOVE, BELOW)
207: * @param neighbor_prot The name of the neighbor protocol. An exception will be thrown if this name
208: * is not found
209: * @exception Exception Will be thrown when the new protocol cannot be created, or inserted.
210: */
211: public void insertProtocol(Protocol prot, int position,
212: String neighbor_prot) throws Exception {
213: conf.insertProtocol(prot, position, neighbor_prot, this );
214: }
215:
216: /**
217: * Removes a protocol from the stack. Stops the protocol and readjusts the linked lists of
218: * protocols.
219: * @param prot_name The name of the protocol. Since all protocol names in a stack have to be unique
220: * (otherwise the stack won't be created), the name refers to just 1 protocol.
221: * @exception Exception Thrown if the protocol cannot be stopped correctly.
222: */
223: public void removeProtocol(String prot_name) throws Exception {
224: conf.removeProtocol(prot_name);
225: }
226:
227: /** Returns a given protocol or null if not found */
228: public Protocol findProtocol(String name) {
229: Protocol tmp = top_prot;
230: String prot_name;
231: while (tmp != null) {
232: prot_name = tmp.getName();
233: if (prot_name != null && prot_name.equals(name))
234: return tmp;
235: tmp = tmp.getDownProtocol();
236: }
237: return null;
238: }
239:
240: public void destroy() {
241: if (timer != null) {
242: try {
243: timer.stop();
244: timer.cancel();
245: timer = null;
246: } catch (Exception ex) {
247: }
248: }
249:
250: if (top_prot != null) {
251: conf.stopProtocolStack(top_prot); // destroys msg queues and threads
252: top_prot = null;
253: }
254: }
255:
256: /**
257: * Start all layers. The {@link Protocol#start()} method is called in each protocol,
258: * <em>from top to bottom</em>.
259: * Each layer can perform some initialization, e.g. create a multicast socket
260: */
261: public void startStack() throws Exception {
262: Object start_result = null;
263: if (stopped == false)
264: return;
265:
266: if (start_promise == null)
267: start_promise = new Promise();
268: else
269: start_promise.reset();
270:
271: down(new Event(Event.START));
272: start_result = start_promise.getResult(0);
273: if (start_result != null && start_result instanceof Throwable) {
274: if (start_result instanceof Exception)
275: throw (Exception) start_result;
276: else
277: throw new Exception("failed starting stack: "
278: + start_result);
279: }
280:
281: stopped = false;
282: }
283:
284: public void startUpHandler() {
285: // DON'T REMOVE !!!! Avoids a superfluous thread
286: }
287:
288: public void startDownHandler() {
289: // DON'T REMOVE !!!! Avoids a superfluous thread
290: }
291:
292: /**
293: * Iterates through all the protocols <em>from top to bottom</em> and does the following:
294: * <ol>
295: * <li>Waits until all messages in the down queue have been flushed (ie., size is 0)
296: * <li>Calls stop() on the protocol
297: * </ol>
298: */
299: public void stopStack() {
300: if (stopped)
301: return;
302:
303: if (stop_promise == null)
304: stop_promise = new Promise();
305: else
306: stop_promise.reset();
307:
308: down(new Event(Event.STOP));
309: stop_promise.getResult(5000);
310: stopped = true;
311: }
312:
313: /**
314: * Not needed anymore, just left in here for backwards compatibility with JBoss AS
315: * @deprecated
316: */
317: public void flushEvents() {
318:
319: }
320:
321: public void stopInternal() {
322: // do nothing, DON'T REMOVE !!!!
323: }
324:
325: /*--------------------------- Transport interface ------------------------------*/
326:
327: public void send(Message msg) throws Exception {
328: down(new Event(Event.MSG, msg));
329: }
330:
331: public Object receive(long timeout) throws Exception {
332: throw new Exception(
333: "ProtocolStack.receive(): not implemented !");
334: }
335:
336: /*------------------------- End of Transport interface ---------------------------*/
337:
338: /*--------------------------- Protocol functionality ------------------------------*/
339: public String getName() {
340: return "ProtocolStack";
341: }
342:
343: public void up(Event evt) {
344: switch (evt.getType()) {
345: case Event.START_OK:
346: if (start_promise != null)
347: start_promise.setResult(evt.getArg());
348: return;
349: case Event.STOP_OK:
350: if (stop_promise != null)
351: stop_promise.setResult(evt.getArg());
352: return;
353: }
354:
355: if (channel != null)
356: channel.up(evt);
357: }
358:
359: public void down(Event evt) {
360: if (top_prot != null)
361: top_prot.receiveDownEvent(evt);
362: else
363: log.error("no down protocol available !");
364: }
365:
366: protected void receiveUpEvent(Event evt) {
367: up(evt);
368: }
369:
370: /** Override with null functionality: we don't need any threads to be started ! */
371: public void startWork() {
372: }
373:
374: /** Override with null functionality: we don't need any threads to be started ! */
375: public void stopWork() {
376: }
377:
378: /*----------------------- End of Protocol functionality ---------------------------*/
379:
380: }
|