001: // $Id: PullPushAdapter.java,v 1.22 2006/09/27 19:21:53 vlada Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.*;
008: import org.jgroups.util.Util;
009:
010: import java.io.IOException;
011: import java.io.ObjectInput;
012: import java.io.ObjectOutput;
013: import java.io.Serializable;
014: import java.util.ArrayList;
015: import java.util.HashMap;
016: import java.util.Iterator;
017: import java.util.List;
018:
019: /**
020: * Allows a client of {@link org.jgroups.Channel} to be notified when messages have been received
021: * instead of having to actively poll the channel for new messages. Typically used in the
022: * client role (receive()). As this class does not implement interface
023: * {@link org.jgroups.Transport}, but <b>uses</b> it for receiving messages, an underlying object
024: * has to be used to send messages (e.g. the channel on which an object of this class relies).<p>
025: * Multiple MembershipListeners can register with the PullPushAdapter; when a view is received, they
026: * will all be notified. There is one main message listener which sends and receives message. In addition,
027: * MessageListeners can register with a certain tag (identifier), and then send messages tagged with this
028: * identifier. When a message with such an identifier is received, the corresponding MessageListener will be
029: * looked up and the message dispatched to it. If no tag is found (default), the main MessageListener will
030: * receive the message.
031: * @author Bela Ban
032: * @version $Revision
033: */
034: public class PullPushAdapter implements Runnable, ChannelListener {
035: protected Transport transport = null;
036: protected MessageListener listener = null; // main message receiver
037: protected final List membership_listeners = new ArrayList();
038: protected Thread receiver_thread = null;
039: protected final HashMap listeners = new HashMap(); // keys=identifier (Serializable), values=MessageListeners
040: protected final Log log = LogFactory.getLog(getClass());
041: static final String PULL_HEADER = "PULL_HEADER";
042:
043: public PullPushAdapter(Transport transport) {
044: this .transport = transport;
045: start();
046: }
047:
048: public PullPushAdapter(Transport transport, MessageListener l) {
049: this .transport = transport;
050: setListener(l);
051: start();
052: }
053:
054: public PullPushAdapter(Transport transport, MembershipListener ml) {
055: this .transport = transport;
056: addMembershipListener(ml);
057: start();
058: }
059:
060: public PullPushAdapter(Transport transport, MessageListener l,
061: MembershipListener ml) {
062: this .transport = transport;
063: setListener(l);
064: addMembershipListener(ml);
065: start();
066: }
067:
068: public PullPushAdapter(Transport transport, MessageListener l,
069: MembershipListener ml, boolean start) {
070: this .transport = transport;
071: setListener(l);
072: addMembershipListener(ml);
073: if (start)
074: start();
075: }
076:
077: public Transport getTransport() {
078: return transport;
079: }
080:
081: public final void start() {
082: if (receiver_thread == null || !receiver_thread.isAlive()) {
083: receiver_thread = new Thread(this , "PullPushAdapterThread");
084: receiver_thread.setDaemon(true);
085: receiver_thread.start();
086: }
087: if (transport instanceof JChannel)
088: ((JChannel) transport).addChannelListener(this );
089: }
090:
091: public void stop() {
092: Thread tmp = null;
093: if (receiver_thread != null && receiver_thread.isAlive()) {
094: tmp = receiver_thread;
095: receiver_thread = null;
096: tmp.interrupt();
097: try {
098: tmp.join(1000);
099: } catch (Exception ex) {
100: }
101: }
102: receiver_thread = null;
103: }
104:
105: /**
106: * Sends a message to the group - listeners to this identifier will receive the messages.
107: * @param identifier the key that the proper listeners are listenting on
108: * @param msg the Message to be sent
109: * @see #registerListener
110: */
111: public void send(Serializable identifier, Message msg)
112: throws Exception {
113: if (msg == null) {
114: if (log.isErrorEnabled())
115: log.error("msg is null");
116: return;
117: }
118: if (identifier == null)
119: transport.send(msg);
120: else {
121: msg.putHeader(PULL_HEADER, new PullHeader(identifier));
122: transport.send(msg);
123: }
124: }
125:
126: /**
127: * Sends a message with no identifier; listener member will get this message on the other group members.
128: * @param msg the Message to be sent
129: * @throws Exception
130: */
131: public void send(Message msg) throws Exception {
132: send(null, msg);
133: }
134:
135: public final void setListener(MessageListener l) {
136: listener = l;
137: }
138:
139: /**
140: * Sets a listener to messages with a given identifier.
141: * Messages sent with this identifier in their headers will be routed to this listener.
142: * <b>Note: there can be only one listener for one identifier;
143: * if you want to register a different listener to an already registered identifier, then unregister first.</b>
144: * @param identifier - messages sent on the group with this object will be received by this listener
145: * @param l - the listener that will get the message
146: */
147: public void registerListener(Serializable identifier,
148: MessageListener l) {
149: if (l == null || identifier == null) {
150: if (log.isErrorEnabled())
151: log.error("message listener or identifier is null");
152: return;
153: }
154: if (listeners.containsKey(identifier)) {
155: if (log.isErrorEnabled())
156: log
157: .error("listener with identifier="
158: + identifier
159: + " already exists, choose a different identifier or unregister current listener");
160: // we do not want to overwrite the listener
161: return;
162: }
163: listeners.put(identifier, l);
164: }
165:
166: /**
167: * Removes a message listener to a given identifier from the message listeners map.
168: * @param identifier - the key to whom we do not want to listen any more
169: */
170: public void unregisterListener(Serializable identifier) {
171: listeners.remove(identifier);
172: }
173:
174: /** @deprecated Use {@link #addMembershipListener} */
175: public void setMembershipListener(MembershipListener ml) {
176: addMembershipListener(ml);
177: }
178:
179: public final void addMembershipListener(MembershipListener l) {
180: if (l != null && !membership_listeners.contains(l))
181: membership_listeners.add(l);
182: }
183:
184: public void removeMembershipListener(MembershipListener l) {
185: if (l != null && membership_listeners.contains(l))
186: membership_listeners.remove(l);
187: }
188:
189: /**
190: * Reentrant run(): message reception is serialized, then the listener is notified of the
191: * message reception
192: */
193: public void run() {
194: Object obj;
195:
196: while (receiver_thread != null
197: && Thread.currentThread().equals(receiver_thread)) {
198: try {
199: obj = transport.receive(0);
200: if (obj == null)
201: continue;
202:
203: if (obj instanceof Message) {
204: handleMessage((Message) obj);
205: } else if (obj instanceof GetStateEvent) {
206: byte[] retval = null;
207: GetStateEvent evt = (GetStateEvent) obj;
208: String state_id = evt.getStateId();
209: if (listener != null) {
210: try {
211: if (listener instanceof ExtendedMessageListener
212: && state_id != null) {
213: retval = ((ExtendedMessageListener) listener)
214: .getState(state_id);
215: } else {
216: retval = listener.getState();
217: }
218: } catch (Throwable t) {
219: log
220: .error(
221: "getState() from application failed, will return empty state",
222: t);
223: }
224: } else {
225: log
226: .warn("no listener registered, returning empty state");
227: }
228:
229: if (transport instanceof Channel) {
230: ((Channel) transport).returnState(retval,
231: state_id);
232: } else {
233: if (log.isErrorEnabled())
234: log
235: .error("underlying transport is not a Channel, but a "
236: + transport.getClass()
237: .getName()
238: + ": cannot return state using returnState()");
239: }
240: } else if (obj instanceof SetStateEvent) {
241: SetStateEvent evt = (SetStateEvent) obj;
242: String state_id = evt.getStateId();
243: if (listener != null) {
244: try {
245: if (listener instanceof ExtendedMessageListener
246: && state_id != null) {
247: ((ExtendedMessageListener) listener)
248: .setState(state_id, evt
249: .getArg());
250: } else {
251: listener.setState(evt.getArg());
252: }
253: } catch (ClassCastException cast_ex) {
254: if (log.isErrorEnabled())
255: log
256: .error("received SetStateEvent, but argument "
257: + ((SetStateEvent) obj)
258: .getArg()
259: + " is not serializable ! Discarding message.");
260: }
261: }
262: } else if (obj instanceof StreamingGetStateEvent) {
263: StreamingGetStateEvent evt = (StreamingGetStateEvent) obj;
264: if (listener instanceof ExtendedMessageListener) {
265: if (evt.getStateId() == null) {
266: ((ExtendedMessageListener) listener)
267: .getState(evt.getArg());
268: } else {
269: ((ExtendedMessageListener) listener)
270: .getState(evt.getStateId(), evt
271: .getArg());
272: }
273: }
274: } else if (obj instanceof StreamingSetStateEvent) {
275: StreamingSetStateEvent evt = (StreamingSetStateEvent) obj;
276: if (listener instanceof ExtendedMessageListener) {
277: if (evt.getStateId() == null) {
278: ((ExtendedMessageListener) listener)
279: .setState(evt.getArg());
280: } else {
281: ((ExtendedMessageListener) listener)
282: .setState(evt.getStateId(), evt
283: .getArg());
284: }
285: }
286: } else if (obj instanceof View) {
287: notifyViewChange((View) obj);
288: } else if (obj instanceof SuspectEvent) {
289: notifySuspect((Address) ((SuspectEvent) obj)
290: .getMember());
291: } else if (obj instanceof BlockEvent) {
292: notifyBlock();
293: if (transport instanceof Channel) {
294: ((Channel) transport).blockOk();
295: }
296: } else if (obj instanceof UnblockEvent) {
297: notifyUnblock();
298: }
299: } catch (ChannelNotConnectedException conn) {
300: Address local_addr = ((Channel) transport)
301: .getLocalAddress();
302: if (log.isTraceEnabled())
303: log.trace('['
304: + (local_addr == null ? "<null>"
305: : local_addr.toString())
306: + "] channel not connected, exception is "
307: + conn);
308: Util.sleep(1000);
309: receiver_thread = null;
310: break;
311: } catch (ChannelClosedException closed_ex) {
312: Address local_addr = ((Channel) transport)
313: .getLocalAddress();
314: if (log.isTraceEnabled())
315: log.trace('['
316: + (local_addr == null ? "<null>"
317: : local_addr.toString())
318: + "] channel closed, exception is "
319: + closed_ex);
320: // Util.sleep(1000);
321: receiver_thread = null;
322: break;
323: } catch (Throwable e) {
324: }
325: }
326: }
327:
328: /**
329: * Check whether the message has an identifier. If yes, lookup the MessageListener associated with the
330: * given identifier in the hashtable and dispatch to it. Otherwise just use the main (default) message
331: * listener
332: */
333: protected void handleMessage(Message msg) {
334: PullHeader hdr = (PullHeader) msg.getHeader(PULL_HEADER);
335: Serializable identifier;
336: MessageListener l;
337:
338: if (hdr != null && (identifier = hdr.getIdentifier()) != null) {
339: l = (MessageListener) listeners.get(identifier);
340: if (l == null) {
341: if (log.isErrorEnabled())
342: log
343: .error("received a messages tagged with identifier="
344: + identifier
345: + ", but there is no registration for that identifier. Will drop message");
346: } else
347: l.receive(msg);
348: } else {
349: if (listener != null)
350: listener.receive(msg);
351: }
352: }
353:
354: protected void notifyViewChange(View v) {
355: MembershipListener l;
356:
357: if (v == null)
358: return;
359: for (Iterator it = membership_listeners.iterator(); it
360: .hasNext();) {
361: l = (MembershipListener) it.next();
362: try {
363: l.viewAccepted(v);
364: } catch (Throwable ex) {
365: if (log.isErrorEnabled())
366: log.error("exception notifying " + l + ": " + ex);
367: }
368: }
369: }
370:
371: protected void notifySuspect(Address suspected_mbr) {
372: MembershipListener l;
373:
374: if (suspected_mbr == null)
375: return;
376: for (Iterator it = membership_listeners.iterator(); it
377: .hasNext();) {
378: l = (MembershipListener) it.next();
379: try {
380: l.suspect(suspected_mbr);
381: } catch (Throwable ex) {
382: if (log.isErrorEnabled())
383: log.error("exception notifying " + l + ": " + ex);
384: }
385: }
386: }
387:
388: protected void notifyBlock() {
389: MembershipListener l;
390:
391: for (Iterator it = membership_listeners.iterator(); it
392: .hasNext();) {
393: l = (MembershipListener) it.next();
394: try {
395: l.block();
396: } catch (Throwable ex) {
397: if (log.isErrorEnabled())
398: log.error("exception notifying " + l + ": " + ex);
399: }
400: }
401: }
402:
403: protected void notifyUnblock() {
404: MembershipListener l;
405:
406: for (Iterator it = membership_listeners.iterator(); it
407: .hasNext();) {
408: l = (MembershipListener) it.next();
409: if (l instanceof ExtendedMembershipListener) {
410: try {
411: ((ExtendedMembershipListener) l).unblock();
412: } catch (Throwable ex) {
413: if (log.isErrorEnabled())
414: log.error("exception notifying " + l + ": "
415: + ex);
416: }
417: }
418: }
419: }
420:
421: public void channelConnected(Channel channel) {
422: if (log.isTraceEnabled())
423: log.trace("channel is connected");
424: }
425:
426: public void channelDisconnected(Channel channel) {
427: if (log.isTraceEnabled())
428: log.trace("channel is disconnected");
429: }
430:
431: public void channelClosed(Channel channel) {
432: }
433:
434: public void channelShunned() {
435: if (log.isTraceEnabled())
436: log.trace("channel is shunned");
437: }
438:
439: public void channelReconnected(Address addr) {
440: start();
441: }
442:
443: public static final class PullHeader extends Header {
444: Serializable identifier = null;
445:
446: public PullHeader() {
447: ; // used by externalization
448: }
449:
450: public PullHeader(Serializable identifier) {
451: this .identifier = identifier;
452: }
453:
454: public Serializable getIdentifier() {
455: return identifier;
456: }
457:
458: public long size() {
459: if (identifier == null)
460: return 12;
461: else
462: return 64;
463: }
464:
465: public String toString() {
466: return "PullHeader";
467: }
468:
469: public void writeExternal(ObjectOutput out) throws IOException {
470: out.writeObject(identifier);
471: }
472:
473: public void readExternal(ObjectInput in) throws IOException,
474: ClassNotFoundException {
475: identifier = (Serializable) in.readObject();
476: }
477: }
478:
479: /**
480: * @return Returns the listener.
481: */
482: public MessageListener getListener() {
483: return listener;
484: }
485: }
|