Source Code Cross Referenced for NotificationBus.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: NotificationBus.java,v 1.11 2006/05/25 12:10:18 belaban Exp $
002:
003:        package org.jgroups.blocks;
004:
005:        import org.apache.commons.logging.Log;
006:        import org.apache.commons.logging.LogFactory;
007:        import org.jgroups.*;
008:        import org.jgroups.util.Promise;
009:        import org.jgroups.util.Util;
010:
011:        import java.io.Serializable;
012:        import java.util.Vector;
013:
014:        /**
015:         * This class provides notification sending and handling capability. 
016:         * Producers can send notifications to all registered consumers.
017:         * Provides hooks to implement shared group state, which allows an
018:         * application programmer to maintain a local cache which is replicated 
019:         * by all instances. NotificationBus sits on 
020:         * top of a channel, however it creates its channel itself, so the 
021:         * application programmers do not have to provide their own channel. 
022:         *
023:         * @author Bela Ban
024:         */
025:        public class NotificationBus implements  Receiver {
026:            final Vector members = new Vector();
027:            Channel channel = null;
028:            Address local_addr = null;
029:            Consumer consumer = null; // only a single consumer allowed
030:            String bus_name = "notification_bus";
031:            final Promise get_cache_promise = new Promise();
032:            final Object cache_mutex = new Object();
033:
034:            protected final Log log = LogFactory.getLog(getClass());
035:
036:            String props = null;
037:
038:            public interface Consumer {
039:                void handleNotification(Serializable n);
040:
041:                /** Called on the coordinator to obtains its cache */
042:                Serializable getCache();
043:
044:                void memberJoined(Address mbr);
045:
046:                void memberLeft(Address mbr);
047:            }
048:
049:            public NotificationBus() throws Exception {
050:                this ((Channel) null, null);
051:            }
052:
053:            public NotificationBus(String bus_name) throws Exception {
054:                this (bus_name, null);
055:            }
056:
057:            public NotificationBus(String bus_name, String properties)
058:                    throws Exception {
059:                if (bus_name != null)
060:                    this .bus_name = bus_name;
061:                if (properties != null)
062:                    props = properties;
063:                channel = new JChannel(props);
064:                channel.setReceiver(this );
065:            }
066:
067:            public NotificationBus(Channel channel, String bus_name)
068:                    throws Exception {
069:                if (bus_name != null)
070:                    this .bus_name = bus_name;
071:                this .channel = channel;
072:                channel.setReceiver(this );
073:            }
074:
075:            public void setConsumer(Consumer c) {
076:                consumer = c;
077:            }
078:
079:            public Address getLocalAddress() {
080:                if (local_addr != null)
081:                    return local_addr;
082:                if (channel != null)
083:                    local_addr = channel.getLocalAddress();
084:                return local_addr;
085:            }
086:
087:            /**
088:             * Returns a reference to the real membership: don't modify. 
089:             * If you need to modify, make a copy first !
090:             * @return Vector of Address objects
091:             */
092:            public Vector getMembership() {
093:                return members;
094:            }
095:
096:            /** 
097:             * Answers the Channel.
098:             * Used to operate on the underlying channel directly, e.g. perform operations that are not
099:             * provided using only NotificationBus. Should be used sparingly.
100:             * @return underlying Channel
101:             */
102:            public Channel getChannel() {
103:                return channel;
104:            }
105:
106:            public boolean isCoordinator() {
107:                Object first_mbr = null;
108:
109:                synchronized (members) {
110:                    first_mbr = members.size() > 0 ? members.elementAt(0)
111:                            : null;
112:                    if (first_mbr == null)
113:                        return true;
114:                }
115:                if (getLocalAddress() != null)
116:                    return getLocalAddress().equals(first_mbr);
117:                return false;
118:            }
119:
120:            public void start() throws Exception {
121:                channel.connect(bus_name);
122:            }
123:
124:            public void stop() {
125:                if (channel != null) {
126:                    channel.close(); // disconnects from channel and closes it
127:                    channel = null;
128:                }
129:            }
130:
131:            /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
132:            public void sendNotification(Serializable n) {
133:                sendNotification(null, n);
134:            }
135:
136:            /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
137:            public void sendNotification(Address dest, Serializable n) {
138:                Message msg = null;
139:                byte[] data = null;
140:                Info info;
141:
142:                try {
143:                    if (n == null)
144:                        return;
145:                    info = new Info(Info.NOTIFICATION, n);
146:                    data = Util.objectToByteBuffer(info);
147:                    msg = new Message(dest, null, data);
148:                    if (channel == null) {
149:                        if (log.isErrorEnabled())
150:                            log
151:                                    .error("channel is null. Won't send notification");
152:                        return;
153:                    }
154:                    channel.send(msg);
155:                } catch (Throwable ex) {
156:                    if (log.isErrorEnabled())
157:                        log.error("error sending notification", ex);
158:                }
159:            }
160:
161:            /**
162:             Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
163:             null will be returned. Used only internally by NotificationBus.
164:             @param timeout Max number of msecs until the call returns
165:             @param max_tries Max number of attempts to fetch the cache from the coordinator
166:             */
167:            public Serializable getCacheFromCoordinator(long timeout,
168:                    int max_tries) {
169:                return getCacheFromMember(null, timeout, max_tries);
170:            }
171:
172:            /**
173:             Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
174:             null will be returned. Used only internally by NotificationBus.
175:             @param mbr The address of the member from which to fetch the state. If null, the current coordinator
176:             will be asked for the state
177:             @param timeout Max number of msecs until the call returns - if timeout elapses
178:             null will be returned
179:             @param max_tries Max number of attempts to fetch the cache from the coordinator (will be set to 1 if < 1)
180:             */
181:            public Serializable getCacheFromMember(Address mbr, long timeout,
182:                    int max_tries) {
183:                Serializable cache = null;
184:                int num_tries = 0;
185:                Info info = new Info(Info.GET_CACHE_REQ);
186:                Message msg;
187:                Address dst = mbr; // member from which to fetch the cache
188:
189:                long start, stop; // +++ remove
190:
191:                if (max_tries < 1)
192:                    max_tries = 1;
193:
194:                get_cache_promise.reset();
195:                while (num_tries <= max_tries) {
196:                    if (mbr == null) { // mbr == null means get cache from coordinator
197:                        dst = determineCoordinator();
198:                        if (dst == null || dst.equals(getLocalAddress())) { // we are the first member --> empty cache
199:                            if (log.isInfoEnabled())
200:                                log
201:                                        .info("["
202:                                                + getLocalAddress()
203:                                                + "] no coordinator found --> first member (cache is empty)");
204:                            return null;
205:                        }
206:                    }
207:
208:                    // +++ remove
209:                    if (log.isInfoEnabled())
210:                        log.info("[" + getLocalAddress() + "] dst=" + dst
211:                                + ", timeout=" + timeout + ", max_tries="
212:                                + max_tries + ", num_tries=" + num_tries);
213:
214:                    info = new Info(Info.GET_CACHE_REQ);
215:                    msg = new Message(dst, null, info);
216:                    channel.down(new Event(Event.MSG, msg));
217:
218:                    start = System.currentTimeMillis();
219:                    cache = (Serializable) get_cache_promise.getResult(timeout);
220:                    stop = System.currentTimeMillis();
221:                    if (cache != null) {
222:                        if (log.isInfoEnabled())
223:                            log.info("got cache from " + dst
224:                                    + ": cache is valid (waited "
225:                                    + (stop - start)
226:                                    + " msecs on get_cache_promise)");
227:                        return cache;
228:                    } else {
229:                        if (log.isErrorEnabled())
230:                            log.error("received null cache; retrying (waited "
231:                                    + (stop - start)
232:                                    + " msecs on get_cache_promise)");
233:                    }
234:
235:                    Util.sleep(500);
236:                    ++num_tries;
237:                }
238:                if (cache == null)
239:                    if (log.isErrorEnabled())
240:                        log.error("[" + getLocalAddress()
241:                                + "] cache is null (num_tries=" + num_tries
242:                                + ')');
243:                return cache;
244:            }
245:
246:            /**
247:             Don't multicast this to all members, just apply it to local consumers.
248:             */
249:            public void notifyConsumer(Serializable n) {
250:                if (consumer != null && n != null)
251:                    consumer.handleNotification(n);
252:            }
253:
254:            /* -------------------------------- Interface MessageListener -------------------------------- */
255:            public void receive(Message msg) {
256:                Info info = null;
257:                Object obj;
258:
259:                if (msg == null || msg.getLength() == 0)
260:                    return;
261:                try {
262:                    obj = msg.getObject();
263:                    if (!(obj instanceof  Info)) {
264:
265:                        if (log.isErrorEnabled())
266:                            log.error("expected an instance of Info (received "
267:                                    + obj.getClass().getName() + ')');
268:                        return;
269:                    }
270:                    info = (Info) obj;
271:                    switch (info.type) {
272:                    case Info.NOTIFICATION:
273:                        notifyConsumer(info.data);
274:                        break;
275:
276:                    case Info.GET_CACHE_REQ:
277:                        handleCacheRequest(msg.getSrc());
278:                        break;
279:
280:                    case Info.GET_CACHE_RSP:
281:                        // +++ remove
282:                        if (log.isDebugEnabled())
283:                            log
284:                                    .debug("[GET_CACHE_RSP] cache was received from "
285:                                            + msg.getSrc());
286:                        get_cache_promise.setResult(info.data);
287:                        break;
288:
289:                    default:
290:                        if (log.isErrorEnabled())
291:                            log.error("type " + info.type + " unknown");
292:                        break;
293:                    }
294:                } catch (Throwable ex) {
295:
296:                    if (log.isErrorEnabled())
297:                        log.error("exception=" + ex);
298:                }
299:            }
300:
301:            public byte[] getState() {
302:                return null;
303:            }
304:
305:            public void setState(byte[] state) {
306:            }
307:
308:            /* ----------------------------- End of Interface MessageListener ---------------------------- */
309:
310:            /* ------------------------------- Interface MembershipListener ------------------------------ */
311:
312:            public synchronized void viewAccepted(View new_view) {
313:                Vector joined_mbrs, left_mbrs, tmp;
314:                Object tmp_mbr;
315:
316:                if (new_view == null)
317:                    return;
318:                tmp = new_view.getMembers();
319:
320:                synchronized (members) {
321:                    // get new members
322:                    joined_mbrs = new Vector();
323:                    for (int i = 0; i < tmp.size(); i++) {
324:                        tmp_mbr = tmp.elementAt(i);
325:                        if (!members.contains(tmp_mbr))
326:                            joined_mbrs.addElement(tmp_mbr);
327:                    }
328:
329:                    // get members that left
330:                    left_mbrs = new Vector();
331:                    for (int i = 0; i < members.size(); i++) {
332:                        tmp_mbr = members.elementAt(i);
333:                        if (!tmp.contains(tmp_mbr))
334:                            left_mbrs.addElement(tmp_mbr);
335:                    }
336:
337:                    // adjust our own membership
338:                    members.removeAllElements();
339:                    members.addAll(tmp);
340:                }
341:
342:                if (consumer != null) {
343:                    if (joined_mbrs.size() > 0)
344:                        for (int i = 0; i < joined_mbrs.size(); i++)
345:                            consumer.memberJoined((Address) joined_mbrs
346:                                    .elementAt(i));
347:                    if (left_mbrs.size() > 0)
348:                        for (int i = 0; i < left_mbrs.size(); i++)
349:                            consumer.memberLeft((Address) left_mbrs
350:                                    .elementAt(i));
351:                }
352:            }
353:
354:            public void suspect(Address suspected_mbr) {
355:            }
356:
357:            public void block() {
358:            }
359:
360:            /* ----------------------------- End of Interface MembershipListener ------------------------- */
361:
362:            /* ------------------------------------- Private Methods ------------------------------------- */
363:
364:            Address determineCoordinator() {
365:                Vector v = channel != null ? channel.getView().getMembers()
366:                        : null;
367:                return v != null ? (Address) v.elementAt(0) : null;
368:            }
369:
370:            void handleCacheRequest(Address sender) {
371:                Serializable cache = null;
372:                Message msg;
373:                Info info;
374:
375:                if (sender == null) {
376:                    // +++ remove
377:                    //
378:                    if (log.isErrorEnabled())
379:                        log.error("sender is null");
380:                    return;
381:                }
382:
383:                synchronized (cache_mutex) {
384:                    cache = getCache(); // get the cache from the consumer
385:                    info = new Info(Info.GET_CACHE_RSP, cache);
386:                    msg = new Message(sender, null, info);
387:                    if (log.isInfoEnabled())
388:                        log.info("[" + getLocalAddress()
389:                                + "] returning cache to " + sender);
390:                    channel.down(new Event(Event.MSG, msg));
391:                }
392:            }
393:
394:            public Serializable getCache() {
395:                return consumer != null ? consumer.getCache() : null;
396:            }
397:
398:            /* --------------------------------- End of Private Methods ---------------------------------- */
399:
400:            private static class Info implements  Serializable {
401:                public final static int NOTIFICATION = 1;
402:                public final static int GET_CACHE_REQ = 2;
403:                public final static int GET_CACHE_RSP = 3;
404:
405:                int type = 0;
406:                Serializable data = null; // if type == NOTIFICATION data is notification, if type == GET_CACHE_RSP, data is cache
407:                private static final long serialVersionUID = -7198723001828406107L;
408:
409:                public Info(int type) {
410:                    this .type = type;
411:                }
412:
413:                public Info(int type, Serializable data) {
414:                    this .type = type;
415:                    this .data = data;
416:                }
417:
418:                public String toString() {
419:                    StringBuffer sb = new StringBuffer();
420:                    sb.append("type= ");
421:                    if (type == NOTIFICATION)
422:                        sb.append("NOTIFICATION");
423:                    else if (type == GET_CACHE_REQ)
424:                        sb.append("GET_CACHE_REQ");
425:                    else if (type == GET_CACHE_RSP)
426:                        sb.append("GET_CACHE_RSP");
427:                    else
428:                        sb.append("<unknown>");
429:                    if (data != null) {
430:                        if (type == NOTIFICATION)
431:                            sb.append(", notification=" + data);
432:                        else if (type == GET_CACHE_RSP)
433:                            sb.append(", cache=" + data);
434:                    }
435:                    return sb.toString();
436:                }
437:            }
438:
439:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.