001: // $Id: Protocol.java,v 1.38.6.1 2007/04/27 08:03:57 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Event;
008: import org.jgroups.util.Queue;
009: import org.jgroups.util.QueueClosedException;
010: import org.jgroups.util.Util;
011:
012: import java.util.Map;
013: import java.util.Properties;
014: import java.util.Vector;
015:
016: class UpHandler extends Thread {
017: private Queue mq = null;
018: private Protocol handler = null;
019: private ProtocolObserver observer = null;
020: protected final Log log = LogFactory.getLog(this .getClass());
021:
022: public UpHandler(Queue mq, Protocol handler,
023: ProtocolObserver observer) {
024: super (Util.getGlobalThreadGroup(), "UpHandler");
025: this .mq = mq;
026: this .handler = handler;
027: this .observer = observer;
028: if (handler != null)
029: setName("UpHandler (" + handler.getName() + ')');
030: else
031: setName("UpHandler");
032: setDaemon(true);
033: }
034:
035: public void setObserver(ProtocolObserver observer) {
036: this .observer = observer;
037: }
038:
039: /** Removes events from mq and calls handler.up(evt) */
040: public void run() {
041: while (!mq.closed()) {
042: try {
043: Event evt = (Event) mq.remove();
044: if (evt == null) {
045: if (log.isWarnEnabled())
046: log.warn("removed null event");
047: continue;
048: }
049:
050: if (observer != null) { // call debugger hook (if installed)
051: if (observer.up(evt, mq.size()) == false) { // false means discard event
052: return;
053: }
054: }
055: handler.up(evt);
056: } catch (QueueClosedException queue_closed) {
057: break;
058: } catch (Throwable e) {
059: if (log.isErrorEnabled())
060: log.error(getName() + " caught exception", e);
061: }
062: }
063: }
064:
065: }
066:
067: class DownHandler extends Thread {
068: private Queue mq = null;
069: private Protocol handler = null;
070: private ProtocolObserver observer = null;
071: protected final Log log = LogFactory.getLog(this .getClass());
072:
073: public DownHandler(Queue mq, Protocol handler,
074: ProtocolObserver observer) {
075: super (Util.getGlobalThreadGroup(), "DownHandler");
076: this .mq = mq;
077: this .handler = handler;
078: this .observer = observer;
079: if (handler != null)
080: setName("DownHandler (" + handler.getName() + ')');
081: else
082: setName("DownHandler");
083: setDaemon(true);
084: }
085:
086: public void setObserver(ProtocolObserver observer) {
087: this .observer = observer;
088: }
089:
090: /** Removes events from mq and calls handler.down(evt) */
091: public void run() {
092: while (!mq.closed()) {
093: try {
094: Event evt = (Event) mq.remove();
095: if (evt == null) {
096: if (log.isWarnEnabled())
097: log.warn("removed null event");
098: continue;
099: }
100:
101: if (observer != null) { // call debugger hook (if installed)
102: if (observer.down(evt, mq.size()) == false) { // false means discard event
103: continue;
104: }
105: }
106:
107: int type = evt.getType();
108: if (type == Event.START || type == Event.STOP) {
109: if (handler.handleSpecialDownEvent(evt) == false)
110: continue;
111: }
112: handler.down(evt);
113: } catch (QueueClosedException queue_closed) {
114: break;
115: } catch (Throwable e) {
116: if (log.isErrorEnabled())
117: log.error(getName() + " caught exception", e);
118: }
119: }
120: }
121:
122: }
123:
/**
 * The Protocol class provides a set of common services for protocol layers. Each layer has to
 * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
 * <code>down()</code> and <code>getName()</code>). Layers are stacked in a certain order to form
 * a protocol stack. <a href="org.jgroups.Event.html">Events</a> are passed from lower
 * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
 * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
 * the layer above it and so on, until a layer handles the Message and sends a response or discards it,
 * the former resulting in another Event being passed down the stack.<p>
 * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
 * received by a layer (calling the internal upcall <code>receiveUpEvent()</code>), it is placed
 * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
 * <code>up()</code> of the layer. The same applies for Events traveling down the stack. Handling
 * of the up-handler and down-handler threads and the 2 FIFO queues is done by the Protocol
 * class; subclasses will almost never have to override this behavior.<p>
 * The important thing to bear in mind is that Events have to be passed on between layers in FIFO
 * order, which is guaranteed by the Protocol implementation and must be guaranteed by subclasses
 * implementing their own Event queuing.<p>
 * <b>Note that each class extending Protocol MUST provide an empty, public
 * constructor!</b>
 */
