Source Code Cross Referenced for SMACK.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: SMACK.java,v 1.14.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.stack.AckMcastSenderWindow;
007:        import org.jgroups.stack.AckReceiverWindow;
008:        import org.jgroups.stack.Protocol;
009:        import org.jgroups.util.Streamable;
010:        import org.jgroups.util.Util;
011:
012:        import java.io.*;
013:        import java.util.HashMap;
014:        import java.util.Iterator;
015:        import java.util.Properties;
016:        import java.util.Vector;
017:
018:        /**
019:         * Simple Multicast ACK protocol. A positive acknowledgment-based protocol for reliable delivery of
020:         * multicast messages, which does not need any group membership service.
021:         * Basically works as follows:
022:         * <ul>
023:         * <li>Sender S sends multicast message M</li>
024:         * <li>When member P receives M, it sends back a unicast ack to S</li>
025:         * <li>When S receives the ack from P, it checks whether P is in its
026:         *     membership list. If not, P will be added. This is necessary to retransmit the next message
027:         *     sent to P.</li>
028:         * <li>When S sends a multicast message M, all members are added to a
029:         *     retransmission entry (containing all members to which the message
030:         *     was sent), which is added to a hashmap (keyed by seqno). Whenever
031:         *     an ack is received from receiver X, X will be removed from the
032:         *     retransmission list for the given seqno. When the retransmission
033:         *     list is empty, the seqno will be removed from the hashmap.</li>
034:         * <li>A retransmitter thread in the sender periodically retransmits
035:         *     (either via unicast, or multicast) messages for which no ack has
036:         *     been received yet</li>
037:         * <li>When a max number of (unsuccessful) retransmissions have been
038:         *     exceeded, all remaining members for that seqno are removed from
039:         *     the local membership, and the seqno is removed from te hashmap,
040:         *     ceasing all retransmissions</li>
041:         * </ul>
042:         * Advantage of this protocol: no group membership necessary, fast.
043:         * @author Bela Ban Aug 2002
044:         * @version $Revision: 1.14.6.1 $
045:         * <BR> Fix membershop bug: start a, b, kill b, restart b: b will be suspected by a.
046:         */
047:        public class SMACK extends Protocol implements 
048:                AckMcastSenderWindow.RetransmitCommand {
049:            long[] timeout = { 1000, 2000, 3000 }; // retransmit timeouts (for AckMcastSenderWindow)
050:            int max_xmits = 10; // max retransmissions (if still no ack, member will be removed)
051:            final Vector members = new Vector(); // contains Addresses
052:            AckMcastSenderWindow sender_win = null;
053:            final HashMap receivers = new HashMap(); // keys=sender (Address), values=AckReceiverWindow
054:            final HashMap xmit_table = new HashMap(); // keeps track of num xmits / member (keys: mbr, val:num)
055:            Address local_addr = null; // my own address
056:            long seqno = 1; // seqno for msgs sent by this sender
057:            long vid = 1; // for the fake view changes
058:            boolean print_local_addr = true;
059:            static final String name = "SMACK";
060:
061:            public SMACK() {
062:            }
063:
064:            public String getName() {
065:                return name;
066:            }
067:
068:            public boolean setProperties(Properties props) {
069:                String str;
070:                long[] tmp;
071:
072:                super .setProperties(props);
073:                str = props.getProperty("print_local_addr");
074:                if (str != null) {
075:                    print_local_addr = Boolean.valueOf(str).booleanValue();
076:                    props.remove("print_local_addr");
077:                }
078:
079:                str = props.getProperty("timeout");
080:                if (str != null) {
081:                    tmp = Util.parseCommaDelimitedLongs(str);
082:                    props.remove("timeout");
083:                    if (tmp != null && tmp.length > 0)
084:                        timeout = tmp;
085:                }
086:
087:                str = props.getProperty("max_xmits");
088:                if (str != null) {
089:                    max_xmits = Integer.parseInt(str);
090:                    props.remove("max_xmits");
091:                }
092:
093:                if (props.size() > 0) {
094:                    log
095:                            .error("SMACK.setProperties(): the following properties are not recognized: "
096:                                    + props);
097:
098:                    return false;
099:                }
100:                return true;
101:            }
102:
103:            public void stop() {
104:                AckReceiverWindow win;
105:                if (sender_win != null) {
106:                    sender_win.stop();
107:                    sender_win = null;
108:                }
109:                for (Iterator it = receivers.values().iterator(); it.hasNext();) {
110:                    win = (AckReceiverWindow) it.next();
111:                    win.reset();
112:                }
113:                receivers.clear();
114:            }
115:
116:            public void up(Event evt) {
117:                Address sender;
118:
119:                switch (evt.getType()) {
120:
121:                case Event.SET_LOCAL_ADDRESS:
122:                    local_addr = (Address) evt.getArg();
123:                    addMember(local_addr);
124:                    if (print_local_addr) {
125:                        System.out
126:                                .println("\n-------------------------------------------------------\n"
127:                                        + "GMS: address is "
128:                                        + local_addr
129:                                        + "\n-------------------------------------------------------");
130:                    }
131:                    break;
132:
133:                case Event.CONNECT_OK:
134:                    passUp(evt);
135:                    sender_win = new AckMcastSenderWindow(this , timeout);
136:
137:                    // send join announcement
138:                    Message join_msg = new Message();
139:                    join_msg.putHeader(name, new SmackHeader(
140:                            SmackHeader.JOIN_ANNOUNCEMENT, -1));
141:                    passDown(new Event(Event.MSG, join_msg));
142:                    return;
143:
144:                case Event.SUSPECT:
145:
146:                    if (log.isInfoEnabled())
147:                        log.info("removing suspected member " + evt.getArg());
148:                    removeMember((Address) evt.getArg());
149:                    break;
150:
151:                case Event.MSG:
152:                    Message msg = (Message) evt.getArg(),
153:                    tmp_msg;
154:                    if (msg == null)
155:                        break;
156:                    sender = msg.getSrc();
157:                    SmackHeader hdr = (SmackHeader) msg.removeHeader(name);
158:                    if (hdr == null) // is probably a unicast message
159:                        break;
160:                    switch (hdr.type) {
161:                    case SmackHeader.MCAST: // send an ack, then pass up (if not already received)
162:                        Long tmp_seqno;
163:                        AckReceiverWindow win;
164:                        Message ack_msg = new Message(sender);
165:
166:                        ack_msg.putHeader(name, new SmackHeader(
167:                                SmackHeader.ACK, hdr.seqno));
168:                        passDown(new Event(Event.MSG, ack_msg));
169:
170:                        tmp_seqno = new Long(hdr.seqno);
171:
172:                        if (log.isTraceEnabled())
173:                            log.trace("received #" + tmp_seqno + " from "
174:                                    + sender);
175:
176:                        win = (AckReceiverWindow) receivers.get(sender);
177:                        if (win == null) {
178:                            addMember(sender);
179:                            win = new AckReceiverWindow(hdr.seqno);
180:                            receivers.put(sender, win);
181:                        }
182:                        win.add(hdr.seqno, msg);
183:
184:                        // now remove as many messages as possible
185:                        while ((tmp_msg = win.remove()) != null)
186:                            passUp(new Event(Event.MSG, tmp_msg));
187:                        return;
188:
189:                    case SmackHeader.ACK:
190:                        addMember(msg.getSrc());
191:                        sender_win.ack(hdr.seqno, msg.getSrc());
192:                        sender_win.clearStableMessages();
193:                        if (log.isTraceEnabled())
194:                            log.trace("received ack for #" + hdr.seqno
195:                                    + " from " + msg.getSrc());
196:                        return;
197:
198:                    case SmackHeader.JOIN_ANNOUNCEMENT:
199:
200:                        if (log.isInfoEnabled())
201:                            log.info("received join announcement by "
202:                                    + msg.getSrc());
203:
204:                        if (!containsMember(sender)) {
205:                            Message join_rsp = new Message(sender);
206:                            join_rsp.putHeader(name, new SmackHeader(
207:                                    SmackHeader.JOIN_ANNOUNCEMENT, -1));
208:                            passDown(new Event(Event.MSG, join_rsp));
209:                        }
210:                        addMember(sender);
211:                        return;
212:
213:                    case SmackHeader.LEAVE_ANNOUNCEMENT:
214:
215:                        if (log.isInfoEnabled())
216:                            log.info("received leave announcement by "
217:                                    + msg.getSrc());
218:
219:                        removeMember(sender);
220:                        return;
221:
222:                    default:
223:                        if (log.isWarnEnabled())
224:                            log.warn("detected SmackHeader with invalid type: "
225:                                    + hdr);
226:                        break;
227:                    }
228:                    break;
229:                }
230:
231:                passUp(evt);
232:            }
233:
234:            public void down(Event evt) {
235:                Message leave_msg;
236:
237:                switch (evt.getType()) {
238:
239:                case Event.DISCONNECT:
240:                    leave_msg = new Message();
241:                    leave_msg.putHeader(name, new SmackHeader(
242:                            SmackHeader.LEAVE_ANNOUNCEMENT, -1));
243:                    passDown(new Event(Event.MSG, leave_msg));
244:                    // passUp(new Event(Event.DISCONNECT_OK));
245:                    break;
246:
247:                case Event.CONNECT:
248:                    //passUp(new Event(Event.CONNECT_OK));
249:
250:                    // Do not send JOIN_ANOUNCEMENT here, don't know yet if the transport is OK.
251:                    // Send it later when handling CONNECT_OK from below
252:
253:                    //                 sender_win=new AckMcastSenderWindow(this, timeout);
254:                    //                 // send join announcement
255:                    //                 Message join_msg=new Message();
256:                    //                 join_msg.putHeader(name, new SmackHeader(SmackHeader.JOIN_ANNOUNCEMENT, -1));
257:                    //                 passDown(new Event(Event.MSG, join_msg));
258:                    //                 return;
259:
260:                    break;
261:
262:                // add a header with the current sequence number and increment seqno
263:                case Event.MSG:
264:                    Message msg = (Message) evt.getArg();
265:                    if (msg == null)
266:                        break;
267:                    if (msg.getDest() == null
268:                            || msg.getDest().isMulticastAddress()) {
269:                        msg.putHeader(name, new SmackHeader(SmackHeader.MCAST,
270:                                seqno));
271:                        sender_win.add(seqno, msg, (Vector) members.clone());
272:                        if (log.isTraceEnabled())
273:                            log.trace("sending mcast #" + seqno);
274:                        seqno++;
275:                    }
276:                    break;
277:                }
278:
279:                passDown(evt);
280:            }
281:
282:            /* ----------------------- Interface AckMcastSenderWindow.RetransmitCommand -------------------- */
283:
284:            public void retransmit(long seqno, Message msg, Address dest) {
285:                msg.setDest(dest);
286:
287:                if (log.isInfoEnabled())
288:                    log.info(seqno + ", msg=" + msg);
289:                passDown(new Event(Event.MSG, msg));
290:            }
291:
292:            /* -------------------- End of Interface AckMcastSenderWindow.RetransmitCommand ---------------- */
293:
294:            public static class SmackHeader extends Header implements 
295:                    Streamable {
296:                public static final byte MCAST = 1;
297:                public static final byte ACK = 2;
298:                public static final byte JOIN_ANNOUNCEMENT = 3;
299:                public static final byte LEAVE_ANNOUNCEMENT = 4;
300:
301:                byte type = 0;
302:                long seqno = -1;
303:
304:                public SmackHeader() {
305:                }
306:
307:                public SmackHeader(byte type, long seqno) {
308:                    this .type = type;
309:                    this .seqno = seqno;
310:                }
311:
312:                public void writeExternal(ObjectOutput out) throws IOException {
313:                    out.writeByte(type);
314:                    out.writeLong(seqno);
315:                }
316:
317:                public void readExternal(ObjectInput in) throws IOException,
318:                        ClassNotFoundException {
319:                    type = in.readByte();
320:                    seqno = in.readLong();
321:                }
322:
323:                public long size() {
324:                    return Global.LONG_SIZE + Global.BYTE_SIZE;
325:                }
326:
327:                public void writeTo(DataOutputStream out) throws IOException {
328:                    out.writeByte(type);
329:                    out.writeLong(seqno);
330:                }
331:
332:                public void readFrom(DataInputStream in) throws IOException,
333:                        IllegalAccessException, InstantiationException {
334:                    type = in.readByte();
335:                    seqno = in.readLong();
336:                }
337:
338:                public String toString() {
339:                    switch (type) {
340:                    case MCAST:
341:                        return "MCAST";
342:                    case ACK:
343:                        return "ACK";
344:                    case JOIN_ANNOUNCEMENT:
345:                        return "JOIN_ANNOUNCEMENT";
346:                    case LEAVE_ANNOUNCEMENT:
347:                        return "LEAVE_ANNOUNCEMENT";
348:                    default:
349:                        return "<unknown>";
350:                    }
351:                }
352:            }
353:
354:            /* ------------------------------------- Private methods --------------------------------------- */
355:            void addMember(Address mbr) {
356:                synchronized (members) {
357:                    if (mbr != null && !members.contains(mbr)) {
358:                        Object tmp;
359:                        View new_view;
360:                        members.addElement(mbr);
361:                        tmp = members.clone();
362:                        if (log.isTraceEnabled())
363:                            log.trace("added " + mbr + ", members=" + tmp);
364:                        new_view = new View(new ViewId(local_addr, vid++),
365:                                (Vector) tmp);
366:                        passUp(new Event(Event.VIEW_CHANGE, new_view));
367:                        passDown(new Event(Event.VIEW_CHANGE, new_view));
368:                    }
369:                }
370:            }
371:
372:            void removeMember(Address mbr) {
373:                synchronized (members) {
374:                    if (mbr != null) {
375:                        Object tmp;
376:                        View new_view;
377:                        members.removeElement(mbr);
378:                        tmp = members.clone();
379:                        if (log.isTraceEnabled())
380:                            log.trace("removed " + mbr + ", members=" + tmp);
381:                        new_view = new View(new ViewId(local_addr, vid++),
382:                                (Vector) tmp);
383:                        passUp(new Event(Event.VIEW_CHANGE, new_view));
384:                        passDown(new Event(Event.VIEW_CHANGE, new_view));
385:                        if (sender_win != null)
386:                            sender_win.remove(mbr); // causes retransmissions to mbr to stop
387:                    }
388:                }
389:            }
390:
391:            boolean containsMember(Address mbr) {
392:                synchronized (members) {
393:                    return mbr != null && members.contains(mbr);
394:                }
395:            }
396:
397:            /* --------------------------------- End of Private methods ------------------------------------ */
398:
399:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.