001: // $Id: NotificationBus.java,v 1.11 2006/05/25 12:10:18 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.*;
008: import org.jgroups.util.Promise;
009: import org.jgroups.util.Util;
010:
011: import java.io.Serializable;
012: import java.util.Vector;
013:
014: /**
015: * This class provides notification sending and handling capability.
016: * Producers can send notifications to all registered consumers.
017: * Provides hooks to implement shared group state, which allows an
018: * application programmer to maintain a local cache which is replicated
019: * by all instances. NotificationBus sits on
020: * top of a channel, however it creates its channel itself, so the
021: * application programmers do not have to provide their own channel.
022: *
023: * @author Bela Ban
024: */
025: public class NotificationBus implements Receiver {
026: final Vector members = new Vector();
027: Channel channel = null;
028: Address local_addr = null;
029: Consumer consumer = null; // only a single consumer allowed
030: String bus_name = "notification_bus";
031: final Promise get_cache_promise = new Promise();
032: final Object cache_mutex = new Object();
033:
034: protected final Log log = LogFactory.getLog(getClass());
035:
036: String props = null;
037:
038: public interface Consumer {
039: void handleNotification(Serializable n);
040:
041: /** Called on the coordinator to obtains its cache */
042: Serializable getCache();
043:
044: void memberJoined(Address mbr);
045:
046: void memberLeft(Address mbr);
047: }
048:
049: public NotificationBus() throws Exception {
050: this ((Channel) null, null);
051: }
052:
053: public NotificationBus(String bus_name) throws Exception {
054: this (bus_name, null);
055: }
056:
057: public NotificationBus(String bus_name, String properties)
058: throws Exception {
059: if (bus_name != null)
060: this .bus_name = bus_name;
061: if (properties != null)
062: props = properties;
063: channel = new JChannel(props);
064: channel.setReceiver(this );
065: }
066:
067: public NotificationBus(Channel channel, String bus_name)
068: throws Exception {
069: if (bus_name != null)
070: this .bus_name = bus_name;
071: this .channel = channel;
072: channel.setReceiver(this );
073: }
074:
075: public void setConsumer(Consumer c) {
076: consumer = c;
077: }
078:
079: public Address getLocalAddress() {
080: if (local_addr != null)
081: return local_addr;
082: if (channel != null)
083: local_addr = channel.getLocalAddress();
084: return local_addr;
085: }
086:
087: /**
088: * Returns a reference to the real membership: don't modify.
089: * If you need to modify, make a copy first !
090: * @return Vector of Address objects
091: */
092: public Vector getMembership() {
093: return members;
094: }
095:
096: /**
097: * Answers the Channel.
098: * Used to operate on the underlying channel directly, e.g. perform operations that are not
099: * provided using only NotificationBus. Should be used sparingly.
100: * @return underlying Channel
101: */
102: public Channel getChannel() {
103: return channel;
104: }
105:
106: public boolean isCoordinator() {
107: Object first_mbr = null;
108:
109: synchronized (members) {
110: first_mbr = members.size() > 0 ? members.elementAt(0)
111: : null;
112: if (first_mbr == null)
113: return true;
114: }
115: if (getLocalAddress() != null)
116: return getLocalAddress().equals(first_mbr);
117: return false;
118: }
119:
120: public void start() throws Exception {
121: channel.connect(bus_name);
122: }
123:
124: public void stop() {
125: if (channel != null) {
126: channel.close(); // disconnects from channel and closes it
127: channel = null;
128: }
129: }
130:
131: /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
132: public void sendNotification(Serializable n) {
133: sendNotification(null, n);
134: }
135:
136: /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
137: public void sendNotification(Address dest, Serializable n) {
138: Message msg = null;
139: byte[] data = null;
140: Info info;
141:
142: try {
143: if (n == null)
144: return;
145: info = new Info(Info.NOTIFICATION, n);
146: data = Util.objectToByteBuffer(info);
147: msg = new Message(dest, null, data);
148: if (channel == null) {
149: if (log.isErrorEnabled())
150: log
151: .error("channel is null. Won't send notification");
152: return;
153: }
154: channel.send(msg);
155: } catch (Throwable ex) {
156: if (log.isErrorEnabled())
157: log.error("error sending notification", ex);
158: }
159: }
160:
161: /**
162: Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
163: null will be returned. Used only internally by NotificationBus.
164: @param timeout Max number of msecs until the call returns
165: @param max_tries Max number of attempts to fetch the cache from the coordinator
166: */
167: public Serializable getCacheFromCoordinator(long timeout,
168: int max_tries) {
169: return getCacheFromMember(null, timeout, max_tries);
170: }
171:
172: /**
173: Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
174: null will be returned. Used only internally by NotificationBus.
175: @param mbr The address of the member from which to fetch the state. If null, the current coordinator
176: will be asked for the state
177: @param timeout Max number of msecs until the call returns - if timeout elapses
178: null will be returned
179: @param max_tries Max number of attempts to fetch the cache from the coordinator (will be set to 1 if < 1)
180: */
181: public Serializable getCacheFromMember(Address mbr, long timeout,
182: int max_tries) {
183: Serializable cache = null;
184: int num_tries = 0;
185: Info info = new Info(Info.GET_CACHE_REQ);
186: Message msg;
187: Address dst = mbr; // member from which to fetch the cache
188:
189: long start, stop; // +++ remove
190:
191: if (max_tries < 1)
192: max_tries = 1;
193:
194: get_cache_promise.reset();
195: while (num_tries <= max_tries) {
196: if (mbr == null) { // mbr == null means get cache from coordinator
197: dst = determineCoordinator();
198: if (dst == null || dst.equals(getLocalAddress())) { // we are the first member --> empty cache
199: if (log.isInfoEnabled())
200: log
201: .info("["
202: + getLocalAddress()
203: + "] no coordinator found --> first member (cache is empty)");
204: return null;
205: }
206: }
207:
208: // +++ remove
209: if (log.isInfoEnabled())
210: log.info("[" + getLocalAddress() + "] dst=" + dst
211: + ", timeout=" + timeout + ", max_tries="
212: + max_tries + ", num_tries=" + num_tries);
213:
214: info = new Info(Info.GET_CACHE_REQ);
215: msg = new Message(dst, null, info);
216: channel.down(new Event(Event.MSG, msg));
217:
218: start = System.currentTimeMillis();
219: cache = (Serializable) get_cache_promise.getResult(timeout);
220: stop = System.currentTimeMillis();
221: if (cache != null) {
222: if (log.isInfoEnabled())
223: log.info("got cache from " + dst
224: + ": cache is valid (waited "
225: + (stop - start)
226: + " msecs on get_cache_promise)");
227: return cache;
228: } else {
229: if (log.isErrorEnabled())
230: log.error("received null cache; retrying (waited "
231: + (stop - start)
232: + " msecs on get_cache_promise)");
233: }
234:
235: Util.sleep(500);
236: ++num_tries;
237: }
238: if (cache == null)
239: if (log.isErrorEnabled())
240: log.error("[" + getLocalAddress()
241: + "] cache is null (num_tries=" + num_tries
242: + ')');
243: return cache;
244: }
245:
246: /**
247: Don't multicast this to all members, just apply it to local consumers.
248: */
249: public void notifyConsumer(Serializable n) {
250: if (consumer != null && n != null)
251: consumer.handleNotification(n);
252: }
253:
254: /* -------------------------------- Interface MessageListener -------------------------------- */
255: public void receive(Message msg) {
256: Info info = null;
257: Object obj;
258:
259: if (msg == null || msg.getLength() == 0)
260: return;
261: try {
262: obj = msg.getObject();
263: if (!(obj instanceof Info)) {
264:
265: if (log.isErrorEnabled())
266: log.error("expected an instance of Info (received "
267: + obj.getClass().getName() + ')');
268: return;
269: }
270: info = (Info) obj;
271: switch (info.type) {
272: case Info.NOTIFICATION:
273: notifyConsumer(info.data);
274: break;
275:
276: case Info.GET_CACHE_REQ:
277: handleCacheRequest(msg.getSrc());
278: break;
279:
280: case Info.GET_CACHE_RSP:
281: // +++ remove
282: if (log.isDebugEnabled())
283: log
284: .debug("[GET_CACHE_RSP] cache was received from "
285: + msg.getSrc());
286: get_cache_promise.setResult(info.data);
287: break;
288:
289: default:
290: if (log.isErrorEnabled())
291: log.error("type " + info.type + " unknown");
292: break;
293: }
294: } catch (Throwable ex) {
295:
296: if (log.isErrorEnabled())
297: log.error("exception=" + ex);
298: }
299: }
300:
301: public byte[] getState() {
302: return null;
303: }
304:
305: public void setState(byte[] state) {
306: }
307:
308: /* ----------------------------- End of Interface MessageListener ---------------------------- */
309:
310: /* ------------------------------- Interface MembershipListener ------------------------------ */
311:
312: public synchronized void viewAccepted(View new_view) {
313: Vector joined_mbrs, left_mbrs, tmp;
314: Object tmp_mbr;
315:
316: if (new_view == null)
317: return;
318: tmp = new_view.getMembers();
319:
320: synchronized (members) {
321: // get new members
322: joined_mbrs = new Vector();
323: for (int i = 0; i < tmp.size(); i++) {
324: tmp_mbr = tmp.elementAt(i);
325: if (!members.contains(tmp_mbr))
326: joined_mbrs.addElement(tmp_mbr);
327: }
328:
329: // get members that left
330: left_mbrs = new Vector();
331: for (int i = 0; i < members.size(); i++) {
332: tmp_mbr = members.elementAt(i);
333: if (!tmp.contains(tmp_mbr))
334: left_mbrs.addElement(tmp_mbr);
335: }
336:
337: // adjust our own membership
338: members.removeAllElements();
339: members.addAll(tmp);
340: }
341:
342: if (consumer != null) {
343: if (joined_mbrs.size() > 0)
344: for (int i = 0; i < joined_mbrs.size(); i++)
345: consumer.memberJoined((Address) joined_mbrs
346: .elementAt(i));
347: if (left_mbrs.size() > 0)
348: for (int i = 0; i < left_mbrs.size(); i++)
349: consumer.memberLeft((Address) left_mbrs
350: .elementAt(i));
351: }
352: }
353:
354: public void suspect(Address suspected_mbr) {
355: }
356:
357: public void block() {
358: }
359:
360: /* ----------------------------- End of Interface MembershipListener ------------------------- */
361:
362: /* ------------------------------------- Private Methods ------------------------------------- */
363:
364: Address determineCoordinator() {
365: Vector v = channel != null ? channel.getView().getMembers()
366: : null;
367: return v != null ? (Address) v.elementAt(0) : null;
368: }
369:
370: void handleCacheRequest(Address sender) {
371: Serializable cache = null;
372: Message msg;
373: Info info;
374:
375: if (sender == null) {
376: // +++ remove
377: //
378: if (log.isErrorEnabled())
379: log.error("sender is null");
380: return;
381: }
382:
383: synchronized (cache_mutex) {
384: cache = getCache(); // get the cache from the consumer
385: info = new Info(Info.GET_CACHE_RSP, cache);
386: msg = new Message(sender, null, info);
387: if (log.isInfoEnabled())
388: log.info("[" + getLocalAddress()
389: + "] returning cache to " + sender);
390: channel.down(new Event(Event.MSG, msg));
391: }
392: }
393:
394: public Serializable getCache() {
395: return consumer != null ? consumer.getCache() : null;
396: }
397:
398: /* --------------------------------- End of Private Methods ---------------------------------- */
399:
400: private static class Info implements Serializable {
401: public final static int NOTIFICATION = 1;
402: public final static int GET_CACHE_REQ = 2;
403: public final static int GET_CACHE_RSP = 3;
404:
405: int type = 0;
406: Serializable data = null; // if type == NOTIFICATION data is notification, if type == GET_CACHE_RSP, data is cache
407: private static final long serialVersionUID = -7198723001828406107L;
408:
409: public Info(int type) {
410: this .type = type;
411: }
412:
413: public Info(int type, Serializable data) {
414: this .type = type;
415: this .data = data;
416: }
417:
418: public String toString() {
419: StringBuffer sb = new StringBuffer();
420: sb.append("type= ");
421: if (type == NOTIFICATION)
422: sb.append("NOTIFICATION");
423: else if (type == GET_CACHE_REQ)
424: sb.append("GET_CACHE_REQ");
425: else if (type == GET_CACHE_RSP)
426: sb.append("GET_CACHE_RSP");
427: else
428: sb.append("<unknown>");
429: if (data != null) {
430: if (type == NOTIFICATION)
431: sb.append(", notification=" + data);
432: else if (type == GET_CACHE_RSP)
433: sb.append(", cache=" + data);
434: }
435: return sb.toString();
436: }
437: }
438:
439: }
|