Source Code Cross Referenced for McastServiceImpl.java in » Servlet-Container » apache-tomcat-6.0.14 » org » apache » catalina » tribes » membership » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Servlet Container » apache tomcat 6.0.14 » org.apache.catalina.tribes.membership 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         * 
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         * 
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:
018:        package org.apache.catalina.tribes.membership;
019:
020:        import java.io.IOException;
021:        import java.net.DatagramPacket;
022:        import java.net.InetAddress;
023:        import java.net.MulticastSocket;
024:
025:        import org.apache.catalina.tribes.MembershipListener;
026:        import java.util.Arrays;
027:        import java.net.SocketTimeoutException;
028:        import org.apache.catalina.tribes.Member;
029:        import org.apache.catalina.tribes.Channel;
030:        import java.net.InetSocketAddress;
031:
032:        /**
033:         * A <b>membership</b> implementation using simple multicast.
034:         * This is the representation of a multicast membership service.
035:         * This class is responsible for maintaining a list of active cluster nodes in the cluster.
036:         * If a node fails to send out a heartbeat, the node will be dismissed.
037:         * This is the low level implementation that handles the multicasting sockets.
038:         * Need to fix this, could use java.nio and only need one thread to send and receive, or
039:         * just use a timeout on the receive
040:         * @author Filip Hanik
041:         * @version $Revision: 467222 $, $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
042:         */
043:        public class McastServiceImpl {
044:            private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
045:                    .getLog(McastService.class);
046:
047:            protected static int MAX_PACKET_SIZE = 65535;
048:            /**
049:             * Internal flag used for the listen thread that listens to the multicasting socket.
050:             */
051:            protected boolean doRunSender = false;
052:            protected boolean doRunReceiver = false;
053:            protected int startLevel = 0;
054:            /**
055:             * Socket that we intend to listen to
056:             */
057:            protected MulticastSocket socket;
058:            /**
059:             * The local member that we intend to broad cast over and over again
060:             */
061:            protected MemberImpl member;
062:            /**
063:             * The multicast address
064:             */
065:            protected InetAddress address;
066:            /**
067:             * The multicast port
068:             */
069:            protected int port;
070:            /**
071:             * The time it takes for a member to expire.
072:             */
073:            protected long timeToExpiration;
074:            /**
075:             * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration
076:             */
077:            protected long sendFrequency;
078:            /**
079:             * Reuse the sendPacket, no need to create a new one everytime
080:             */
081:            protected DatagramPacket sendPacket;
082:            /**
083:             * Reuse the receivePacket, no need to create a new one everytime
084:             */
085:            protected DatagramPacket receivePacket;
086:            /**
087:             * The membership, used so that we calculate memberships when they arrive or don't arrive
088:             */
089:            protected Membership membership;
090:            /**
091:             * The actual listener, for callback when shits goes down
092:             */
093:            protected MembershipListener service;
094:            /**
095:             * Thread to listen for pings
096:             */
097:            protected ReceiverThread receiver;
098:            /**
099:             * Thread to send pings
100:             */
101:            protected SenderThread sender;
102:
103:            /**
104:             * When was the service started
105:             */
106:            protected long serviceStartTime = System.currentTimeMillis();
107:
108:            /**
109:             * Time to live for the multicast packets that are being sent out
110:             */
111:            protected int mcastTTL = -1;
112:            /**
113:             * Read timeout on the mcast socket
114:             */
115:            protected int mcastSoTimeout = -1;
116:            /**
117:             * bind address
118:             */
119:            protected InetAddress mcastBindAddress = null;
120:
121:            /**
122:             * Create a new mcast service impl
123:             * @param member - the local member
124:             * @param sendFrequency - the time (ms) in between pings sent out
125:             * @param expireTime - the time (ms) for a member to expire
126:             * @param port - the mcast port
127:             * @param bind - the bind address (not sure this is used yet)
128:             * @param mcastAddress - the mcast address
129:             * @param service - the callback service
130:             * @throws IOException
131:             */
132:            public McastServiceImpl(MemberImpl member, long sendFrequency,
133:                    long expireTime, int port, InetAddress bind,
134:                    InetAddress mcastAddress, int ttl, int soTimeout,
135:                    MembershipListener service) throws IOException {
136:                this .member = member;
137:                this .address = mcastAddress;
138:                this .port = port;
139:                this .mcastSoTimeout = soTimeout;
140:                this .mcastTTL = ttl;
141:                this .mcastBindAddress = bind;
142:                this .timeToExpiration = expireTime;
143:                this .service = service;
144:                this .sendFrequency = sendFrequency;
145:                setupSocket();
146:                sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],
147:                        MAX_PACKET_SIZE);
148:                sendPacket.setAddress(address);
149:                sendPacket.setPort(port);
150:                receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],
151:                        MAX_PACKET_SIZE);
152:                receivePacket.setAddress(address);
153:                receivePacket.setPort(port);
154:                membership = new Membership(member);
155:            }
156:
157:            protected void setupSocket() throws IOException {
158:                if (mcastBindAddress != null)
159:                    socket = new MulticastSocket(new InetSocketAddress(
160:                            mcastBindAddress, port));
161:                else
162:                    socket = new MulticastSocket(port);
163:                socket.setLoopbackMode(false); //hint that we don't need loop back messages
164:                if (mcastBindAddress != null) {
165:                    if (log.isInfoEnabled())
166:                        log.info("Setting multihome multicast interface to:"
167:                                + mcastBindAddress);
168:                    socket.setInterface(mcastBindAddress);
169:                } //end if
170:                //force a so timeout so that we don't block forever
171:                if (mcastSoTimeout <= 0)
172:                    mcastSoTimeout = (int) sendFrequency;
173:                if (log.isInfoEnabled())
174:                    log.info("Setting cluster mcast soTimeout to "
175:                            + mcastSoTimeout);
176:                socket.setSoTimeout(mcastSoTimeout);
177:
178:                if (mcastTTL >= 0) {
179:                    if (log.isInfoEnabled())
180:                        log.info("Setting cluster mcast TTL to " + mcastTTL);
181:                    socket.setTimeToLive(mcastTTL);
182:                }
183:            }
184:
185:            /**
186:             * Start the service
187:             * @param level 1 starts the receiver, level 2 starts the sender
188:             * @throws IOException if the service fails to start
189:             * @throws IllegalStateException if the service is already started
190:             */
191:            public synchronized void start(int level) throws IOException {
192:                boolean valid = false;
193:                if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) {
194:                    if (receiver != null)
195:                        throw new IllegalStateException(
196:                                "McastService.receive already running.");
197:                    if (sender == null)
198:                        socket.joinGroup(address);
199:                    doRunReceiver = true;
200:                    receiver = new ReceiverThread();
201:                    receiver.setDaemon(true);
202:                    receiver.start();
203:                    valid = true;
204:                }
205:                if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) {
206:                    if (sender != null)
207:                        throw new IllegalStateException(
208:                                "McastService.send already running.");
209:                    if (receiver == null)
210:                        socket.joinGroup(address);
211:                    //make sure at least one packet gets out there
212:                    send(false);
213:                    doRunSender = true;
214:                    serviceStartTime = System.currentTimeMillis();
215:                    sender = new SenderThread(sendFrequency);
216:                    sender.setDaemon(true);
217:                    sender.start();
218:                    //we have started the receiver, but not yet waited for membership to establish
219:                    valid = true;
220:                }
221:                if (!valid) {
222:                    throw new IllegalArgumentException(
223:                            "Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
224:                }
225:                //pause, once or twice
226:                waitForMembers(level);
227:                startLevel = (startLevel | level);
228:            }
229:
230:            private void waitForMembers(int level) {
231:                long memberwait = sendFrequency * 2;
232:                if (log.isInfoEnabled())
233:                    log
234:                            .info("Sleeping for "
235:                                    + memberwait
236:                                    + " milliseconds to establish cluster membership, start level:"
237:                                    + level);
238:                try {
239:                    Thread.sleep(memberwait);
240:                } catch (InterruptedException ignore) {
241:                }
242:                if (log.isInfoEnabled())
243:                    log
244:                            .info("Done sleeping, membership established, start level:"
245:                                    + level);
246:            }
247:
248:            /**
249:             * Stops the service
250:             * @throws IOException if the service fails to disconnect from the sockets
251:             */
252:            public synchronized boolean stop(int level) throws IOException {
253:                boolean valid = false;
254:
255:                if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) {
256:                    valid = true;
257:                    doRunReceiver = false;
258:                    if (receiver != null)
259:                        receiver.interrupt();
260:                    receiver = null;
261:                }
262:                if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) {
263:                    valid = true;
264:                    doRunSender = false;
265:                    if (sender != null)
266:                        sender.interrupt();
267:                    sender = null;
268:                }
269:
270:                if (!valid) {
271:                    throw new IllegalArgumentException(
272:                            "Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
273:                }
274:                startLevel = (startLevel & (~level));
275:                //we're shutting down, send a shutdown message and close the socket
276:                if (startLevel == 0) {
277:                    //send a stop message
278:                    member.setCommand(Member.SHUTDOWN_PAYLOAD);
279:                    member.getData(true, true);
280:                    send(false);
281:                    //leave mcast group
282:                    try {
283:                        socket.leaveGroup(address);
284:                    } catch (Exception ignore) {
285:                    }
286:                    serviceStartTime = Long.MAX_VALUE;
287:                }
288:                return (startLevel == 0);
289:            }
290:
291:            /**
292:             * Receive a datagram packet, locking wait
293:             * @throws IOException
294:             */
295:            public void receive() throws IOException {
296:                try {
297:                    socket.receive(receivePacket);
298:                    if (receivePacket.getLength() > MAX_PACKET_SIZE) {
299:                        log
300:                                .error("Multicast packet received was too long, dropping package:"
301:                                        + receivePacket.getLength());
302:                    } else {
303:                        byte[] data = new byte[receivePacket.getLength()];
304:                        System.arraycopy(receivePacket.getData(), receivePacket
305:                                .getOffset(), data, 0, data.length);
306:                        final MemberImpl m = MemberImpl.getMember(data);
307:                        if (log.isTraceEnabled())
308:                            log.trace("Mcast receive ping from member " + m);
309:                        Thread t = null;
310:                        if (Arrays.equals(m.getCommand(),
311:                                Member.SHUTDOWN_PAYLOAD)) {
312:                            if (log.isDebugEnabled())
313:                                log.debug("Member has shutdown:" + m);
314:                            membership.removeMember(m);
315:                            t = new Thread() {
316:                                public void run() {
317:                                    service.memberDisappeared(m);
318:                                }
319:                            };
320:                        } else if (membership.memberAlive(m)) {
321:                            if (log.isDebugEnabled())
322:                                log.debug("Mcast add member " + m);
323:                            t = new Thread() {
324:                                public void run() {
325:                                    service.memberAdded(m);
326:                                }
327:                            };
328:                        } //end if
329:                        if (t != null)
330:                            t.start();
331:                    }
332:                } catch (SocketTimeoutException x) {
333:                    //do nothing, this is normal, we don't want to block forever
334:                    //since the receive thread is the same thread
335:                    //that does membership expiration
336:                }
337:                checkExpired();
338:            }
339:
340:            protected Object expiredMutex = new Object();
341:
342:            protected void checkExpired() {
343:                synchronized (expiredMutex) {
344:                    MemberImpl[] expired = membership.expire(timeToExpiration);
345:                    for (int i = 0; i < expired.length; i++) {
346:                        final MemberImpl member = expired[i];
347:                        if (log.isDebugEnabled())
348:                            log.debug("Mcast exipre  member " + expired[i]);
349:                        try {
350:                            Thread t = new Thread() {
351:                                public void run() {
352:                                    service.memberDisappeared(member);
353:                                }
354:                            };
355:                            t.start();
356:                        } catch (Exception x) {
357:                            log
358:                                    .error(
359:                                            "Unable to process member disappeared message.",
360:                                            x);
361:                        }
362:                    }
363:                }
364:            }
365:
366:            /**
367:             * Send a ping
368:             * @throws Exception
369:             */
370:            public void send(boolean checkexpired) throws IOException {
371:                //ignore if we haven't started the sender
372:                //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
373:                member.inc();
374:                if (log.isTraceEnabled())
375:                    log.trace("Mcast send ping from member " + member);
376:                byte[] data = member.getData();
377:                DatagramPacket p = new DatagramPacket(data, data.length);
378:                p.setAddress(address);
379:                p.setPort(port);
380:                socket.send(p);
381:                if (checkexpired)
382:                    checkExpired();
383:            }
384:
385:            public long getServiceStartTime() {
386:                return this .serviceStartTime;
387:            }
388:
389:            public class ReceiverThread extends Thread {
390:                public ReceiverThread() {
391:                    super ();
392:                    setName("Cluster-MembershipReceiver");
393:                }
394:
395:                public void run() {
396:                    while (doRunReceiver) {
397:                        try {
398:                            receive();
399:                        } catch (ArrayIndexOutOfBoundsException ax) {
400:                            //we can ignore this, as it means we have an invalid package
401:                            //but we will log it to debug
402:                            if (log.isDebugEnabled())
403:                                log.debug("Invalid member mcast package.", ax);
404:                        } catch (Exception x) {
405:                            log
406:                                    .warn(
407:                                            "Error receiving mcast package. Sleeping 500ms",
408:                                            x);
409:                            try {
410:                                Thread.sleep(500);
411:                            } catch (Exception ignore) {
412:                            }
413:
414:                        }
415:                    }
416:                }
417:            }//class ReceiverThread
418:
419:            public class SenderThread extends Thread {
420:                long time;
421:
422:                public SenderThread(long time) {
423:                    this .time = time;
424:                    setName("Cluster-MembershipSender");
425:
426:                }
427:
428:                public void run() {
429:                    while (doRunSender) {
430:                        try {
431:                            send(true);
432:                        } catch (Exception x) {
433:                            log.warn("Unable to send mcast message.", x);
434:                        }
435:                        try {
436:                            Thread.sleep(time);
437:                        } catch (Exception ignore) {
438:                        }
439:                    }
440:                }
441:            }//class SenderThread
442:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.