001: /*
002: * Copyright 2003-2006 Rick Knowles <winstone-devel at lists sourceforge net>
003: * Distributed under the terms of either:
004: * - the common development and distribution license (CDDL), v1.0; or
005: * - the GNU Lesser General Public License, v2.1 or later
006: */
007: package winstone.cluster;
008:
009: import java.io.IOException;
010: import java.io.InputStream;
011: import java.io.ObjectInputStream;
012: import java.io.ObjectOutputStream;
013: import java.io.OutputStream;
014: import java.net.ConnectException;
015: import java.net.Socket;
016: import java.util.ArrayList;
017: import java.util.Collection;
018: import java.util.Date;
019: import java.util.HashSet;
020: import java.util.Hashtable;
021: import java.util.Iterator;
022: import java.util.List;
023: import java.util.Map;
024: import java.util.Set;
025: import java.util.StringTokenizer;
026:
027: import winstone.Cluster;
028: import winstone.HostConfiguration;
029: import winstone.HostGroup;
030: import winstone.Logger;
031: import winstone.WebAppConfiguration;
032: import winstone.WinstoneResourceBundle;
033: import winstone.WinstoneSession;
034:
035: /**
036: * Represents a cluster of winstone containers.
037: *
038: * @author <a href="mailto:rick_knowles@hotmail.com">Rick Knowles</a>
039: * @version $Id: SimpleCluster.java,v 1.8 2006/08/10 06:38:31 rickknowles Exp $
040: */
041: public class SimpleCluster implements Runnable, Cluster {
042: final int SESSION_CHECK_TIMEOUT = 100;
043: final int HEARTBEAT_PERIOD = 5000;
044: final int MAX_NO_OF_MISSING_HEARTBEATS = 3;
045: final byte NODELIST_DOWNLOAD_TYPE = (byte) '2';
046: final byte NODE_HEARTBEAT_TYPE = (byte) '3';
047:
048: public static final WinstoneResourceBundle CLUSTER_RESOURCES = new WinstoneResourceBundle(
049: "winstone.cluster.LocalStrings");
050: private int controlPort;
051: private String initialClusterNodes;
052: private Map clusterAddresses;
053: private boolean interrupted;
054:
055: /**
056: * Builds a cluster instance
057: */
058: public SimpleCluster(Map args, Integer controlPort) {
059: this .interrupted = false;
060: this .clusterAddresses = new Hashtable();
061: if (controlPort != null)
062: this .controlPort = controlPort.intValue();
063:
064: // Start cluster init thread
065: this .initialClusterNodes = (String) args.get("clusterNodes");
066: Thread thread = new Thread(this , CLUSTER_RESOURCES
067: .getString("SimpleCluster.ThreadName"));
068: thread.setDaemon(true);
069: thread.setPriority(Thread.MIN_PRIORITY);
070: thread.start();
071: }
072:
073: public void destroy() {
074: this .interrupted = true;
075: }
076:
077: /**
078: * Send a heartbeat every now and then, and remove any nodes that haven't
079: * responded in 3 heartbeats.
080: */
081: public void run() {
082: // Ask each of the known addresses for their cluster lists, and build a
083: // set
084: if (this .initialClusterNodes != null) {
085: StringTokenizer st = new StringTokenizer(
086: this .initialClusterNodes, ",");
087: while (st.hasMoreTokens() && !interrupted)
088: askClusterNodeForNodeList(st.nextToken());
089: }
090:
091: Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
092: "SimpleCluster.InitNodes", ""
093: + this .clusterAddresses.size());
094:
095: while (!interrupted) {
096: try {
097: Set addresses = new HashSet(this .clusterAddresses
098: .keySet());
099: Date noHeartbeatDate = new Date(
100: System.currentTimeMillis()
101: - (MAX_NO_OF_MISSING_HEARTBEATS * HEARTBEAT_PERIOD));
102: for (Iterator i = addresses.iterator(); i.hasNext();) {
103: String ipPort = (String) i.next();
104:
105: Date lastHeartBeat = (Date) this .clusterAddresses
106: .get(ipPort);
107: if (lastHeartBeat.before(noHeartbeatDate)) {
108: this .clusterAddresses.remove(ipPort);
109: Logger.log(Logger.FULL_DEBUG,
110: CLUSTER_RESOURCES,
111: "SimpleCluster.RemovingNode", ipPort);
112: }
113:
114: // Send heartbeat
115: else
116: sendHeartbeat(ipPort);
117:
118: }
119: Thread.sleep(HEARTBEAT_PERIOD);
120: } catch (Throwable err) {
121: Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
122: "SimpleCluster.ErrorMonitorThread", err);
123: }
124: }
125: Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
126: "SimpleCluster.FinishedMonitorThread");
127: }
128:
129: /**
130: * Check if the other nodes in this cluster have a session for this
131: * sessionId.
132: *
133: * @param sessionId The id of the session to check for
134: * @return A valid session instance
135: */
136: public WinstoneSession askClusterForSession(String sessionId,
137: WebAppConfiguration webAppConfig) {
138: // Iterate through the cluster members
139: Collection addresses = new ArrayList(clusterAddresses.keySet());
140: Collection searchThreads = new ArrayList();
141: for (Iterator i = addresses.iterator(); i.hasNext();) {
142: String ipPort = (String) i.next();
143: ClusterSessionSearch search = new ClusterSessionSearch(
144: webAppConfig.getContextPath(), webAppConfig
145: .getOwnerHostname(), sessionId, ipPort,
146: this .controlPort);
147: searchThreads.add(search);
148: }
149:
150: // Wait until we get an answer
151: WinstoneSession answer = null;
152: String senderThread = null;
153: boolean finished = false;
154: while (!finished) {
155: // Loop through all search threads. If finished, exit, otherwise
156: // sleep
157: List finishedThreads = new ArrayList();
158: for (Iterator i = searchThreads.iterator(); i.hasNext();) {
159: ClusterSessionSearch searchThread = (ClusterSessionSearch) i
160: .next();
161: if (!searchThread.isFinished())
162: continue;
163: else if (searchThread.getResult() == null)
164: finishedThreads.add(searchThread);
165: else {
166: answer = searchThread.getResult();
167: senderThread = searchThread.getAddressPort();
168: }
169: }
170:
171: // Remove finished threads
172: for (Iterator i = finishedThreads.iterator(); i.hasNext();)
173: searchThreads.remove(i.next());
174:
175: if (searchThreads.isEmpty() || (answer != null))
176: finished = true;
177: else
178: try {
179: Thread.sleep(100);
180: } catch (InterruptedException err) {
181: }
182: }
183:
184: // Once we have an answer, terminate all search threads
185: for (Iterator i = searchThreads.iterator(); i.hasNext();) {
186: ClusterSessionSearch searchThread = (ClusterSessionSearch) i
187: .next();
188: searchThread.destroy();
189: }
190: if (answer != null) {
191: answer.activate(webAppConfig);
192: Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
193: "SimpleCluster.SessionTransferredFrom",
194: senderThread);
195: }
196: return answer;
197: }
198:
199: /**
200: * Given an address, retrieve the list of cluster nodes and initialise dates
201: *
202: * @param address The address to request a node list from
203: */
204: private void askClusterNodeForNodeList(String address) {
205: try {
206: int colonPos = address.indexOf(':');
207: String ipAddress = address.substring(0, colonPos);
208: String port = address.substring(colonPos + 1);
209: Socket clusterListSocket = new Socket(ipAddress, Integer
210: .parseInt(port));
211: this .clusterAddresses.put(clusterListSocket
212: .getInetAddress().getHostAddress()
213: + ":" + port, new Date());
214: InputStream in = clusterListSocket.getInputStream();
215: OutputStream out = clusterListSocket.getOutputStream();
216: out.write(NODELIST_DOWNLOAD_TYPE);
217: out.flush();
218:
219: // Write out the control port
220: ObjectOutputStream outControl = new ObjectOutputStream(out);
221: outControl.writeInt(this .controlPort);
222: outControl.flush();
223:
224: // For each node, add an entry to cluster nodes
225: ObjectInputStream inData = new ObjectInputStream(in);
226: int nodeCount = inData.readInt();
227: for (int n = 0; n < nodeCount; n++)
228: this .clusterAddresses.put(inData.readUTF(), new Date());
229:
230: inData.close();
231: outControl.close();
232: out.close();
233: in.close();
234: clusterListSocket.close();
235: } catch (ConnectException err) {
236: Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
237: "SimpleCluster.NoNodeListResponse", address);
238: } catch (Throwable err) {
239: Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
240: "SimpleCluster.ErrorGetNodeList", address, err);
241: }
242: }
243:
244: /**
245: * Given an address, send a heartbeat
246: *
247: * @param address The address to request a node list from
248: */
249: private void sendHeartbeat(String address) {
250: try {
251: int colonPos = address.indexOf(':');
252: String ipAddress = address.substring(0, colonPos);
253: String port = address.substring(colonPos + 1);
254: Socket heartbeatSocket = new Socket(ipAddress, Integer
255: .parseInt(port));
256: OutputStream out = heartbeatSocket.getOutputStream();
257: out.write(NODE_HEARTBEAT_TYPE);
258: out.flush();
259: ObjectOutputStream outData = new ObjectOutputStream(out);
260: outData.writeInt(this .controlPort);
261: outData.close();
262: heartbeatSocket.close();
263: Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
264: "SimpleCluster.HeartbeatSent", address);
265: } catch (ConnectException err) {/* ignore - 3 fails, and we remove */
266: } catch (Throwable err) {
267: Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
268: "SimpleCluster.HeartbeatError", address, err);
269: }
270: }
271:
272: /**
273: * Accept a control socket request related to the cluster functions and
274: * process the request.
275: *
276: * @param requestType A byte indicating the request type
277: * @param in Socket input stream
278: * @param outSocket output stream
279: * @param webAppConfig Instance of the web app
280: * @throws IOException
281: */
282: public void clusterRequest(byte requestType, InputStream in,
283: OutputStream out, Socket socket, HostGroup hostGroup)
284: throws IOException {
285: if (requestType == ClusterSessionSearch.SESSION_CHECK_TYPE)
286: handleClusterSessionRequest(socket, in, out, hostGroup);
287: else if (requestType == NODELIST_DOWNLOAD_TYPE)
288: handleNodeListDownloadRequest(socket, in, out);
289: else if (requestType == NODE_HEARTBEAT_TYPE)
290: handleNodeHeartBeatRequest(socket, in);
291: else
292: Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
293: "SimpleCluster.UnknownRequest", ""
294: + (char) requestType);
295: }
296:
297: /**
298: * Handles incoming socket requests for session search
299: */
300: public void handleClusterSessionRequest(Socket socket,
301: InputStream in, OutputStream out, HostGroup hostGroup)
302: throws IOException {
303: // Read in a string for the sessionId
304: ObjectInputStream inControl = new ObjectInputStream(in);
305: int port = inControl.readInt();
306: String ipPortSender = socket.getInetAddress().getHostAddress()
307: + ":" + port;
308: String sessionId = inControl.readUTF();
309: String hostname = inControl.readUTF();
310: HostConfiguration hostConfig = hostGroup
311: .getHostByName(hostname);
312: String webAppPrefix = inControl.readUTF();
313: WebAppConfiguration webAppConfig = hostConfig
314: .getWebAppByURI(webAppPrefix);
315: ObjectOutputStream outData = new ObjectOutputStream(out);
316: if (webAppConfig == null) {
317: outData.writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND);
318: } else {
319: WinstoneSession session = webAppConfig.getSessionById(
320: sessionId, true);
321: if (session != null) {
322: outData.writeUTF(ClusterSessionSearch.SESSION_FOUND);
323: outData.writeObject(session);
324: outData.flush();
325: if (inControl.readUTF().equals(
326: ClusterSessionSearch.SESSION_RECEIVED))
327: session.passivate();
328: Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
329: "SimpleCluster.SessionTransferredTo",
330: ipPortSender);
331: } else {
332: outData
333: .writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND);
334: }
335: }
336: outData.close();
337: inControl.close();
338: }
339:
340: /**
341: * Handles incoming socket requests for cluster node lists.
342: */
343: public void handleNodeListDownloadRequest(Socket socket,
344: InputStream in, OutputStream out) throws IOException {
345: // Get the ip and port of the requester, and make sure we don't send
346: // that
347: ObjectInputStream inControl = new ObjectInputStream(in);
348: int port = inControl.readInt();
349: String ipPortSender = socket.getInetAddress().getHostAddress()
350: + ":" + port;
351: List allClusterNodes = new ArrayList(this .clusterAddresses
352: .keySet());
353: List relevantClusterNodes = new ArrayList();
354: for (Iterator i = allClusterNodes.iterator(); i.hasNext();) {
355: String node = (String) i.next();
356: if (!node.equals(ipPortSender))
357: relevantClusterNodes.add(node);
358: }
359:
360: ObjectOutputStream outData = new ObjectOutputStream(out);
361: outData.writeInt(relevantClusterNodes.size());
362: outData.flush();
363: for (Iterator i = relevantClusterNodes.iterator(); i.hasNext();) {
364: String ipPort = (String) i.next();
365: if (!ipPort.equals(ipPortSender))
366: outData.writeUTF(ipPort);
367: outData.flush();
368: }
369: outData.close();
370: inControl.close();
371: }
372:
373: /**
374: * Handles heartbeats. Just updates the date of this node's last heartbeat
375: */
376: public void handleNodeHeartBeatRequest(Socket socket, InputStream in)
377: throws IOException {
378: ObjectInputStream inData = new ObjectInputStream(in);
379: int remoteControlPort = inData.readInt();
380: inData.close();
381: String ipPort = socket.getInetAddress().getHostAddress() + ":"
382: + remoteControlPort;
383: this .clusterAddresses.put(ipPort, new Date());
384: Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
385: "SimpleCluster.HeartbeatReceived", ipPort);
386: }
387: }
|