001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.membership;
019:
020: import java.io.IOException;
021: import java.net.DatagramPacket;
022: import java.net.InetAddress;
023: import java.net.MulticastSocket;
024:
025: import org.apache.catalina.tribes.MembershipListener;
026: import java.util.Arrays;
027: import java.net.SocketTimeoutException;
028: import org.apache.catalina.tribes.Member;
029: import org.apache.catalina.tribes.Channel;
030: import java.net.InetSocketAddress;
031:
032: /**
033: * A <b>membership</b> implementation using simple multicast.
034: * This is the representation of a multicast membership service.
035: * This class is responsible for maintaining a list of active cluster nodes in the cluster.
036: * If a node fails to send out a heartbeat, the node will be dismissed.
037: * This is the low level implementation that handles the multicasting sockets.
038: * Need to fix this, could use java.nio and only need one thread to send and receive, or
039: * just use a timeout on the receive
040: * @author Filip Hanik
041: * @version $Revision: 467222 $, $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
042: */
043: public class McastServiceImpl {
044: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
045: .getLog(McastService.class);
046:
047: protected static int MAX_PACKET_SIZE = 65535;
048: /**
049: * Internal flag used for the listen thread that listens to the multicasting socket.
050: */
051: protected boolean doRunSender = false;
052: protected boolean doRunReceiver = false;
053: protected int startLevel = 0;
054: /**
055: * Socket that we intend to listen to
056: */
057: protected MulticastSocket socket;
058: /**
059: * The local member that we intend to broad cast over and over again
060: */
061: protected MemberImpl member;
062: /**
063: * The multicast address
064: */
065: protected InetAddress address;
066: /**
067: * The multicast port
068: */
069: protected int port;
070: /**
071: * The time it takes for a member to expire.
072: */
073: protected long timeToExpiration;
074: /**
075: * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration
076: */
077: protected long sendFrequency;
078: /**
079: * Reuse the sendPacket, no need to create a new one everytime
080: */
081: protected DatagramPacket sendPacket;
082: /**
083: * Reuse the receivePacket, no need to create a new one everytime
084: */
085: protected DatagramPacket receivePacket;
086: /**
087: * The membership, used so that we calculate memberships when they arrive or don't arrive
088: */
089: protected Membership membership;
090: /**
091: * The actual listener, for callback when shits goes down
092: */
093: protected MembershipListener service;
094: /**
095: * Thread to listen for pings
096: */
097: protected ReceiverThread receiver;
098: /**
099: * Thread to send pings
100: */
101: protected SenderThread sender;
102:
103: /**
104: * When was the service started
105: */
106: protected long serviceStartTime = System.currentTimeMillis();
107:
108: /**
109: * Time to live for the multicast packets that are being sent out
110: */
111: protected int mcastTTL = -1;
112: /**
113: * Read timeout on the mcast socket
114: */
115: protected int mcastSoTimeout = -1;
116: /**
117: * bind address
118: */
119: protected InetAddress mcastBindAddress = null;
120:
121: /**
122: * Create a new mcast service impl
123: * @param member - the local member
124: * @param sendFrequency - the time (ms) in between pings sent out
125: * @param expireTime - the time (ms) for a member to expire
126: * @param port - the mcast port
127: * @param bind - the bind address (not sure this is used yet)
128: * @param mcastAddress - the mcast address
129: * @param service - the callback service
130: * @throws IOException
131: */
132: public McastServiceImpl(MemberImpl member, long sendFrequency,
133: long expireTime, int port, InetAddress bind,
134: InetAddress mcastAddress, int ttl, int soTimeout,
135: MembershipListener service) throws IOException {
136: this .member = member;
137: this .address = mcastAddress;
138: this .port = port;
139: this .mcastSoTimeout = soTimeout;
140: this .mcastTTL = ttl;
141: this .mcastBindAddress = bind;
142: this .timeToExpiration = expireTime;
143: this .service = service;
144: this .sendFrequency = sendFrequency;
145: setupSocket();
146: sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],
147: MAX_PACKET_SIZE);
148: sendPacket.setAddress(address);
149: sendPacket.setPort(port);
150: receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],
151: MAX_PACKET_SIZE);
152: receivePacket.setAddress(address);
153: receivePacket.setPort(port);
154: membership = new Membership(member);
155: }
156:
157: protected void setupSocket() throws IOException {
158: if (mcastBindAddress != null)
159: socket = new MulticastSocket(new InetSocketAddress(
160: mcastBindAddress, port));
161: else
162: socket = new MulticastSocket(port);
163: socket.setLoopbackMode(false); //hint that we don't need loop back messages
164: if (mcastBindAddress != null) {
165: if (log.isInfoEnabled())
166: log.info("Setting multihome multicast interface to:"
167: + mcastBindAddress);
168: socket.setInterface(mcastBindAddress);
169: } //end if
170: //force a so timeout so that we don't block forever
171: if (mcastSoTimeout <= 0)
172: mcastSoTimeout = (int) sendFrequency;
173: if (log.isInfoEnabled())
174: log.info("Setting cluster mcast soTimeout to "
175: + mcastSoTimeout);
176: socket.setSoTimeout(mcastSoTimeout);
177:
178: if (mcastTTL >= 0) {
179: if (log.isInfoEnabled())
180: log.info("Setting cluster mcast TTL to " + mcastTTL);
181: socket.setTimeToLive(mcastTTL);
182: }
183: }
184:
185: /**
186: * Start the service
187: * @param level 1 starts the receiver, level 2 starts the sender
188: * @throws IOException if the service fails to start
189: * @throws IllegalStateException if the service is already started
190: */
191: public synchronized void start(int level) throws IOException {
192: boolean valid = false;
193: if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) {
194: if (receiver != null)
195: throw new IllegalStateException(
196: "McastService.receive already running.");
197: if (sender == null)
198: socket.joinGroup(address);
199: doRunReceiver = true;
200: receiver = new ReceiverThread();
201: receiver.setDaemon(true);
202: receiver.start();
203: valid = true;
204: }
205: if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) {
206: if (sender != null)
207: throw new IllegalStateException(
208: "McastService.send already running.");
209: if (receiver == null)
210: socket.joinGroup(address);
211: //make sure at least one packet gets out there
212: send(false);
213: doRunSender = true;
214: serviceStartTime = System.currentTimeMillis();
215: sender = new SenderThread(sendFrequency);
216: sender.setDaemon(true);
217: sender.start();
218: //we have started the receiver, but not yet waited for membership to establish
219: valid = true;
220: }
221: if (!valid) {
222: throw new IllegalArgumentException(
223: "Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
224: }
225: //pause, once or twice
226: waitForMembers(level);
227: startLevel = (startLevel | level);
228: }
229:
230: private void waitForMembers(int level) {
231: long memberwait = sendFrequency * 2;
232: if (log.isInfoEnabled())
233: log
234: .info("Sleeping for "
235: + memberwait
236: + " milliseconds to establish cluster membership, start level:"
237: + level);
238: try {
239: Thread.sleep(memberwait);
240: } catch (InterruptedException ignore) {
241: }
242: if (log.isInfoEnabled())
243: log
244: .info("Done sleeping, membership established, start level:"
245: + level);
246: }
247:
248: /**
249: * Stops the service
250: * @throws IOException if the service fails to disconnect from the sockets
251: */
252: public synchronized boolean stop(int level) throws IOException {
253: boolean valid = false;
254:
255: if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) {
256: valid = true;
257: doRunReceiver = false;
258: if (receiver != null)
259: receiver.interrupt();
260: receiver = null;
261: }
262: if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) {
263: valid = true;
264: doRunSender = false;
265: if (sender != null)
266: sender.interrupt();
267: sender = null;
268: }
269:
270: if (!valid) {
271: throw new IllegalArgumentException(
272: "Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
273: }
274: startLevel = (startLevel & (~level));
275: //we're shutting down, send a shutdown message and close the socket
276: if (startLevel == 0) {
277: //send a stop message
278: member.setCommand(Member.SHUTDOWN_PAYLOAD);
279: member.getData(true, true);
280: send(false);
281: //leave mcast group
282: try {
283: socket.leaveGroup(address);
284: } catch (Exception ignore) {
285: }
286: serviceStartTime = Long.MAX_VALUE;
287: }
288: return (startLevel == 0);
289: }
290:
291: /**
292: * Receive a datagram packet, locking wait
293: * @throws IOException
294: */
295: public void receive() throws IOException {
296: try {
297: socket.receive(receivePacket);
298: if (receivePacket.getLength() > MAX_PACKET_SIZE) {
299: log
300: .error("Multicast packet received was too long, dropping package:"
301: + receivePacket.getLength());
302: } else {
303: byte[] data = new byte[receivePacket.getLength()];
304: System.arraycopy(receivePacket.getData(), receivePacket
305: .getOffset(), data, 0, data.length);
306: final MemberImpl m = MemberImpl.getMember(data);
307: if (log.isTraceEnabled())
308: log.trace("Mcast receive ping from member " + m);
309: Thread t = null;
310: if (Arrays.equals(m.getCommand(),
311: Member.SHUTDOWN_PAYLOAD)) {
312: if (log.isDebugEnabled())
313: log.debug("Member has shutdown:" + m);
314: membership.removeMember(m);
315: t = new Thread() {
316: public void run() {
317: service.memberDisappeared(m);
318: }
319: };
320: } else if (membership.memberAlive(m)) {
321: if (log.isDebugEnabled())
322: log.debug("Mcast add member " + m);
323: t = new Thread() {
324: public void run() {
325: service.memberAdded(m);
326: }
327: };
328: } //end if
329: if (t != null)
330: t.start();
331: }
332: } catch (SocketTimeoutException x) {
333: //do nothing, this is normal, we don't want to block forever
334: //since the receive thread is the same thread
335: //that does membership expiration
336: }
337: checkExpired();
338: }
339:
340: protected Object expiredMutex = new Object();
341:
342: protected void checkExpired() {
343: synchronized (expiredMutex) {
344: MemberImpl[] expired = membership.expire(timeToExpiration);
345: for (int i = 0; i < expired.length; i++) {
346: final MemberImpl member = expired[i];
347: if (log.isDebugEnabled())
348: log.debug("Mcast exipre member " + expired[i]);
349: try {
350: Thread t = new Thread() {
351: public void run() {
352: service.memberDisappeared(member);
353: }
354: };
355: t.start();
356: } catch (Exception x) {
357: log
358: .error(
359: "Unable to process member disappeared message.",
360: x);
361: }
362: }
363: }
364: }
365:
366: /**
367: * Send a ping
368: * @throws Exception
369: */
370: public void send(boolean checkexpired) throws IOException {
371: //ignore if we haven't started the sender
372: //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
373: member.inc();
374: if (log.isTraceEnabled())
375: log.trace("Mcast send ping from member " + member);
376: byte[] data = member.getData();
377: DatagramPacket p = new DatagramPacket(data, data.length);
378: p.setAddress(address);
379: p.setPort(port);
380: socket.send(p);
381: if (checkexpired)
382: checkExpired();
383: }
384:
385: public long getServiceStartTime() {
386: return this .serviceStartTime;
387: }
388:
389: public class ReceiverThread extends Thread {
390: public ReceiverThread() {
391: super ();
392: setName("Cluster-MembershipReceiver");
393: }
394:
395: public void run() {
396: while (doRunReceiver) {
397: try {
398: receive();
399: } catch (ArrayIndexOutOfBoundsException ax) {
400: //we can ignore this, as it means we have an invalid package
401: //but we will log it to debug
402: if (log.isDebugEnabled())
403: log.debug("Invalid member mcast package.", ax);
404: } catch (Exception x) {
405: log
406: .warn(
407: "Error receiving mcast package. Sleeping 500ms",
408: x);
409: try {
410: Thread.sleep(500);
411: } catch (Exception ignore) {
412: }
413:
414: }
415: }
416: }
417: }//class ReceiverThread
418:
419: public class SenderThread extends Thread {
420: long time;
421:
422: public SenderThread(long time) {
423: this .time = time;
424: setName("Cluster-MembershipSender");
425:
426: }
427:
428: public void run() {
429: while (doRunSender) {
430: try {
431: send(true);
432: } catch (Exception x) {
433: log.warn("Unable to send mcast message.", x);
434: }
435: try {
436: Thread.sleep(time);
437: } catch (Exception ignore) {
438: }
439: }
440: }
441: }//class SenderThread
442: }
|