001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.process;
006:
007: import java.io.BufferedReader;
008: import java.io.IOException;
009: import java.io.InputStreamReader;
010: import java.io.PrintWriter;
011: import java.net.ServerSocket;
012: import java.net.Socket;
013: import java.net.SocketTimeoutException;
014: import java.text.DateFormat;
015: import java.text.SimpleDateFormat;
016: import java.util.ArrayList;
017: import java.util.Date;
018: import java.util.Iterator;
019: import java.util.List;
020:
/**
 * Socket-based heartbeat server.
 * <p>
 * After {@link #start()} a daemon thread listens on an ephemeral port (see {@link #listeningPort()}).
 * Every client that connects is served by its own daemon thread, which first reads one line (the
 * client's name) and then sends {@link #PULSE} every {@link #PULSE_INTERVAL} milliseconds, expecting
 * one reply line per pulse. A client whose connection fails, or that misses too many consecutive
 * pulses, is dropped. The server can also ask clients whether their app server is alive
 * ({@link #anyAppServerAlive()}) and send them {@link #KILL} ({@link #sendKillSignalToChildren()}).
 * <p>
 * Thread-safety: the listener reference and the client list are guarded by this object's monitor;
 * blocking socket I/O is never performed while that monitor is held.
 */
public class HeartBeatServer {
    /** Line sent to a client on every heartbeat; the client must answer with one line. */
    public static final String PULSE = "PULSE";
    /** Line sent to a client by {@link #sendKillSignalToChildren()}. */
    public static final String KILL = "KILL";
    /** Request line sent by {@link #anyAppServerAlive()}. */
    public static final String IS_APP_SERVER_ALIVE = "IS_APP_SERVER_ALIVE";
    /** Reply line that counts as "alive" for {@link #IS_APP_SERVER_ALIVE}. */
    public static final String IM_ALIVE = "IM_ALIVE";
    /** Pause between two pulses, in milliseconds; the socket read timeout is half of this. */
    public static final int PULSE_INTERVAL = 30 * 1000;

    /** Number of consecutive unanswered pulses after which a client is considered dead. */
    private static final int MAX_MISSED_PULSES = 5;

    // SimpleDateFormat is not thread-safe: only use it while synchronized on the instance.
    private static final DateFormat DATEFORMAT = new SimpleDateFormat("HH:mm:ss.SSS");

    // @GuardedBy(this)
    private ListenThread listenThread;
    // @GuardedBy(this)
    private final List heartBeatThreads = new ArrayList();

    public HeartBeatServer() {
        //
    }

    /**
     * Prints a timestamped message to standard output. Safe to call from any thread.
     *
     * @param message text to print after the timestamp and the "HeartBeatServer" tag
     */
    public static void log(String message) {
        String timestamp;
        synchronized (DATEFORMAT) {
            timestamp = DATEFORMAT.format(new Date());
        }
        System.out.println(timestamp + " - HeartBeatServer: " + message);
    }

    /** Starts the daemon listener thread; does nothing if the server is already started. */
    public synchronized void start() {
        if (listenThread == null) {
            listenThread = new ListenThread(this);
            listenThread.setDaemon(true);
            listenThread.start();
        }
    }

