001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Gilles Rayrat.
019: * Contributor(s): Marc Herbert
020: */package org.continuent.sequoia.driver;
021:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;

import org.continuent.sequoia.common.util.Constants;
032:
033: /**
034: * Main class for controller ping monitoring.<br>
035: * Launches a controller pinger thread and creates a list of controllers with
036: * their states. Reads controllers responses (pongs) and updates state
037: * accordingly
038: *
039: * @author <a href="mailto:gilles.rayrat@continuent.com">Gilles Rayrat</a>
040: * @author <a href="mailto:marc.herbert@continuent.com">Marc Herbert</a>
041: * @version 1.0
042: */
043: public class ControllerWatcher extends Thread {
044: /** List of controllers to send pings to */
045: final WatchedControllers controllers;
046: /** Thread that sends pings */
047: final ControllerPingSender pinger;
048: /** Selector for incomming pings */
049: final Selector selector;
050: /** Level of logging (logs are printed to stdout, see {@link SequoiaUrl}) */
051: final int logLevel;
052: /**
053: * Time to wait between 2 successive pings to a controller, also used for
054: * select timeout
055: */
056: final int pingDelayInMs;
057: /**
058: * Delay after which a controller will be considered as failing if it did not
059: * respond to pings. Used here as a timeout for select()
060: */
061: final int controllerTimeout;
062:
063: private boolean terminated = false;
064:
065: /**
066: * Creates a controller watcher with the given controllers to watch.<br>
067: * Pings will be sent to all given controllers
068: *
069: * @param controllerList controllers to ping
070: * @param callback Callback implementation to call when a controller state
071: * changes
072: * @param pingDelayInMs time to wait between two successive pings
073: * @param controllerTimeout delay after which a controller will be considered
074: * as failing if it did not respond to pings
075: * @param logLevel level of logging to use (logs are printed to stdout)
076: * @exception IOException if the socket selector could not be opened
077: */
078: public ControllerWatcher(ControllerInfo[] controllerList,
079: ControllerStateChangedCallback callback, int pingDelayInMs,
080: int controllerTimeout, int logLevel) throws IOException {
081: this (new WatchedControllers(controllerList, System
082: .currentTimeMillis(), controllerTimeout, callback),
083: pingDelayInMs, controllerTimeout, logLevel);
084: }
085:
086: private ControllerWatcher(WatchedControllers watchedControllers,
087: int pingDelayInMs, int controllerTimeout, int logLevel)
088: throws IOException
089:
090: {
091: super ("ControllerWatcher");
092:
093: this .controllers = watchedControllers;
094: this .pingDelayInMs = pingDelayInMs;
095: this .controllerTimeout = controllerTimeout;
096: this .logLevel = logLevel;
097: this .selector = Selector.open();
098:
099: this .pinger = new ControllerPingSender(this , this .controllers,
100: newRegisteredChannel(), pingDelayInMs, logLevel);
101: this .pinger.setDaemon(true);
102:
103: }
104:
105: /**
106: * @return a new DatagramChannel configured and registered at this.selector
107: */
108: private DatagramChannel newRegisteredChannel() throws IOException {
109: // Register channel
110: // Open the channel, set it to non-blocking, initiate connect
111: DatagramChannel channel = DatagramChannel.open();
112: channel.configureBlocking(false);
113: channel.register(this .selector, SelectionKey.OP_READ);
114: return channel;
115:
116: }
117:
118: private void processAnswers() throws IOException {
119: long timeAnswerWasReceived = System.currentTimeMillis();
120: for (Iterator i = selector.selectedKeys().iterator(); i
121: .hasNext();) {
122: // Retrieve the next key and remove it from the set
123: SelectionKey sk = (SelectionKey) i.next();
124: i.remove();
125: DatagramChannel channel = (DatagramChannel) sk.channel();
126: ByteBuffer buf = ByteBuffer.allocate(1);
127:
128: InetSocketAddress addr = (InetSocketAddress) channel
129: .receive(buf);
130: if (addr != null) {
131: // creates the controller info from the received SocketAddress
132: ControllerInfo from = new ControllerInfo(addr);
133: byte data[] = buf.array();
134: if (data[0] == Constants.CONTROLLER_PING_VERSION) {
135: controllers.setControllerResponsed(from,
136: timeAnswerWasReceived);
137: } else if (logLevel >= SequoiaUrl.DEBUG_LEVEL_DEBUG) {
138: System.out
139: .println(new Date()
140: + " "
141: + this
142: + ": controller "
143: + from
144: + " ping protocol does not match ours, was: "
145: + data[0] + ", expected:"
146: + Constants.CONTROLLER_PING_VERSION);
147: }
148: }
149: }
150: }
151:
152: /**
153: * Starts watching controllers.<br>
154: * Reads ping responses from controllers and updates corresponding controller
155: * data in the <code>ControllerList</code>.
156: *
157: * @see java.lang.Thread#run()
158: */
159: public void run() {
160: if (logLevel >= SequoiaUrl.DEBUG_LEVEL_DEBUG)
161: System.out.println(new Date() + " " + this + " started.");
162:
163: // Start ping
164: pinger.start();
165: while (!terminated) {
166: try {
167: int nbOfAnswers = selector.select(2 * pingDelayInMs);
168: if (nbOfAnswers > 0)
169: processAnswers();
170: // This will start callbacks according to new states
171: controllers.lookForDeadControllers(System
172: .currentTimeMillis());
173: } catch (IOException ioe) {
174: System.err
175: .println(new Date()
176: + " "
177: + this
178: + ": Error while reading controller answers to ping");
179: ioe.printStackTrace(System.err);
180:
181: restartBothDaemons();
182: } // try
183: } // while (!terminated)
184:
185: if (logLevel >= SequoiaUrl.DEBUG_LEVEL_DEBUG)
186: System.out.println(new Date() + " " + this + " terminated");
187:
188: }
189:
190: /**
191: * Forces a given controller to be considered as not responding
192: *
193: * @see WatchedControllers#setControllerDown(ControllerInfo)
194: */
195: public void forceControllerDown(ControllerInfo c) {
196: controllers.setControllerDown(c);
197: }
198:
199: /**
200: * Returns the given controllers index in the original list
201: *
202: * @param controller the controller to get index of
203: * @return original index of the given controller
204: */
205: public int getOriginalIndexOf(ControllerInfo controller) {
206: return controllers.getOriginalIndexOf(controller);
207: }
208:
209: /**
210: * Terminates both pinger and watcher thread TODO: terminateBoth definition.
211: */
212: public void terminateBoth() {
213: pinger.terminate();
214: terminated = true;
215: try {
216: // workaround for SEQUOIA-942 (Selector.open() leaks file descriptors)
217: selector.close();
218: } catch (IOException e) {
219: e.printStackTrace(System.err); // ignore
220: }
221: }
222:
223: void restartBothDaemons() {
224: synchronized (this ) {
225: // don't restart multiple times
226: if (terminated)
227: return;
228:
229: // tell old threads (including Thread.this!) to die ASAP
230: this .terminateBoth();
231: }
232:
233: boolean failed = true;
234:
235: // just for logging
236: ControllerWatcher newWatcher = null; // compiler is not clever enough
237:
238: while (failed) {
239: try
240: // to start over with 2 brand new threads (and thus a brand new,
241: // non-interrupted, non-closed channel)
242: {
243: Thread.sleep(pingDelayInMs);
244: newWatcher = new ControllerWatcher(controllers,
245: pingDelayInMs, controllerTimeout, logLevel);
246: newWatcher.start();
247: failed = false; // success, exit try loop
248: } catch (InterruptedException ie) {
249: // loop over
250: ie.printStackTrace(System.err);
251: } catch (IOException e) {
252: // loop over
253: e.printStackTrace(System.err);
254: }
255: }
256:
257: System.err.println(new Date() + " " + this
258: + " successfully restarted both daemons.");
259: System.err.println("New watcher = " + newWatcher);
260:
261: }
262:
263: public String toString() {
264: // Add our address, useful as an ID. Stolen from Object.toString()
265: return super .toString() + "@" + Integer.toHexString(hashCode())
266: + " " + " " + controllers;
267: }
268:
269: }
|