001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheManager;
018: import org.apache.commons.logging.Log;
019: import org.apache.commons.logging.LogFactory;
020:
021: import java.io.IOException;
022: import java.net.DatagramPacket;
023: import java.net.InetAddress;
024: import java.net.MulticastSocket;
025: import java.util.List;
026: import java.util.ArrayList;
027: import java.util.Iterator;
028:
029: /**
030: * Sends heartbeats to a multicast group containing a compressed list of URLs.
031: * <p/>
032: * You can control how far the multicast packets propagate by setting the badly misnamed "TTL".
033: * Using the multicast IP protocol, the TTL value indicates the scope or range in which a packet may be forwarded.
034: * By convention:
035: * <ul>
036: * <li>0 is restricted to the same host
037: * <li>1 is restricted to the same subnet
038: * <li>32 is restricted to the same site
039: * <li>64 is restricted to the same region
040: * <li>128 is restricted to the same continent
041: * <li>255 is unrestricted
042: * </ul>
043: * You can also control how often the heartbeat sends by setting the interval.
044: *
045: * @author Greg Luck
046: * @version $Id: MulticastKeepaliveHeartbeatSender.java 537 2007-08-14 23:52:19Z gregluck $
047: */
048: public final class MulticastKeepaliveHeartbeatSender {
049:
050: private static final Log LOG = LogFactory
051: .getLog(MulticastKeepaliveHeartbeatSender.class.getName());
052:
053: private static final int DEFAULT_HEARTBEAT_INTERVAL = 5000;
054: private static final int MINIMUM_HEARTBEAT_INTERVAL = 1000;
055: private static final int MAXIMUM_PEERS_PER_SEND = 150;
056:
057: private static long heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
058:
059: private final InetAddress groupMulticastAddress;
060: private final Integer groupMulticastPort;
061: private final Integer timeToLive;
062: private MulticastServerThread serverThread;
063: private boolean stopped;
064: private final CacheManager cacheManager;
065:
066: /**
067: * Constructor.
068: *
069: * @param cacheManager the bound CacheManager. Each CacheManager has a maximum of one sender
070: * @param multicastAddress
071: * @param multicastPort
072: * @param timeToLive See class description for the meaning of this parameter.
073: */
074: public MulticastKeepaliveHeartbeatSender(CacheManager cacheManager,
075: InetAddress multicastAddress, Integer multicastPort,
076: Integer timeToLive) {
077: this .cacheManager = cacheManager;
078: this .groupMulticastAddress = multicastAddress;
079: this .groupMulticastPort = multicastPort;
080: this .timeToLive = timeToLive;
081:
082: }
083:
084: /**
085: * Start the heartbeat thread
086: */
087: public final void init() {
088: serverThread = new MulticastServerThread();
089: serverThread.start();
090: }
091:
092: /**
093: * Shutdown this heartbeat sender
094: */
095: public final synchronized void dispose() {
096: stopped = true;
097: notifyAll();
098: serverThread.interrupt();
099: }
100:
101: /**
102: * A thread which sends a multicast heartbeat every second
103: */
104: private final class MulticastServerThread extends Thread {
105:
106: private MulticastSocket socket;
107: private List compressedUrlListList = new ArrayList();
108: private int cachePeersHash;
109:
110: /**
111: * Constructor
112: */
113: public MulticastServerThread() {
114: super ("Multicast Heartbeat Sender Thread");
115: setDaemon(true);
116: }
117:
118: public final void run() {
119: while (!stopped) {
120: try {
121: socket = new MulticastSocket(groupMulticastPort
122: .intValue());
123: socket.setTimeToLive(timeToLive.intValue());
124: socket.joinGroup(groupMulticastAddress);
125:
126: while (!stopped) {
127: List buffers = createCachePeersPayload();
128: for (Iterator iter = buffers.iterator(); iter
129: .hasNext();) {
130: byte[] buffer = (byte[]) iter.next();
131: DatagramPacket packet = new DatagramPacket(
132: buffer, buffer.length,
133: groupMulticastAddress,
134: groupMulticastPort.intValue());
135: socket.send(packet);
136: }
137: try {
138: synchronized (this ) {
139: wait(heartBeatInterval);
140: }
141: } catch (InterruptedException e) {
142: if (!stopped) {
143: LOG.error(
144: "Error receiving heartbeat. Initial cause was "
145: + e.getMessage(), e);
146: }
147: }
148: }
149: } catch (IOException e) {
150: LOG.debug("Error on multicast socket", e);
151: } catch (Throwable e) {
152: LOG.info(
153: "Unexpected throwable in run thread. Continuing..."
154: + e.getMessage(), e);
155: } finally {
156: closeSocket();
157: }
158: if (!stopped) {
159: try {
160: sleep(heartBeatInterval);
161: } catch (InterruptedException e) {
162: LOG.error(
163: "Sleep after error interrupted. Initial cause was "
164: + e.getMessage(), e);
165: }
166: }
167: }
168: }
169:
170: /**
171: * Creates a gzipped payload.
172: * <p/>
173: * The last gzipped payload is retained and only recalculated if the list of cache peers
174: * has changed.
175: *
176: * @return a gzipped byte[]
177: */
178: private List createCachePeersPayload() {
179: List localCachePeers = cacheManager.getCachePeerListener()
180: .getBoundCachePeers();
181: int newCachePeersHash = localCachePeers.hashCode();
182: if (cachePeersHash != newCachePeersHash) {
183: cachePeersHash = newCachePeersHash;
184:
185: compressedUrlListList = new ArrayList();
186: while (localCachePeers.size() > 0) {
187: int endIndex = Math.min(localCachePeers.size(),
188: MAXIMUM_PEERS_PER_SEND);
189: List localCachePeersSubList = localCachePeers
190: .subList(0, endIndex);
191: localCachePeers = localCachePeers.subList(endIndex,
192: localCachePeers.size());
193:
194: byte[] uncompressedUrlList = PayloadUtil
195: .assembleUrlList(localCachePeersSubList);
196: byte[] compressedUrlList = PayloadUtil
197: .gzip(uncompressedUrlList);
198: if (compressedUrlList.length > PayloadUtil.MTU) {
199: LOG
200: .fatal("Heartbeat is not working. Configure fewer caches for replication. "
201: + "Size is "
202: + compressedUrlList.length
203: + " but should be no greater than"
204: + PayloadUtil.MTU);
205: }
206: compressedUrlListList.add(compressedUrlList);
207: }
208: }
209: return compressedUrlListList;
210: }
211:
212: /**
213: * Interrupts this thread.
214: * <p/>
215: * <p> Unless the current thread is interrupting itself, which is
216: * always permitted, the {@link #checkAccess() checkAccess} method
217: * of this thread is invoked, which may cause a {@link
218: * SecurityException} to be thrown.
219: * <p/>
220: * <p> If this thread is blocked in an invocation of the {@link
221: * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
222: * Object#wait(long,int) wait(long, int)} methods of the {@link Object}
223: * class, or of the {@link #join()}, {@link #join(long)}, {@link
224: * #join(long,int)}, {@link #sleep(long)}, or {@link #sleep(long,int)},
225: * methods of this class, then its interrupt status will be cleared and it
226: * will receive an {@link InterruptedException}.
227: * <p/>
228: * <p> If this thread is blocked in an I/O operation upon an {@link
229: * java.nio.channels.InterruptibleChannel </code>interruptible
230: * channel<code>} then the channel will be closed, the thread's interrupt
231: * status will be set, and the thread will receive a {@link
232: * java.nio.channels.ClosedByInterruptException}.
233: * <p/>
234: * <p> If this thread is blocked in a {@link java.nio.channels.Selector}
235: * then the thread's interrupt status will be set and it will return
236: * immediately from the selection operation, possibly with a non-zero
237: * value, just as if the selector's {@link
238: * java.nio.channels.Selector#wakeup wakeup} method were invoked.
239: * <p/>
240: * <p> If none of the previous conditions hold then this thread's interrupt
241: * status will be set. </p>
242: *
243: * @throws SecurityException if the current thread cannot modify this thread
244: */
245: public final void interrupt() {
246: closeSocket();
247: super .interrupt();
248: }
249:
250: private void closeSocket() {
251: try {
252: if (socket != null && !socket.isClosed()) {
253: try {
254: socket.leaveGroup(groupMulticastAddress);
255: } catch (IOException e) {
256: LOG
257: .error("Error leaving multicast group. Message was "
258: + e.getMessage());
259: }
260: socket.close();
261: }
262: } catch (NoSuchMethodError e) {
263: LOG.debug("socket.isClosed is not supported by JDK1.3");
264: try {
265: socket.leaveGroup(groupMulticastAddress);
266: } catch (IOException ex) {
267: LOG
268: .error("Error leaving multicast group. Message was "
269: + ex.getMessage());
270: }
271: socket.close();
272: }
273: }
274:
275: }
276:
277: /**
278: * Sets the heartbeat interval to something other than the default of 5000ms. This is useful for testing,
279: * but not recommended for production. This method is static and so affects the heartbeat interval of all
280: * senders. The change takes effect after the next scheduled heartbeat.
281: *
282: * @param heartBeatInterval a time in ms, greater than 1000
283: */
284: public static void setHeartBeatInterval(long heartBeatInterval) {
285: if (heartBeatInterval < MINIMUM_HEARTBEAT_INTERVAL) {
286: LOG
287: .warn("Trying to set heartbeat interval too low. Using MINIMUM_HEARTBEAT_INTERVAL instead.");
288: MulticastKeepaliveHeartbeatSender.heartBeatInterval = MINIMUM_HEARTBEAT_INTERVAL;
289: } else {
290: MulticastKeepaliveHeartbeatSender.heartBeatInterval = heartBeatInterval;
291: }
292: }
293:
294: /**
295: * Returns the heartbeat interval.
296: */
297: public static long getHeartBeatInterval() {
298: return heartBeatInterval;
299: }
300:
301: /**
302: * @return the TTL
303: */
304: public Integer getTimeToLive() {
305: return timeToLive;
306: }
307: }
|