    /**
     * Stops accepting connections, waits for the listener thread to finish and then sends
     * {@link #KILL} to every registered client. Calling this on a server that was never started
     * only performs the (empty) kill broadcast. If the calling thread is interrupted while waiting,
     * the wait is abandoned and the interrupt status is preserved.
     */
    public void shutdown() {
        ListenThread listener;
        synchronized (this) {
            listener = listenThread;
            listenThread = null;
        }
        if (listener != null) {
            // Not done under the monitor: the listener needs it for addThread() while winding down.
            listener.shutdown();
            try {
                listener.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        sendKillSignalToChildren();
    }

    /** Sends {@link #KILL} to every registered client, closes its socket and forgets it. */
    public void sendKillSignalToChildren() {
        List doomed;
        synchronized (this) {
            doomed = new ArrayList(heartBeatThreads);
            heartBeatThreads.clear();
        }
        for (Iterator it = doomed.iterator(); it.hasNext();) {
            ((HeartBeatThread) it.next()).sendKillSignal();
        }
    }

    /**
     * Asks the registered clients, one after another, whether their app server is alive and stops
     * at the first positive answer.
     *
     * @return {@code true} if some client answered {@link #IM_ALIVE}; {@code false} otherwise,
     *         including when no client is registered
     */
    public boolean anyAppServerAlive() {
        List clients;
        synchronized (this) {
            clients = new ArrayList(heartBeatThreads);
        }
        for (Iterator it = clients.iterator(); it.hasNext();) {
            if (((HeartBeatThread) it.next()).pingAppServer()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Unregisters a client whose heartbeat thread has detected that it is dead.
     *
     * @param thread the heartbeat thread to forget
     */
    public synchronized void removeDeadClient(HeartBeatThread thread) {
        log("Removed dead client: " + thread.getName());
        heartBeatThreads.remove(thread);
    }

    /**
     * Registers a client heartbeat thread.
     *
     * @param hb the heartbeat thread to track
     */
    public synchronized void addThread(HeartBeatThread hb) {
        heartBeatThreads.add(hb);
    }

    /**
     * Returns the port the server listens on, waiting for the listener to bind if necessary.
     *
     * @return the local port of the server socket
     * @throws IllegalStateException if the server is not running (never started, already shut
     *         down, or the listener terminated before it could bind)
     */
    public int listeningPort() {
        ListenThread listener;
        synchronized (this) {
            listener = listenThread;
        }
        if (listener == null || !listener.isAlive()) {
            throw new IllegalStateException("Heartbeat server has not started");
        }
        return listener.listeningPort();
    }

    /** Accepts client connections and hands each one to a new {@link HeartBeatThread}. */
    private static class ListenThread extends Thread {
        private final HeartBeatServer server;
        // The three fields below are guarded by this thread object's monitor.
        private ServerSocket serverSocket;
        private int listeningPort = -1;
        private boolean isShutdown = false;

        public ListenThread(HeartBeatServer server) {
            this.server = server;
        }

        /**
         * Requests termination: marks the listener as shut down and closes the server socket (if it
         * was already opened), which makes a pending accept() fail.
         */
        public void shutdown() {
            ServerSocket toClose;
            synchronized (this) {
                isShutdown = true;
                toClose = serverSocket;
                notifyAll(); // wake up threads waiting in listeningPort()
            }
            if (toClose != null) {
                try {
                    toClose.close();
                } catch (IOException ignored) {
                    // nothing useful can be done if closing fails
                }
            }
        }

        private synchronized boolean isShutdownRequested() {
            return isShutdown;
        }

        public void run() {
            ServerSocket listener = null;
            try {
                synchronized (this) {
                    // Honour a shutdown() that arrived before this thread got to run.
                    if (!isShutdown) {
                        listener = new ServerSocket(0);
                        serverSocket = listener;
                        listeningPort = listener.getLocalPort();
                        notifyAll();
                    }
                }
                if (listener == null) {
                    log("Heartbeat server is shutdown");
                    return;
                }
                log("Heartbeat server is online...");
                while (true) {
                    Socket clientSocket = listener.accept();
                    HeartBeatThread hb;
                    try {
                        hb = new HeartBeatThread(server, clientSocket);
                    } catch (RuntimeException e) {
                        // One client that cannot be set up must not take the listener down.
                        log("Failed to set up heartbeat for a new client: " + e.getMessage());
                        continue;
                    }
                    hb.setDaemon(true);
                    // Register before starting so a client that dies immediately is removed
                    // after it was added, not before.
                    server.addThread(hb);
                    hb.start();
                }
            } catch (Exception e) {
                if (isShutdownRequested()) {
                    log("Heartbeat server is shutdown");
                } else {
                    throw new RuntimeException(e);
                }
            } finally {
                if (listener != null) {
                    try {
                        listener.close();
                    } catch (IOException ignored) {
                        // already closing down
                    }
                }
            }
        }

        /**
         * Waits until the server socket is bound and returns its port. The wait ends early if the
         * listener is shut down or terminates; the condition is re-checked at least every 5 seconds.
         *
         * @throws IllegalStateException if the listener stopped without ever binding a port
         * @throws RuntimeException if the calling thread is interrupted while waiting (the
         *         interrupt status is restored first)
         */
        public int listeningPort() {
            synchronized (this) {
                while (listeningPort == -1 && !isShutdown && isAlive()) {
                    try {
                        wait(5000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
                if (listeningPort == -1) {
                    throw new IllegalStateException("Heartbeat server has not started");
                }
                return listeningPort;
            }
        }
    }

    /** Serves one connected client: reads its name, then pulses it until it dies or is killed. */
    private static class HeartBeatThread extends Thread {
        private final Socket socket;
        private final BufferedReader in;
        private final PrintWriter out;
        private final HeartBeatServer server;
        // Makes each "send request, read reply" pair atomic so that the pulse loop and
        // pingAppServer() cannot consume each other's replies.
        private final Object exchangeLock = new Object();
        // Written by the thread calling sendKillSignal(), read by this thread.
        private volatile boolean killed = false;
        private String clientName;
        private int missedPulseCount = 0;

        /**
         * @throws RuntimeException if the socket cannot be configured or its streams cannot be
         *         opened; the socket is closed in that case
         */
        public HeartBeatThread(HeartBeatServer server, Socket s) {
            this.server = server;
            this.socket = s;
            try {
                socket.setSoTimeout(PULSE_INTERVAL / 2);
                socket.setTcpNoDelay(true);

                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
            } catch (Exception e) {
                closeSocket();
                throw new RuntimeException(e);
            }
        }

        public void run() {
            try {
                // read clientName
                clientName = in.readLine();
                if (clientName == null) {
                    throw new IOException("client disconnected before sending its name.");
                }
                setName(clientName);
                log("got new client: " + clientName);

                while (true) {
                    log("send pulse to client: " + clientName);
                    exchangePulse();
                    reallySleep(PULSE_INTERVAL);
                }
            } catch (Exception e) {
                if (!killed) {
                    // only remove itself if the client isn't being sent a kill signal
                    log("Dead client detected: " + clientName
                            + ". Exception message: " + e.getMessage());
                    server.removeDeadClient(this);
                }
            } finally {
                closeSocket();
            }
        }

        /**
         * Sends one pulse and waits (up to the socket timeout) for the reply line. A timeout counts
         * as a missed pulse; any reply resets the counter.
         *
         * @throws IOException if the connection is closed or failing, or if the client has missed
         *         {@code MAX_MISSED_PULSES} pulses in a row
         */
        private void exchangePulse() throws IOException {
            synchronized (exchangeLock) {
                out.println(PULSE);
                try {
                    String reply = in.readLine();
                    if (reply == null) {
                        throw new IOException("read-half of socket closed.");
                    }
                    missedPulseCount = 0;
                } catch (SocketTimeoutException toe) {
                    missedPulseCount++;
                    log("Client: " + clientName + " missed " + missedPulseCount);
                    if (missedPulseCount >= MAX_MISSED_PULSES) {
                        throw new IOException("Client missed " + MAX_MISSED_PULSES
                                + " pulses... considering it dead.");
                    }
                }
            }
        }

        /** Marks this client as killed, sends it {@link #KILL} and closes the connection. */
        public void sendKillSignal() {
            // Deliberately not under exchangeLock: the pulse loop may be blocked in a read, and
            // closing the socket is what unblocks it.
            killed = true;
            out.println(KILL);
            closeSocket();
        }

        /**
         * Sends {@link #IS_APP_SERVER_ALIVE} and reads one reply line.
         *
         * @return {@code true} only if the reply is exactly {@link #IM_ALIVE}; {@code false} on any
         *         other reply, end of stream, timeout or I/O failure
         */
        public boolean pingAppServer() {
            try {
                synchronized (exchangeLock) {
                    out.println(IS_APP_SERVER_ALIVE);
                    return IM_ALIVE.equals(in.readLine());
                }
            } catch (IOException ignored) {
                // a client that cannot answer is not alive
                return false;
            }
        }

        /** Closes the client socket, ignoring failures (closing twice is harmless). */
        private void closeSocket() {
            try {
                socket.close();
            } catch (IOException ignored) {
                // best effort
            }
        }

    }

    /**
     * Sleeps for at least {@code millis} milliseconds, even if the thread is interrupted meanwhile.
     * If an interrupt occurred (or was pending on entry), the thread's interrupt status is set again
     * before returning so callers can still observe it.
     *
     * @param millis time to sleep in milliseconds; zero or negative values return immediately
     */
    public static void reallySleep(long millis) {
        boolean interrupted = false;
        long millisLeft = millis;
        while (millisLeft > 0) {
            long start = System.currentTimeMillis();
            try {
                Thread.sleep(millisLeft);
            } catch (InterruptedException ie) {
                interrupted = true; // keep sleeping; the status is restored below
            }
            // Clamp so that a wall clock stepping backwards cannot extend the remaining time.
            millisLeft -= Math.max(0L, System.currentTimeMillis() - start);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
|