001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.process;
006:
007: import java.io.BufferedReader;
008: import java.io.IOException;
009: import java.io.InputStreamReader;
010: import java.io.PrintWriter;
011: import java.net.ServerSocket;
012: import java.net.Socket;
013: import java.net.SocketException;
014: import java.util.ArrayList;
015: import java.util.Arrays;
016: import java.util.Date;
017: import java.util.Iterator;
018: import java.util.List;
019:
/**
 * Creates a connection between a parent Java process and its child process.
 * <p>
 * SCENARIO: When a parent process creates a child, the parent may monitor the child using various hooks. If the parent
 * process itself dies unexpectedly (kill -9), the child process will remain alive, unaware of its parent's fate.
 * <p>
 * This class is able to create a server thread on the parent and a watchdog thread on the child which periodically
 * pages its parent to make sure it's still alive. If the parent's heartbeat flatlines, the child's watchdog thread
 * will terminate the child process by calling <tt>System.exit</tt>.
 */
public final class LinkedJavaProcessPollingAgent {

  /** Milliseconds between two heartbeats sent by the parent to each connected child. */
  private static final int NORMAL_HEARTBEAT_INTERVAL = 15 * 1000;

  // Line-oriented wire protocol spoken between HeartbeatThread (parent side) and PingThread (child side).
  private static final String HEARTBEAT = "HEARTBEAT";
  private static final String SHUTDOWN = "SHUTDOWN";
  private static final String ARE_YOU_ALIVE = "ARE_YOU_ALIVE";

  /** Socket read timeout, in milliseconds, after which the peer is considered gone. */
  private static final int MAX_HEARTBEAT_DELAY = 2 * NORMAL_HEARTBEAT_INTERVAL;

  /** Exit status used by the child when it kills itself because the heartbeat was lost. */
  private static final int EXIT_CODE = 42;

  // Both singletons are guarded by the class lock (all accessors are static synchronized).
  private static HeartbeatServer server = null;
  private static PingThread client = null;

  /**
   * Starts the heartbeat server thread in the parent process. Does nothing if a server has already been started and
   * not yet {@link #shutdown() shut down}.
   */
  public static synchronized void startHeartBeatServer() {
    if (server == null) {
      server = new HeartbeatServer();
      server.start();
    }
  }

  /**
   * Tells whether the heartbeat server is accepting connections. Blocks while the server is still starting up.
   *
   * @return <tt>true</tt> if a server has been started and is currently running
   */
  public static synchronized boolean isServerRunning() {
    if (server == null) {
      return false;
    }
    return server.isRunning();
  }

  /**
   * Returns the port the heartbeat server listens on. Blocks while the server is still starting up.
   *
   * @return server port - must be passed to {@link #startClientWatchdogService(int, String, boolean)}
   * @throws IllegalStateException if the server was never started, or if it failed to bind a port
   */
  public static synchronized int getChildProcessHeartbeatServerPort() {
    if (server == null) {
      throw new IllegalStateException("Heartbeat Server has not started!");
    }
    return server.getPort();
  }

  /**
   * Creates a watchdog service thread in the child process which receives a heartbeat from the parent process. Only
   * the first call has an effect; later calls are ignored.
   *
   * @param pingPort - this must come from {@link #getChildProcessHeartbeatServerPort()}
   * @param childClass - used for debugging
   * @param honorShutdownMsg - false, will ignore the destroy() method and keep this client alive after the shutdown
   *        message is broadcast
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass,
                                                             boolean honorShutdownMsg) {
    if (client == null) {
      client = new PingThread(pingPort, childClass, honorShutdownMsg);
      client.start();
      System.err.println("Child-process watchdog for class " + childClass + " monitoring server on port: "
                         + pingPort);
    }
  }

  /**
   * Same as {@link #startClientWatchdogService(int, String, boolean)} with <tt>honorShutdownMsg</tt> set to
   * <tt>false</tt>.
   */
  public static synchronized void startClientWatchdogService(int pingPort, String childClass) {
    startClientWatchdogService(pingPort, childClass, false);
  }

  /**
   * Shutdown heartbeat server and send a kill signal to child processes
   */
  public static synchronized void shutdown() {
    if (server != null) {
      server.shutdown();
      server = null;
    }
  }

