Source code for PullPushAdapter.java (JGroups 2.4.1-sp3, package org.jgroups.blocks)


// $Id: PullPushAdapter.java,v 1.22 2006/09/27 19:21:53 vlada Exp $

package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.util.Util;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

/**
 * Allows a client of {@link org.jgroups.Channel} to be notified when messages have been received
 * instead of having to actively poll the channel for new messages. Typically used in the
 * client role (receive()). As this class does not implement interface
 * {@link org.jgroups.Transport}, but <b>uses</b> it for receiving messages, an underlying object
 * has to be used to send messages (e.g. the channel on which an object of this class relies).<p>
 * Multiple MembershipListeners can register with the PullPushAdapter; when a view is received, they
 * will all be notified. There is one main message listener which sends and receives messages. In addition,
 * MessageListeners can register with a certain tag (identifier), and then send messages tagged with this
 * identifier. When a message with such an identifier is received, the corresponding MessageListener will be
 * looked up and the message dispatched to it. If the message carries no tag (default), the main
 * MessageListener will receive the message.
 * @author Bela Ban
 * @version $Revision
 */
public class PullPushAdapter implements Runnable, ChannelListener {
    protected Transport transport = null;
    protected MessageListener listener = null; // main message receiver
    protected final List membership_listeners = new ArrayList();
    protected Thread receiver_thread = null;
    protected final HashMap listeners = new HashMap(); // keys=identifier (Serializable), values=MessageListeners
    protected final Log log = LogFactory.getLog(getClass());
    static final String PULL_HEADER = "PULL_HEADER";

    public PullPushAdapter(Transport transport) {
        this.transport = transport;
        start();
    }

    public PullPushAdapter(Transport transport, MessageListener l) {
        this.transport = transport;
        setListener(l);
        start();
    }

    public PullPushAdapter(Transport transport, MembershipListener ml) {
        this.transport = transport;
        addMembershipListener(ml);
        start();
    }

    public PullPushAdapter(Transport transport, MessageListener l,
            MembershipListener ml) {
        this.transport = transport;
        setListener(l);
        addMembershipListener(ml);
        start();
    }

    public PullPushAdapter(Transport transport, MessageListener l,
            MembershipListener ml, boolean start) {
        this.transport = transport;
        setListener(l);
        addMembershipListener(ml);
        if (start)
            start();
    }

    public Transport getTransport() {
        return transport;
    }

    public final void start() {
        // only spawn a receiver thread if none is currently running
        if (receiver_thread == null || !receiver_thread.isAlive()) {
            receiver_thread = new Thread(this, "PullPushAdapterThread");
            receiver_thread.setDaemon(true);
            receiver_thread.start();
        }
        if (transport instanceof JChannel)
            ((JChannel) transport).addChannelListener(this);
    }

    public void stop() {
        Thread tmp = null;
        if (receiver_thread != null && receiver_thread.isAlive()) {
            tmp = receiver_thread;
            receiver_thread = null; // makes the loop condition in run() false
            tmp.interrupt();
            try {
                tmp.join(1000);
            } catch (Exception ex) {
                // ignored: waiting for the receiver thread is best effort
            }
        }
        receiver_thread = null;
    }

    /**
     * Sends a message to the group; listeners registered for this identifier will receive the message.
     * @param identifier the key that the proper listeners are listening on
     * @param msg the Message to be sent
     * @see #registerListener
     */
    public void send(Serializable identifier, Message msg)
            throws Exception {
        if (msg == null) {
            if (log.isErrorEnabled())
                log.error("msg is null");
            return;
        }
        if (identifier == null)
            transport.send(msg);
        else {
            msg.putHeader(PULL_HEADER, new PullHeader(identifier));
            transport.send(msg);
        }
    }

    /**
     * Sends a message with no identifier; the main listener on the other group members will receive it.
     * @param msg the Message to be sent
     * @throws Exception
     */
    public void send(Message msg) throws Exception {
        send(null, msg);
    }

    public final void setListener(MessageListener l) {
        listener = l;
    }

    /**
     * Sets a listener for messages with a given identifier.
     * Messages sent with this identifier in their headers will be routed to this listener.
     * <b>Note: there can be only one listener for one identifier;
     * if you want to register a different listener for an already registered identifier, then unregister first.</b>
     * @param identifier - messages sent on the group with this object will be received by this listener
     * @param l - the listener that will get the message
     */
    public void registerListener(Serializable identifier,
            MessageListener l) {
        if (l == null || identifier == null) {
            if (log.isErrorEnabled())
                log.error("message listener or identifier is null");
            return;
        }
        if (listeners.containsKey(identifier)) {
            if (log.isErrorEnabled())
                log.error("listener with identifier="
                        + identifier
                        + " already exists, choose a different identifier or unregister current listener");
            // we do not want to overwrite the listener
            return;
        }
        listeners.put(identifier, l);
    }

    /**
     * Removes the message listener for a given identifier from the message listeners map.
     * @param identifier - the key we no longer want to listen to
     */
    public void unregisterListener(Serializable identifier) {
        listeners.remove(identifier);
    }

    /** @deprecated Use {@link #addMembershipListener} */
    public void setMembershipListener(MembershipListener ml) {
        addMembershipListener(ml);
    }

    public final void addMembershipListener(MembershipListener l) {
        if (l != null && !membership_listeners.contains(l))
            membership_listeners.add(l);
    }

    public void removeMembershipListener(MembershipListener l) {
        if (l != null && membership_listeners.contains(l))
            membership_listeners.remove(l);
    }

    /**
     * Reentrant run(): message reception is serialized, then the listener is notified of the
     * message reception
     */
    public void run() {
        Object obj;

        while (receiver_thread != null
                && Thread.currentThread().equals(receiver_thread)) {
            try {
                obj = transport.receive(0);
                if (obj == null)
                    continue;

                if (obj instanceof Message) {
                    handleMessage((Message) obj);
                } else if (obj instanceof GetStateEvent) {
                    byte[] retval = null;
                    GetStateEvent evt = (GetStateEvent) obj;
                    String state_id = evt.getStateId();
                    if (listener != null) {
                        try {
                            if (listener instanceof ExtendedMessageListener
                                    && state_id != null) {
                                retval = ((ExtendedMessageListener) listener)
                                        .getState(state_id);
                            } else {
                                retval = listener.getState();
                            }
                        } catch (Throwable t) {
                            log.error("getState() from application failed, will return empty state",
                                    t);
                        }
                    } else {
                        log.warn("no listener registered, returning empty state");
                    }

                    if (transport instanceof Channel) {
                        ((Channel) transport).returnState(retval, state_id);
                    } else {
                        if (log.isErrorEnabled())
                            log.error("underlying transport is not a Channel, but a "
                                    + transport.getClass().getName()
                                    + ": cannot return state using returnState()");
                    }
                } else if (obj instanceof SetStateEvent) {
                    SetStateEvent evt = (SetStateEvent) obj;
                    String state_id = evt.getStateId();
                    if (listener != null) {
                        try {
                            if (listener instanceof ExtendedMessageListener
                                    && state_id != null) {
                                ((ExtendedMessageListener) listener)
                                        .setState(state_id, evt.getArg());
                            } else {
                                listener.setState(evt.getArg());
                            }
                        } catch (ClassCastException cast_ex) {
                            if (log.isErrorEnabled())
                                log.error("received SetStateEvent, but argument "
                                        + ((SetStateEvent) obj).getArg()
                                        + " is not serializable ! Discarding message.");
                        }
                    }
                } else if (obj instanceof StreamingGetStateEvent) {
                    StreamingGetStateEvent evt = (StreamingGetStateEvent) obj;
                    if (listener instanceof ExtendedMessageListener) {
                        if (evt.getStateId() == null) {
                            ((ExtendedMessageListener) listener)
                                    .getState(evt.getArg());
                        } else {
                            ((ExtendedMessageListener) listener)
                                    .getState(evt.getStateId(), evt.getArg());
                        }
                    }
                } else if (obj instanceof StreamingSetStateEvent) {
                    StreamingSetStateEvent evt = (StreamingSetStateEvent) obj;
                    if (listener instanceof ExtendedMessageListener) {
                        if (evt.getStateId() == null) {
                            ((ExtendedMessageListener) listener)
                                    .setState(evt.getArg());
                        } else {
                            ((ExtendedMessageListener) listener)
                                    .setState(evt.getStateId(), evt.getArg());
                        }
                    }
                } else if (obj instanceof View) {
                    notifyViewChange((View) obj);
                } else if (obj instanceof SuspectEvent) {
                    notifySuspect((Address) ((SuspectEvent) obj).getMember());
                } else if (obj instanceof BlockEvent) {
                    notifyBlock();
                    if (transport instanceof Channel) {
                        ((Channel) transport).blockOk();
                    }
                } else if (obj instanceof UnblockEvent) {
                    notifyUnblock();
                }
            } catch (ChannelNotConnectedException conn) {
                Address local_addr = ((Channel) transport).getLocalAddress();
                if (log.isTraceEnabled())
                    log.trace('['
                            + (local_addr == null ? "<null>"
                                    : local_addr.toString())
                            + "] channel not connected, exception is "
                            + conn);
                Util.sleep(1000);
                receiver_thread = null;
                break;
            } catch (ChannelClosedException closed_ex) {
                Address local_addr = ((Channel) transport).getLocalAddress();
                if (log.isTraceEnabled())
                    log.trace('['
                            + (local_addr == null ? "<null>"
                                    : local_addr.toString())
                            + "] channel closed, exception is "
                            + closed_ex);
                // Util.sleep(1000);
                receiver_thread = null;
                break;
            } catch (Throwable e) {
                // Any other failure, including exceptions thrown by listeners, is
                // silently swallowed; the loop condition decides whether to continue.
            }
        }
    }

    /**
     * Check whether the message has an identifier. If yes, look up the MessageListener associated with the
     * given identifier in the listeners map and dispatch to it. Otherwise just use the main (default) message
     * listener
     */
    protected void handleMessage(Message msg) {
        PullHeader hdr = (PullHeader) msg.getHeader(PULL_HEADER);
        Serializable identifier;
        MessageListener l;

        if (hdr != null && (identifier = hdr.getIdentifier()) != null) {
            l = (MessageListener) listeners.get(identifier);
            if (l == null) {
                if (log.isErrorEnabled())
                    log.error("received a message tagged with identifier="
                            + identifier
                            + ", but there is no registration for that identifier. Will drop message");
            } else
                l.receive(msg);
        } else {
            if (listener != null)
                listener.receive(msg);
        }
    }

    protected void notifyViewChange(View v) {
        MembershipListener l;

        if (v == null)
            return;
        for (Iterator it = membership_listeners.iterator(); it.hasNext();) {
            l = (MembershipListener) it.next();
            try {
                l.viewAccepted(v);
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("exception notifying " + l + ": " + ex);
            }
        }
    }

    protected void notifySuspect(Address suspected_mbr) {
        MembershipListener l;

        if (suspected_mbr == null)
            return;
        for (Iterator it = membership_listeners.iterator(); it.hasNext();) {
            l = (MembershipListener) it.next();
            try {
                l.suspect(suspected_mbr);
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("exception notifying " + l + ": " + ex);
            }
        }
    }

    protected void notifyBlock() {
        MembershipListener l;

        for (Iterator it = membership_listeners.iterator(); it.hasNext();) {
            l = (MembershipListener) it.next();
            try {
                l.block();
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("exception notifying " + l + ": " + ex);
            }
        }
    }

    protected void notifyUnblock() {
        MembershipListener l;

        for (Iterator it = membership_listeners.iterator(); it.hasNext();) {
            l = (MembershipListener) it.next();
            if (l instanceof ExtendedMembershipListener) {
                try {
                    ((ExtendedMembershipListener) l).unblock();
                } catch (Throwable ex) {
                    if (log.isErrorEnabled())
                        log.error("exception notifying " + l + ": " + ex);
                }
            }
        }
    }

    public void channelConnected(Channel channel) {
        if (log.isTraceEnabled())
            log.trace("channel is connected");
    }

    public void channelDisconnected(Channel channel) {
        if (log.isTraceEnabled())
            log.trace("channel is disconnected");
    }

    public void channelClosed(Channel channel) {
    }

    public void channelShunned() {
        if (log.isTraceEnabled())
            log.trace("channel is shunned");
    }

    public void channelReconnected(Address addr) {
        start();
    }

    public static final class PullHeader extends Header {
        Serializable identifier = null;

        public PullHeader() {
            // used by externalization
        }

        public PullHeader(Serializable identifier) {
            this.identifier = identifier;
        }

        public Serializable getIdentifier() {
            return identifier;
        }

        public long size() {
            if (identifier == null)
                return 12;
            else
                return 64;
        }

        public String toString() {
            return "PullHeader";
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(identifier);
        }

        public void readExternal(ObjectInput in) throws IOException,
                ClassNotFoundException {
            identifier = (Serializable) in.readObject();
        }
    }

    /**
     * @return Returns the listener.
     */
    public MessageListener getListener() {
        return listener;
    }
}
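
The pull-to-push idea and the identifier-based dispatch in handleMessage() can be tried without a JGroups cluster. The sketch below is not part of JGroups and only loosely mirrors the class above: PullPushSketch, Tagged, and demo() are hypothetical names, a BlockingQueue stands in for Transport.receive(), and a String tag stands in for the PullHeader identifier. It assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in (not JGroups): turns a pull-style queue into push-style callbacks. */
public class PullPushSketch implements Runnable {
    /** A payload with an optional tag; plays the role of Message plus PullHeader. */
    public static final class Tagged {
        final String tag;      // null means "no identifier"
        final String payload;
        public Tagged(String tag, String payload) { this.tag = tag; this.payload = payload; }
    }

    private final BlockingQueue<Tagged> source;   // assumption: replaces Transport.receive()
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();
    private volatile Consumer<String> mainListener;   // default receiver for untagged messages
    private volatile Thread receiver;

    public PullPushSketch(BlockingQueue<Tagged> source) { this.source = source; }

    public void setListener(Consumer<String> l) { mainListener = l; }

    /** Like registerListener() above: an existing registration is never overwritten. */
    public boolean registerListener(String tag, Consumer<String> l) {
        return listeners.putIfAbsent(tag, l) == null;
    }

    public void start() {
        Thread t = new Thread(this, "PullPushSketchThread");
        t.setDaemon(true);
        receiver = t;          // published before the thread starts running
        t.start();
    }

    public void stop() throws InterruptedException {
        Thread t = receiver;
        receiver = null;       // makes the loop condition in run() false
        if (t != null) { t.interrupt(); t.join(1000); }
    }

    @Override
    public void run() {
        while (Thread.currentThread() == receiver) {
            try {
                dispatch(source.take());   // block until something can be pulled
            } catch (InterruptedException e) {
                return;                    // stop() interrupted the blocking take()
            }
        }
    }

    /** Untagged messages go to the main listener; unknown tags are dropped. */
    void dispatch(Tagged msg) {
        Consumer<String> l = (msg.tag == null) ? mainListener : listeners.get(msg.tag);
        if (l != null) l.accept(msg.payload);
    }

    /** Feeds three messages through the adapter and returns what the listeners saw. */
    public static List<String> demo() throws InterruptedException {
        BlockingQueue<Tagged> queue = new LinkedBlockingQueue<>();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        PullPushSketch adapter = new PullPushSketch(queue);
        adapter.setListener(p -> { seen.add("main:" + p); done.countDown(); });
        adapter.registerListener("chat", p -> { seen.add("chat:" + p); done.countDown(); });
        adapter.start();
        queue.put(new Tagged(null, "hello"));        // no tag: main listener
        queue.put(new Tagged("unknown", "dropped")); // no registration: dropped
        queue.put(new Tagged("chat", "hi"));         // tag "chat": tagged listener
        done.await(2, TimeUnit.SECONDS);
        adapter.stop();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());   // prints [main:hello, chat:hi]
    }
}
```

Unlike the original, the sketch uses a volatile thread reference and a concurrent map, because registration and stop() may be called from a thread other than the receiver. That is a design choice of the sketch, not a statement about how PullPushAdapter is meant to be synchronized.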