001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Gilles Rayrat.
019: * Contributor(s): Marc Herbert
020: */package org.continuent.sequoia.driver;
021:
022: import java.io.IOException;
023: import java.net.InetSocketAddress;
024: import java.nio.ByteBuffer;
025: import java.nio.channels.DatagramChannel;
026: import java.util.Date;
027: import java.util.Iterator;
028:
029: import org.continuent.sequoia.common.util.Constants;
030:
031: /**
032: * This thread sends a ping to all controllers in a given list at a given
033: * frequency
034: *
035: * @author <a href="mailto:gilles.rayrat@continuent.com">Gilles Rayrat</a>
036: * @author <a href="mailto:marc.herbert@continuent.com">Marc Herbert</a>
037: * @version 1.0
038: */
039: public class ControllerPingSender extends Thread {
040: /** List of controllers to send pings to */
041: final WatchedControllers controllers;
042: /** Time to wait between 2 successive pings to a controller */
043: final int pingDelayInMs;
044: /** Channel for outgoing pings */
045: final DatagramChannel sendChannel;
046: /** Packet sent as a ping */
047: final ByteBuffer pingPacket = ByteBuffer.allocate(1);
048: /** Level of logging (logs are printed to stdout, see {@link SequoiaUrl}) */
049: final int logLevel;
050:
051: private final ControllerWatcher fatherWatcher;
052:
053: private boolean terminated = false;
054:
055: /**
056: * Creates a new <code>ControllerPingSender</code> on the given controller
057: * list with the given frequency
058: *
059: * @param controllerList controllers to ping. Should never be null
060: * @param channel <code>DatagramChannel</code> to use for sending pings
061: * @param pingDelayInMs time to wait between two successive pings
062: * @param logLevel level of logging to use (logs are printed to stdout)
063: */
064: public ControllerPingSender(ControllerWatcher father,
065: WatchedControllers controllerList, DatagramChannel channel,
066: int pingDelayInMs, int logLevel) {
067: super ("ControllerPingSender");
068: this .fatherWatcher = father;
069: this .controllers = controllerList;
070: this .pingDelayInMs = pingDelayInMs;
071: this .logLevel = logLevel;
072: // sendChannel = new DatagramSocket();
073: sendChannel = channel;
074: pingPacket.put(Constants.CONTROLLER_PING_VERSION);
075: }
076:
077: /**
078: * Starts pinging controllers.<br>
079: * Controllers will be ping with an interval of at least pingDelayInMs. But as
080: * this function is synchronized on the controller list it can be slowed down
081: * if controllers are added/removed, raising up the ping delay
082: *
083: * @see java.lang.Thread#run()
084: */
085: public void run() {
086: if (logLevel >= SequoiaUrl.DEBUG_LEVEL_DEBUG)
087: System.out.println(new Date() + " " + this + " started.");
088:
089: long oldTime = System.currentTimeMillis();
090:
091: while (!terminated) {
092: try {
093: for (Iterator it = controllers.getControllerIterator(); it
094: .hasNext();) {
095: ControllerInfo ctrl = (ControllerInfo) it.next();
096: sendPingTo(ctrl);
097: }
098: // Wait for pingDelayInMs milliseconds before next ping
099: sleep(pingDelayInMs);
100: // detect violent clock changes (user hardly set new date/time)
101: long newTime = System.currentTimeMillis();
102: long timeShift = newTime - (oldTime + pingDelayInMs);
103: if (Math.abs(timeShift) > fatherWatcher.controllerTimeout / 2) {
104: System.err.println(timeShift / 1000
105: + "s time shift detected, from "
106: + new Date(oldTime) + " to "
107: + new Date(newTime));
108: System.err
109: .println("Brutal changes of date/time can lead to erroneous controller failure detections!");
110: }
111: oldTime = newTime;
112: } catch (IOException e) {
113: System.err
114: .println(new Date()
115: + " "
116: + this
117: + " IOException, trying to restart both daemons");
118: e.printStackTrace(System.err);
119: fatherWatcher.restartBothDaemons();
120: } catch (InterruptedException e) {
121: System.err
122: .println(new Date()
123: + " "
124: + this
125: + " InterruptedException, trying to restart both daemons");
126: e.printStackTrace(System.err);
127: fatherWatcher.restartBothDaemons();
128: } // try
129: } // while (!terminated)
130:
131: if (logLevel >= SequoiaUrl.DEBUG_LEVEL_DEBUG)
132: System.out.println(new Date() + " " + this + " terminated");
133:
134: }
135:
136: private void sendPingTo(ControllerInfo ctrl) throws IOException {
137: pingPacket.rewind();
138: sendChannel.send(pingPacket, ctrl);
139: }
140:
141: void terminate() {
142: terminated = true;
143: }
144:
145: public String toString() {
146: // Add our address, useful as an ID. Stolen from Object.toString()
147: return super .toString() + "@" + Integer.toHexString(hashCode())
148: + " " + controllers;
149: }
150: }
|