  /**
   * Asks every connected child to identify itself.
   *
   * @return <tt>true</tt> if at least one child answered with a name that does not end with "TCServerMain"
   */
  public static synchronized boolean isAnyAppServerAlive() {
    if (server == null) {
      return false;
    }
    return server.isAnyAppServerAlive();
  }

  /**
   * Writes a timestamped line to standard out.
   * <p>
   * Deliberately NOT synchronized on the class: the public static methods hold the class lock while they block on the
   * server (up to a socket timeout per child), and the daemon threads must still be able to log meanwhile.
   * <tt>PrintStream.println(String)</tt> already emits the line atomically.
   */
  private static void log(String msg) {
    System.out.println("LJP: [" + new Date() + "] " + msg);
  }

  /** Closes the socket if it is non-null; a failure is logged instead of propagated. */
  private static void closeQuietly(Socket socket) {
    if (socket == null) {
      return;
    }
    try {
      socket.close();
    } catch (IOException e) {
      log("Failed to close socket: " + e);
    }
  }

  /**
   * Sleeps for at least the given number of milliseconds, re-sleeping if <tt>Thread.sleep</tt> returns early.
   *
   * @param millis minimum time to sleep; values &lt;= 0 return immediately
   * @throws AssertionError if the thread is interrupted; the interrupt status is restored before throwing
   */
  static void reallySleep(long millis) {
    try {
      long millisLeft = millis;
      while (millisLeft > 0) {
        long start = System.currentTimeMillis();
        Thread.sleep(millisLeft);
        millisLeft -= System.currentTimeMillis() - start;
      }
    } catch (InterruptedException ie) {
      // Keep the interrupt visible to whoever handles the error.
      Thread.currentThread().interrupt();
      throw new AssertionError(ie);
    }
  }

  /**
   * Child-side watchdog. Connects to the parent's heartbeat server and terminates the JVM as soon as the heartbeat
   * stops, the connection breaks or an unknown message arrives.
   */
  private static class PingThread extends Thread {
    private final int pingPort;
    private final String forClass;
    private final boolean honorShutdownMsg;

    /**
     * @param port port of the parent's heartbeat server, must be &gt; 0
     * @param forClass name reported to the parent, must not be blank
     * @param honorShutdownMsg whether a SHUTDOWN message from the parent terminates this JVM
     * @throws RuntimeException if <tt>port</tt> is not positive or <tt>forClass</tt> is null or blank
     */
    public PingThread(int port, String forClass, boolean honorShutdownMsg) {
      if (!(port > 0)) {
        throw new RuntimeException("Port not > 0");
      }
      if (forClass == null || forClass.trim().length() == 0) {
        throw new RuntimeException("blank argument");
      }

      this.pingPort = port;
      this.forClass = forClass;
      this.honorShutdownMsg = honorShutdownMsg;

      this.setDaemon(true);
    }

    public PingThread(int port, String forClass) {
      this(port, forClass, false);
    }

    /**
     * Processes messages from the parent until something goes wrong, then exits the JVM with {@link #EXIT_CODE}. A
     * honored SHUTDOWN message exits with status 0 instead.
     */
    public void run() {
      int port = -1;
      Socket toServer = null;
      try {
        toServer = new Socket("localhost", this.pingPort);
        // A silent parent makes readLine() fail with a timeout instead of blocking forever.
        toServer.setSoTimeout(MAX_HEARTBEAT_DELAY);

        port = toServer.getLocalPort();

        BufferedReader in = new BufferedReader(new InputStreamReader(toServer.getInputStream()));
        PrintWriter out = new PrintWriter(toServer.getOutputStream(), true);

        while (true) {
          long start = System.currentTimeMillis();

          String data = in.readLine();
          if (HEARTBEAT.equals(data)) {
            log("Got heartbeat for main class " + this.forClass);
          } else if (SHUTDOWN.equals(data)) {
            if (!honorShutdownMsg) {
              continue;
            }
            log("Client received shutdown message from server. Shutting Down...");
            System.exit(0);
          } else if (ARE_YOU_ALIVE.equals(data)) {
            out.println(forClass);
            out.flush();
          } else {
            // Includes data == null, i.e. the parent closed the connection.
            throw new Exception("Doesn't recognize data: " + data);
          }

          long elapsed = System.currentTimeMillis() - start;
          if (elapsed > MAX_HEARTBEAT_DELAY) {
            throw new Exception("Client took too long to response.");
          }
        }
      } catch (Exception e) {
        // e.toString() carries the message, which the class name alone would lose.
        log(e + ": " + Arrays.asList(e.getStackTrace()));
        log("Didn't get heartbeat for at least " + MAX_HEARTBEAT_DELAY + " milliseconds. Killing self (port "
            + port + ").");
      } finally {
        try {
          log("Ping thread exiting port (" + port + ")");
          closeQuietly(toServer);
        } finally {
          // Must run no matter what: a child without a watchdog would outlive its parent.
          System.exit(EXIT_CODE);
        }
      }
    }
  }

