001: /*
002: Copyright (C) 2004 David Bucciarelli (davibu@interfree.it)
003:
004: This program is free software; you can redistribute it and/or
005: modify it under the terms of the GNU General Public License
006: as published by the Free Software Foundation; either version 2
007: of the License, or (at your option) any later version.
008:
009: This program is distributed in the hope that it will be useful,
010: but WITHOUT ANY WARRANTY; without even the implied warranty of
011: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: GNU General Public License for more details.
013:
014: You should have received a copy of the GNU General Public License
015: along with this program; if not, write to the Free Software
016: Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */
018:
019: package org.homedns.dade.jcgrid.server;
020:
021: import java.io.*;
022: import java.net.*;
023: import java.util.*;
024:
025: import org.apache.log4j.*;
026:
027: import org.homedns.dade.jcgrid.*;
028: import org.homedns.dade.jcgrid.message.*;
029: import org.homedns.dade.jcgrid.util.*;
030:
031: abstract public class HandlerThread extends Thread {
032: protected final static String className = HandlerThread.class
033: .getName();
034: protected static Logger log = Logger.getLogger(className);
035: protected static Logger logDetail = Logger.getLogger("DETAIL."
036: + className);
037:
038: protected String handlerType;
039:
040: protected GridServer gridServer;
041: protected Socket handlerSocket;
042: protected GridMessageChannel handlerChannel;
043:
044: protected String sessionName;
045:
046: public HandlerThread(String type, GridServer server, Socket socket)
047: throws IOException {
048: if (log.isDebugEnabled())
049: log.debug("Start HandlerThread(" + type + "," + server
050: + "," + socket + ")");
051:
052: handlerType = type;
053: gridServer = server;
054: handlerSocket = socket;
055:
056: if (gridServer.getNodeConfig().getGridConfig()
057: .getUseCompression())
058: handlerChannel = new GridMessageGZIPChannel(socket);
059: else
060: handlerChannel = new GridMessageFlatChannel(socket, true);
061:
062: sessionName = null;
063:
064: if (log.isDebugEnabled())
065: log.debug("End HandlerThread()");
066: }
067:
068: public String getSessionName() {
069: if (logDetail.isDebugEnabled())
070: logDetail.debug("Start getSessionName()");
071: if (logDetail.isDebugEnabled())
072: logDetail.debug("End getSessionName(" + sessionName + ")");
073:
074: return sessionName;
075: }
076:
077: public void run() {
078: if (log.isDebugEnabled())
079: log.debug("Start run()");
080:
081: NDC.push("[" + handlerType + "]");
082:
083: try {
084: // Waiting for login message
085:
086: GridMessageBeginSession gmb = (GridMessageBeginSession) handlerChannel
087: .recv();
088:
089: sessionName = gmb.getSessionName();
090:
091: NDC.pop();
092: NDC.push("[" + handlerType + "-" + sessionName + "]");
093:
094: String pwd = gridServer.getNodeConfig().getGridConfig()
095: .getServerPassword();
096: if ((pwd != null) && (!pwd.equals(gmb.getServerPassword())))
097: throw new Exception("Login failed for " + sessionName);
098:
099: log.warn("New session: " + sessionName + " ["
100: + handlerSocket.getLocalAddress().getHostAddress()
101: + ":" + handlerSocket.getLocalPort() + "]");
102:
103: // Check if the session name is valid
104: // If you change the regexp check also the client side code
105:
106: if (!sessionName.matches("[a-zA-Z0-9_]+"))
107: throw new Exception("Invalid session name: "
108: + sessionName);
109:
110: // Check if the session already exists
111:
112: HandlerThreads hts;
113: if (handlerType.equals(GridNodeConfig.TYPE_CLIENT))
114: hts = gridServer.getClientHandlers();
115: else if (handlerType.equals(GridNodeConfig.TYPE_WORKER))
116: hts = gridServer.getWorkerHandlers();
117: else if (handlerType.equals(GridNodeConfig.TYPE_ADMIN))
118: hts = gridServer.getAdminHandlers();
119: else
120: throw new Exception("Unknown handler type: "
121: + handlerType);
122:
123: if (hts.doesHandlerSessionExists(sessionName, this ))
124: throw new Exception("Session name already in use: "
125: + sessionName);
126:
127: // Add to the list of connected clients
128:
129: hts.addConnectedHandler(this );
130:
131: try {
132: this .handleConnection();
133: } finally {
134: log.warn("End session: " + sessionName);
135: hts.removeConnectedHandler(this );
136: }
137: } catch (SocketException sex) {
138: if (log.isDebugEnabled())
139: log.debug(" Socket Exception", sex);
140: } catch (EOFException eofex) {
141: // Client disconnected
142:
143: if (log.isDebugEnabled())
144: log.debug(" Socket EOF");
145: } catch (Exception ex) {
146: log.warn("Error in HandlerThread.run()", ex);
147: } finally {
148: try {
149: this .handleDisconnection();
150: } catch (Exception ex) {
151: log.warn("Failed disconnection of session "
152: + this .getSessionName(), ex);
153: }
154:
155: try {
156: handlerChannel.close();
157: } catch (Exception ex) {
158: }
159: }
160:
161: NDC.pop();
162:
163: if (log.isDebugEnabled())
164: log.debug("End run()");
165: }
166:
167: protected void handleConnection() throws Exception {
168: // Wait for something to do
169:
170: for (;;) {
171: GridMessage msg = handlerChannel.recv();
172:
173: if (log.isDebugEnabled())
174: log.debug(" Received grid message: " + msg);
175:
176: if (msg instanceof GridMessagePing) {
177: if (log.isDebugEnabled())
178: log.debug(" Received ping request");
179:
180: handlerChannel.send(new GridMessagePingAck());
181: } else if (msg instanceof GridMessageEndSession) {
182: if (log.isDebugEnabled())
183: log.debug(" Received end session request");
184: break;
185: } else
186: this .handleMsg(msg);
187: }
188: }
189:
190: protected void handleMsg(GridMessage msg) throws Exception {
191: log
192: .warn("Unknown message in HandlerThread.handleMsg(): "
193: + msg);
194: }
195:
196: protected void handleDisconnection() throws Exception {
197: // Do nothig by default
198: }
199: }
|