145: public abstract class Protocol {
146: protected final Properties props = new Properties();
147: protected Protocol up_prot = null, down_prot = null;
148: protected ProtocolStack stack = null;
149: protected final Queue up_queue = new Queue();
150: protected final Queue down_queue = new Queue();
151: protected UpHandler up_handler = null;
152: protected int up_thread_prio = -1;
153: protected DownHandler down_handler = null;
154: protected int down_thread_prio = -1;
155: protected ProtocolObserver observer = null; // hook for debugger
156: private final static long THREAD_JOIN_TIMEOUT = 1000;
157: protected boolean down_thread = true; // determines whether the down_handler thread should be started
158: protected boolean up_thread = true; // determines whether the up_handler thread should be started
159: protected boolean stats = true; // determines whether to collect statistics (and expose them via JMX)
160: protected final Log log = LogFactory.getLog(this .getClass());
161:
162: /**
163: * Configures the protocol initially. A configuration string consists of name=value
164: * items, separated by a ';' (semicolon), e.g.:<pre>
165: * "loopback=false;unicast_inport=4444"
166: * </pre>
167: */
168: public boolean setProperties(Properties props) {
169: if (props != null)
170: this .props.putAll(props);
171: return true;
172: }
173:
174: /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
175: * calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
176: */
177: public boolean setPropertiesInternal(Properties props) {
178: this .props.putAll(props);
179:
180: String str = props.getProperty("down_thread");
181: if (str != null) {
182: down_thread = Boolean.valueOf(str).booleanValue();
183: props.remove("down_thread");
184: }
185:
186: str = props.getProperty("down_thread_prio");
187: if (str != null) {
188: down_thread_prio = Integer.parseInt(str);
189: props.remove("down_thread_prio");
190: }
191:
192: str = props.getProperty("up_thread");
193: if (str != null) {
194: up_thread = Boolean.valueOf(str).booleanValue();
195: props.remove("up_thread");
196: }
197:
198: str = props.getProperty("up_thread_prio");
199: if (str != null) {
200: up_thread_prio = Integer.parseInt(str);
201: props.remove("up_thread_prio");
202: }
203:
204: str = props.getProperty("stats");
205: if (str != null) {
206: stats = Boolean.valueOf(str).booleanValue();
207: props.remove("stats");
208: }
209:
210: return setProperties(props);
211: }
212:
213: public Properties getProperties() {
214: return props;
215: }
216:
217: public boolean upThreadEnabled() {
218: return up_thread;
219: }
220:
221: public boolean downThreadEnabled() {
222: return down_thread;
223: }
224:
225: public boolean statsEnabled() {
226: return stats;
227: }
228:
229: public void enableStats(boolean flag) {
230: stats = flag;
231: }
232:
233: public void resetStats() {
234: ;
235: }
236:
237: public String printStats() {
238: return null;
239: }
240:
241: public Map dumpStats() {
242: return null;
243: }
244:
245: public void setObserver(ProtocolObserver observer) {
246: this .observer = observer;
247: observer.setProtocol(this );
248: if (up_handler != null)
249: up_handler.setObserver(observer);
250: if (down_handler != null)
251: down_handler.setObserver(observer);
252: }
253:
254: /**
255: * Called after instance has been created (null constructor) and before protocol is started.
256: * Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
257: * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
258: * ProtocolStack to fail, so the channel constructor will throw an exception
259: */
260: public void init() throws Exception {
261: }
262:
263: /**
264: * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
265: * Protocols are connected and queues are ready to receive events.
266: * Will be called <em>from bottom to top</em>. This call will replace
267: * the <b>START</b> and <b>START_OK</b> events.
268: * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
269: * to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
270: */
271: public void start() throws Exception {
272: }
273:
274: /**
275: * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
276: * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
277: * neighbor protocol below is still working. This method will replace the
278: * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
279: * when this method is called all messages in the down queue will have been flushed
280: */
281: public void stop() {
282: }
283:
284: /**
285: * This method is called on a {@link org.jgroups.Channel#close()}.
286: * Does some cleanup; after the call the VM will terminate
287: */
288: public void destroy() {
289: }
290:
291: public Queue getUpQueue() {
292: return up_queue;
293: } // used by Debugger (ProtocolView)
294:
295: public Queue getDownQueue() {
296: return down_queue;
297: } // used by Debugger (ProtocolView)
298:
299: /** List of events that are required to be answered by some layer above.
300: @return Vector (of Integers) */
301: public Vector requiredUpServices() {
302: return null;
303: }
304:
305: /** List of events that are required to be answered by some layer below.
306: @return Vector (of Integers) */
307: public Vector requiredDownServices() {
308: return null;
309: }
310:
311: /** List of events that are provided to layers above (they will be handled when sent down from
312: above).
313: @return Vector (of Integers) */
314: public Vector providedUpServices() {
315: return null;
316: }
317:
318: /** List of events that are provided to layers below (they will be handled when sent down from
319: below).
320: @return Vector (of Integers) */
321: public Vector providedDownServices() {
322: return null;
323: }
324:
325: public abstract String getName(); // all protocol names have to be unique !
326:
327: public Protocol getUpProtocol() {
328: return up_prot;
329: }
330:
331: public Protocol getDownProtocol() {
332: return down_prot;
333: }
334:
335: public void setUpProtocol(Protocol up_prot) {
336: this .up_prot = up_prot;
337: }
338:
339: public void setDownProtocol(Protocol down_prot) {
340: this .down_prot = down_prot;
341: }
342:
343: public void setProtocolStack(ProtocolStack stack) {
344: this .stack = stack;
345: }
346:
347: /** Used internally. If overridden, call this method first. Only creates the up_handler thread
348: if down_thread is true */
349: public void startUpHandler() {
350: if (up_thread) {
351: if (up_handler == null) {
352: up_handler = new UpHandler(up_queue, this , observer);
353: if (up_thread_prio >= 0) {
354: try {
355: up_handler.setPriority(up_thread_prio);
356: } catch (Throwable t) {
357: if (log.isErrorEnabled())
358: log
359: .error(
360: "priority "
361: + up_thread_prio
362: + " could not be set for thread",
363: t);
364: }
365: }
366: up_handler.start();
367: }
368: }
369: }
370:
371: /** Used internally. If overridden, call this method first. Only creates the down_handler thread
372: if down_thread is true */
373: public void startDownHandler() {
374: if (down_thread) {
375: if (down_handler == null) {
376: down_handler = new DownHandler(down_queue, this ,
377: observer);
378: if (down_thread_prio >= 0) {
379: try {
380: down_handler.setPriority(down_thread_prio);
381: } catch (Throwable t) {
382: if (log.isErrorEnabled())
383: log
384: .error(
385: "priority "
386: + down_thread_prio
387: + " could not be set for thread",
388: t);
389: }
390: }
391: down_handler.start();
392: }
393: }
394: }
395:
396: /** Used internally. If overridden, call parent's method first */
397: public void stopInternal() {
398: up_queue.close(false); // this should terminate up_handler thread
399:
400: if (up_handler != null && up_handler.isAlive()) {
401: try {
402: up_handler.join(THREAD_JOIN_TIMEOUT);
403: } catch (Exception ex) {
404: }
405: if (up_handler != null && up_handler.isAlive()) {
406: up_handler.interrupt(); // still alive ? let's just kill it without mercy...
407: try {
408: up_handler.join(THREAD_JOIN_TIMEOUT);
409: } catch (Exception ex) {
410: }
411: if (up_handler != null && up_handler.isAlive())
412: if (log.isErrorEnabled())
413: log
414: .error("up_handler thread for "
415: + getName()
416: + " was interrupted (in order to be terminated), but is still alive");
417: }
418: }
419: up_handler = null;
420:
421: down_queue.close(false); // this should terminate down_handler thread
422: if (down_handler != null && down_handler.isAlive()) {
423: try {
424: down_handler.join(THREAD_JOIN_TIMEOUT);
425: } catch (Exception ex) {
426: }
427: if (down_handler != null && down_handler.isAlive()) {
428: down_handler.interrupt(); // still alive ? let's just kill it without mercy...
429: try {
430: down_handler.join(THREAD_JOIN_TIMEOUT);
431: } catch (Exception ex) {
432: }
433: if (down_handler != null && down_handler.isAlive())
434: if (log.isErrorEnabled())
435: log
436: .error("down_handler thread for "
437: + getName()
438: + " was interrupted (in order to be terminated), but is is still alive");
439: }
440: }
441: down_handler = null;
442: }
443:
444: /**
445: * Internal method, should not be called by clients. Used by ProtocolStack. I would have
446: * used the 'friends' modifier, but this is available only in C++ ... If the up_handler thread
447: * is not available (down_thread == false), then directly call the up() method: we will run on the
448: * caller's thread (e.g. the protocol layer below us).
449: */
450: protected void receiveUpEvent(Event evt) {
451: if (up_handler == null) {
452: if (observer != null) { // call debugger hook (if installed)
453: if (observer.up(evt, up_queue.size()) == false) { // false means discard event
454: return;
455: }
456: }
457: up(evt);
458: return;
459: }
460: try {
461: up_queue.add(evt);
462: } catch (Exception e) {
463: if (log.isWarnEnabled())
464: log.warn("exception: " + e);
465: }
466: }
467:
468: /**
469: * Internal method, should not be called by clients. Used by ProtocolStack. I would have
470: * used the 'friends' modifier, but this is available only in C++ ... If the down_handler thread
471: * is not available (down_thread == false), then directly call the down() method: we will run on the
472: * caller's thread (e.g. the protocol layer above us).
473: */
474: protected void receiveDownEvent(Event evt) {
475: if (down_handler == null) {
476: if (observer != null) { // call debugger hook (if installed)
477: if (observer.down(evt, down_queue.size()) == false) { // false means discard event
478: return;
479: }
480: }
481: int type = evt.getType();
482: if (type == Event.START || type == Event.STOP) {
483: if (handleSpecialDownEvent(evt) == false)
484: return;
485: }
486: down(evt);
487: return;
488: }
489: try {
490: down_queue.add(evt);
491: } catch (Exception e) {
492: if (log.isWarnEnabled())
493: log.warn("exception: " + e);
494: }
495: }
496:
497: /**
498: * Causes the event to be forwarded to the next layer up in the hierarchy. Typically called
499: * by the implementation of <code>Up</code> (when done).
500: */
501: public void passUp(Event evt) {
502: if (observer != null) { // call debugger hook (if installed)
503: if (observer.passUp(evt) == false) { // false means don't pass up (=discard) event
504: return;
505: }
506: }
507: up_prot.receiveUpEvent(evt);
508: }
509:
510: /**
511: * Causes the event to be forwarded to the next layer down in the hierarchy.Typically called
512: * by the implementation of <code>Down</code> (when done).
513: */
514: public void passDown(Event evt) {
515: if (observer != null) { // call debugger hook (if installed)
516: if (observer.passDown(evt) == false) { // false means don't pass down (=discard) event
517: return;
518: }
519: }
520: down_prot.receiveDownEvent(evt);
521: }
522:
523: /**
524: * An event was received from the layer below. Usually the current layer will want to examine
525: * the event type and - depending on its type - perform some computation
526: * (e.g. removing headers from a MSG event type, or updating the internal membership list
527: * when receiving a VIEW_CHANGE event).
528: * Finally the event is either a) discarded, or b) an event is sent down
529: * the stack using <code>passDown()</code> or c) the event (or another event) is sent up
530: * the stack using <code>passUp()</code>.
531: */
532: public void up(Event evt) {
533: passUp(evt);
534: }
535:
536: /**
537: * An event is to be sent down the stack. The layer may want to examine its type and perform
538: * some action on it, depending on the event's type. If the event is a message MSG, then
539: * the layer may need to add a header to it (or do nothing at all) before sending it down
540: * the stack using <code>passDown()</code>. In case of a GET_ADDRESS event (which tries to
541: * retrieve the stack's address from one of the bottom layers), the layer may need to send
542: * a new response event back up the stack using <code>passUp()</code>.
543: */
544: public void down(Event evt) {
545: passDown(evt);
546: }
547:
548: /** These are special internal events that should not be handled by protocols
549: * @return boolean True: the event should be passed further down the stack. False: the event should
550: * be discarded (not passed down the stack)
551: */
552: protected boolean handleSpecialDownEvent(Event evt) {
553: switch (evt.getType()) {
554: case Event.START:
555: try {
556: start();
557:
558: // if we're the transport protocol, reply with a START_OK up the stack
559: if (down_prot == null) {
560: passUp(new Event(Event.START_OK, Boolean.TRUE));
561: return false; // don't pass down the stack
562: } else
563: return true; // pass down the stack
564: } catch (Exception e) {
565: passUp(new Event(Event.START_OK,
566: new Exception("exception caused by "
567: + getName() + ".start()", e)));
568: return false;
569: }
570: case Event.STOP:
571: stop();
572: if (down_prot == null) {
573: passUp(new Event(Event.STOP_OK, Boolean.TRUE));
574: return false; // don't pass down the stack
575: } else
576: return true; // pass down the stack
577: default:
578: return true; // pass down by default
579: }
580: }
581: }
|