  /**
   * Parent-side acceptor. Binds an ephemeral port and starts one {@link HeartbeatThread} per connecting child.
   * <p>
   * All mutable fields are guarded by this object's monitor; the thread list is additionally guarded by its own
   * monitor. An instance is expected to be started right after construction, since {@link #isRunning()} and
   * {@link #getPort()} block until {@link #run()} has finished its start-up phase.
   */
  private static class HeartbeatServer extends Thread {
    private final List heartBeatThreads = new ArrayList();
    private int port;
    private ServerSocket serverSocket = null;
    private boolean running = false;
    // True from construction until run() has either bound the socket or given up.
    private boolean starting = true;
    private boolean shutdownRequested = false;

    public HeartbeatServer() {
      this.port = -1;
      this.setDaemon(true);
    }

    /**
     * Pings every live child connection. Connections whose heartbeat thread has already terminated are dropped from
     * the list first.
     *
     * @return <tt>true</tt> if at least one child reported itself as an app server
     */
    public synchronized boolean isAnyAppServerAlive() {
      boolean foundAlive = false;
      synchronized (heartBeatThreads) {
        for (Iterator it = heartBeatThreads.iterator(); it.hasNext();) {
          HeartbeatThread hb = (HeartbeatThread) it.next();
          if (!hb.isAlive()) {
            // The thread has ended and closed its socket; forget it so the list does not grow forever.
            it.remove();
            continue;
          }
          boolean aliveStatus = hb.isAppServerAlive();
          log("pinging: " + hb.port + ", alive? = " + aliveStatus);
          foundAlive = foundAlive || aliveStatus;
        }
      }
      return foundAlive;
    }

