001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.mcast;
018:
019: /**
020: * A <b>membership</b> implementation using simple multicast.
021: * This is the representation of a multicast membership service.
022: * This class is responsible for maintaining a list of active cluster nodes in the cluster.
023: * If a node fails to send out a heartbeat, the node will be dismissed.
024: * This is the low level implementation that handles the multicasting sockets.
025: * TODO: this could be reworked to use java.nio so that a single thread both sends and
026: * receives, or to simply rely on a timeout on the receive call.
027: * @author Filip Hanik
028: * @version $Revision: 1.11 $, $Date: 2004/05/14 15:00:29 $
029: */
030:
031: import java.net.MulticastSocket;
032: import java.io.IOException;
033: import java.net.InetAddress;
034: import java.net.DatagramPacket;
035: import org.apache.catalina.cluster.MembershipListener;
036:
037: public class McastServiceImpl {
038: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
039: .getLog(McastService.class);
040: /**
041: * Internal flag used for the listen thread that listens to the multicasting socket.
042: */
043: protected boolean doRun = false;
044: /**
045: * Socket that we intend to listen to
046: */
047: protected MulticastSocket socket;
048: /**
049: * The local member that we intend to broad cast over and over again
050: */
051: protected McastMember member;
052: /**
053: * The multicast address
054: */
055: protected InetAddress address;
056: /**
057: * The multicast port
058: */
059: protected int port;
060: /**
061: * The time it takes for a member to expire.
062: */
063: protected long timeToExpiration;
064: /**
065: * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration
066: */
067: protected long sendFrequency;
068: /**
069: * Reuse the sendPacket, no need to create a new one everytime
070: */
071: protected DatagramPacket sendPacket;
072: /**
073: * Reuse the receivePacket, no need to create a new one everytime
074: */
075: protected DatagramPacket receivePacket;
076: /**
077: * The membership, used so that we calculate memberships when they arrive or don't arrive
078: */
079: protected McastMembership membership;
080: /**
081: * The actual listener, for callback when shits goes down
082: */
083: protected MembershipListener service;
084: /**
085: * Thread to listen for pings
086: */
087: protected ReceiverThread receiver;
088: /**
089: * Thread to send pings
090: */
091: protected SenderThread sender;
092:
093: /**
094: * When was the service started
095: */
096: protected long serviceStartTime = System.currentTimeMillis();
097:
098: protected int mcastTTL = -1;
099: protected int mcastSoTimeout = -1;
100: protected InetAddress mcastBindAddress = null;
101:
102: /**
103: * Create a new mcast service impl
104: * @param member - the local member
105: * @param sendFrequency - the time (ms) in between pings sent out
106: * @param expireTime - the time (ms) for a member to expire
107: * @param port - the mcast port
108: * @param bind - the bind address (not sure this is used yet)
109: * @param mcastAddress - the mcast address
110: * @param service - the callback service
111: * @throws IOException
112: */
113: public McastServiceImpl(McastMember member, long sendFrequency,
114: long expireTime, int port, InetAddress bind,
115: InetAddress mcastAddress, int ttl, int soTimeout,
116: MembershipListener service) throws IOException {
117: this .member = member;
118: address = mcastAddress;
119: this .port = port;
120: this .mcastSoTimeout = soTimeout;
121: this .mcastTTL = ttl;
122: this .mcastBindAddress = bind;
123: setupSocket();
124: sendPacket = new DatagramPacket(new byte[1000], 1000);
125: sendPacket.setAddress(address);
126: sendPacket.setPort(port);
127: receivePacket = new DatagramPacket(new byte[1000], 1000);
128: receivePacket.setAddress(address);
129: receivePacket.setPort(port);
130: membership = new McastMembership(member.getName());
131: timeToExpiration = expireTime;
132: this .service = service;
133: this .sendFrequency = sendFrequency;
134: }
135:
136: protected void setupSocket() throws IOException {
137: if (mcastBindAddress != null)
138: socket = new MulticastSocket(
139: new java.net.InetSocketAddress(mcastBindAddress,
140: port));
141: else
142: socket = new MulticastSocket(port);
143: if (mcastBindAddress != null) {
144: log.info("Setting multihome multicast interface to:"
145: + mcastBindAddress);
146: socket.setInterface(mcastBindAddress);
147: } //end if
148: if (mcastSoTimeout >= 0) {
149: log.info("Setting cluster mcast soTimeout to "
150: + mcastSoTimeout);
151: socket.setSoTimeout(mcastSoTimeout);
152: }
153: if (mcastTTL >= 0) {
154: log.info("Setting cluster mcast TTL to " + mcastTTL);
155: socket.setTimeToLive(mcastTTL);
156: }
157: }
158:
159: /**
160: * Start the service
161: * @param level 1 starts the receiver, level 2 starts the sender
162: * @throws IOException if the service fails to start
163: * @throws IllegalStateException if the service is already started
164: */
165: public synchronized void start(int level) throws IOException {
166: if (sender != null && receiver != null)
167: throw new IllegalStateException("Service already running.");
168: if (level == 1) {
169: socket.joinGroup(address);
170: doRun = true;
171: receiver = new ReceiverThread();
172: receiver.setDaemon(true);
173: receiver.start();
174: }
175: if (level == 2) {
176: serviceStartTime = System.currentTimeMillis();
177: sender = new SenderThread(sendFrequency);
178: sender.setDaemon(true);
179: sender.start();
180:
181: }
182: }
183:
184: /**
185: * Stops the service
186: * @throws IOException if the service fails to disconnect from the sockets
187: */
188: public synchronized void stop() throws IOException {
189: socket.leaveGroup(address);
190: doRun = false;
191: sender = null;
192: receiver = null;
193: serviceStartTime = Long.MAX_VALUE;
194: }
195:
196: /**
197: * Receive a datagram packet, locking wait
198: * @throws IOException
199: */
200: public void receive() throws IOException {
201: socket.receive(receivePacket);
202: byte[] data = new byte[receivePacket.getLength()];
203: System.arraycopy(receivePacket.getData(), receivePacket
204: .getOffset(), data, 0, data.length);
205: McastMember m = McastMember.getMember(data);
206: if (membership.memberAlive(m)) {
207: service.memberAdded(m);
208: }
209: McastMember[] expired = membership.expire(timeToExpiration);
210: for (int i = 0; i < expired.length; i++)
211: service.memberDisappeared(expired[i]);
212: }
213:
214: /**
215: * Send a ping
216: * @throws Exception
217: */
218: public void send() throws Exception {
219: member.inc();
220: byte[] data = member.getData(this .serviceStartTime);
221: DatagramPacket p = new DatagramPacket(data, data.length);
222: p.setAddress(address);
223: p.setPort(port);
224: socket.send(p);
225: }
226:
227: public long getServiceStartTime() {
228: return this .serviceStartTime;
229: }
230:
231: public class ReceiverThread extends Thread {
232: public ReceiverThread() {
233: super ();
234: setName("Cluster-MembershipReceiver");
235: }
236:
237: public void run() {
238: while (doRun) {
239: try {
240: receive();
241: } catch (Exception x) {
242: log.warn("Error receiving mcast package.", x);
243: }
244: }
245: }
246: }//class ReceiverThread
247:
248: public class SenderThread extends Thread {
249: long time;
250:
251: public SenderThread(long time) {
252: this .time = time;
253: setName("Cluster-MembershipSender");
254:
255: }
256:
257: public void run() {
258: while (doRun) {
259: try {
260: send();
261: this .sleep(time);
262: } catch (Exception x) {
263: log.warn("Unable to send mcast message.", x);
264: }
265: }
266: }
267: }//class SenderThread
268: }
|