    /**
     * Waits for start-up to finish and returns the bound port.
     *
     * @throws IllegalStateException if the server never managed to bind a port
     * @throws RuntimeException if interrupted while waiting; the interrupt status is restored
     */
    public synchronized int getPort() {
      while (starting) {
        try {
          this.wait(5000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("Server might have not started yet", e);
        }
      }
      if (port == -1) {
        throw new IllegalStateException("Heartbeat Server failed to start");
      }
      return port;
    }

    /**
     * Waits for start-up to finish and tells whether the server is accepting connections.
     *
     * @throws RuntimeException if interrupted while waiting; the interrupt status is restored
     */
    public synchronized boolean isRunning() {
      while (starting) {
        try {
          this.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      return running;
    }

    public synchronized void setRunning(boolean status) {
      running = status;
    }

    private synchronized boolean isShutdownRequested() {
      return shutdownRequested;
    }

    /**
     * Stops accepting connections and sends SHUTDOWN to every connected child. Safe to call before the server thread
     * has bound its socket: the thread then terminates without binding.
     */
    private synchronized void shutdown() {
      shutdownRequested = true;
      setRunning(false);

      if (serverSocket != null) {
        try {
          serverSocket.close(); // this effectively interrupts the thread and force it to exit
        } catch (IOException e) {
          // Keep going: the children must still be told to shut down.
          log("Failed to close heartbeat server socket: " + e);
        }
      }

      synchronized (heartBeatThreads) {
        for (Iterator i = heartBeatThreads.iterator(); i.hasNext();) {
          HeartbeatThread ht = (HeartbeatThread) i.next();
          ht.sendKillSignal();
        }
      }
    }

    /**
     * Start-up phase: binds an ephemeral port unless shutdown was already requested. Always ends the "starting" state
     * and wakes waiters, even on failure, so {@link #isRunning()} and {@link #getPort()} can never block forever.
     *
     * @return the bound socket, or <tt>null</tt> if shutdown was requested first
     */
    private synchronized ServerSocket openServerSocket() throws IOException {
      try {
        if (shutdownRequested) {
          return null;
        }
        serverSocket = new ServerSocket(0);
        port = serverSocket.getLocalPort();
        running = true;
        return serverSocket;
      } finally {
        starting = false;
        this.notifyAll();
      }
    }

    /** Accepts child connections until the socket is closed by {@link #shutdown()} or fails. */
    public void run() {
      ServerSocket listener = null;
      try {
        listener = openServerSocket();
        if (listener == null) {
          log("Heartbeat server was shutdown.");
          return;
        }

        System.err.println("Child-process heartbeat server started on port: " + listener.getLocalPort());

        while (true) {
          Socket sock = listener.accept();
          log("Got heartbeat connection from client; starting heartbeat.");
          HeartbeatThread hbt;
          try {
            hbt = new HeartbeatThread(sock);
          } catch (RuntimeException e) {
            // One broken connection must not take the whole server down.
            log("Could not start heartbeat for client: " + e);
            closeQuietly(sock);
            continue;
          }
          synchronized (heartBeatThreads) {
            heartBeatThreads.add(hbt);
            hbt.start();
          }
        }
      } catch (Exception e) {
        if (isShutdownRequested()) {
          log("Heartbeat server was shutdown.");
        } else {
          log("Got expcetion in heartbeat server: " + e.getMessage());
        }
      } finally {
        setRunning(false);
        if (listener != null) {
          try {
            listener.close(); // no-op if shutdown() already closed it
          } catch (IOException e) {
            log("Failed to close heartbeat server socket: " + e);
          }
        }
        log("Heartbeat server terminated.");
      }
    }
  }

  /**
   * Parent-side connection to a single child: sends HEARTBEAT every {@link #NORMAL_HEARTBEAT_INTERVAL} milliseconds
   * until the connection breaks. Writes to the child are serialized on this object's monitor.
   */
  private static class HeartbeatThread extends Thread {
    private final Socket socket;
    private final int port;

    // Opened in the constructor so they are usable (and safely published) before run() starts.
    private final BufferedReader in;
    private final PrintWriter out;

    /**
     * @param socket accepted connection to the child, must not be null
     * @throws NullPointerException if <tt>socket</tt> is null
     * @throws RuntimeException if the socket cannot be configured or its streams cannot be opened
     */
    public HeartbeatThread(Socket socket) {
      if (socket == null) {
        throw new NullPointerException();
      }
      this.socket = socket;
      try {
        this.socket.setSoTimeout(MAX_HEARTBEAT_DELAY);
        this.out = new PrintWriter(this.socket.getOutputStream(), true);
        this.in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      this.port = socket.getPort();
      this.setDaemon(true);
    }

    /** Sends SHUTDOWN to the child; a failed write is logged, never thrown. */
    public synchronized void sendKillSignal() {
      out.println(SHUTDOWN);
      // PrintWriter swallows I/O errors; checkError() flushes and reports them.
      if (out.checkError()) {
        log("Socket Exception: client may have already shutdown.");
      }
    }

    /**
     * Asks the child for its name and waits for the answer (at most the socket timeout).
     *
     * @return <tt>false</tt> if the child does not answer, the connection is broken, or the reported name ends with
     *         "TCServerMain"; <tt>true</tt> otherwise
     */
    public boolean isAppServerAlive() {
      try {
        log("sending ARE_YOU_ALIVE...");
        out.println(ARE_YOU_ALIVE);
        if (out.checkError()) {
          log("got exception: could not send ARE_YOU_ALIVE");
          return false;
        }
        String result = in.readLine();
        log("received: " + result);
        if (result == null || result.endsWith("TCServerMain")) {
          // not an apserver
          return false;
        } else {
          return true;
        }
      } catch (IOException e) {
        log("got exception: " + e.getMessage());
        return false;
      }
    }

    /** Heartbeat loop; ends, and closes the socket, once a write to the child fails. */
    public void run() {
      try {
        while (true) {
          boolean writeFailed;
          synchronized (this) {
            out.println(HEARTBEAT);
            // Without this check a dead child would never be noticed and this thread would leak.
            writeFailed = out.checkError();
          }
          if (writeFailed) {
            log("Socket Exception: client may have already shutdown.");
            break;
          }
          reallySleep(NORMAL_HEARTBEAT_INTERVAL);
        }
      } catch (Exception e) {
        log("Heartbeat thread for child process (port " + port + ") got exception");
        log(e + ": " + Arrays.asList(e.getStackTrace()));
      } finally {
        log("Heartbeat thread for child process (port " + port + ") terminating.");
        closeQuietly(socket);
      }
    }
  }
}
|