001: /*
002: Copyright (C) 2004 David Bucciarelli (davibu@interfree.it)
003:
004: This program is free software; you can redistribute it and/or
005: modify it under the terms of the GNU General Public License
006: as published by the Free Software Foundation; either version 2
007: of the License, or (at your option) any later version.
008:
009: This program is distributed in the hope that it will be useful,
010: but WITHOUT ANY WARRANTY; without even the implied warranty of
011: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: GNU General Public License for more details.
013:
014: You should have received a copy of the GNU General Public License
015: along with this program; if not, write to the Free Software
016: Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */
018:
019: package org.homedns.dade.jcgrid.server;
020:
021: import java.io.*;
022: import java.net.*;
023: import java.util.*;
024:
025: import org.apache.log4j.*;
026:
027: import org.homedns.dade.jcgrid.*;
028: import org.homedns.dade.jcgrid.vfs.*;
029: import org.homedns.dade.jcgrid.message.*;
030: import org.homedns.dade.jcgrid.util.*;
031:
032: public class WorkerHandlerThread extends HandlerThread {
033: private WorkerStats workerStats;
034:
035: public WorkerHandlerThread(GridServer server, Socket socket)
036: throws IOException {
037: super (GridNodeConfig.TYPE_WORKER, server, socket);
038:
039: if (log.isDebugEnabled())
040: log.debug("Start WorkerHandlerThread(" + server + ","
041: + socket + ")");
042:
043: workerStats = null;
044:
045: if (log.isDebugEnabled())
046: log.debug("End WorkerHandlerThread()");
047: }
048:
049: public WorkerStats getStats() {
050: if (logDetail.isDebugEnabled())
051: logDetail.debug("Start getStats()");
052: if (logDetail.isDebugEnabled())
053: logDetail.debug("End getStats(" + workerStats + ")");
054:
055: return workerStats;
056: }
057:
058: protected void handleConnection() throws Exception {
059: if (log.isDebugEnabled())
060: log.debug("Start handleConnection()");
061:
062: // Now the name of the worker is available
063:
064: workerStats = new WorkerStats(this .getSessionName());
065:
066: try {
067: // Wait some work to do
068:
069: for (;;) {
070: WorkRequest wr = gridServer.popWorkRequest(60 * 1000);
071: if (wr == null) {
072: // Send a ping message in order to check if the worker
073: // is still connected
074:
075: if (log.isDebugEnabled())
076: log.debug(" Sending Ping...");
077: handlerChannel.send(new GridMessagePing());
078:
079: if (log.isDebugEnabled())
080: log.debug(" Waiting PingAck...");
081: GridMessage msg = handlerChannel.recv(30 * 1000);
082:
083: if ((msg == null)
084: || (!(msg instanceof GridMessagePingAck)))
085: break;
086: else
087: continue;
088: }
089:
090: WorkResult wres = null;
091: try {
092: String sessionName = wr.getSessionName();
093: if (log.isDebugEnabled())
094: log.debug(" Received work request: " + wr);
095:
096: workerStats.workBegin(sessionName);
097:
098: // Send the work request to the worker
099:
100: handlerChannel.send(new GridMessageWorkRequest(wr));
101:
102: if (gridServer.getNodeConfig().getGridConfig()
103: .getUseVFS()) {
104: // Align sessions
105:
106: vfsSession session = gridServer
107: .getVFSSessionPool().getVFSSession(
108: sessionName);
109: session.syncVFSSession(handlerChannel);
110: }
111:
112: // Wait for the answer
113:
114: GridMessageWorkResult gmwr = (GridMessageWorkResult) handlerChannel
115: .recv();
116: wres = gmwr.getWorkResult();
117:
118: // Reset the user name (security)
119:
120: wres.setSessionName(sessionName);
121:
122: if (log.isDebugEnabled())
123: log.debug(" Received work result: " + wres);
124:
125: workerStats.workEnd(wres);
126: } finally {
127: if (wres == null) {
128: // Failed to do the work. Reinsert the WorkRequest in the
129: // pending work queue
130:
131: gridServer.pushWorkRequest(wr);
132: } else {
133: // Work done
134:
135: gridServer.pushWorkResult(wres);
136: }
137: }
138: }
139: } catch (EOFException eofex) {
140: // Worker disconnected
141:
142: if (log.isDebugEnabled())
143: log.debug(" Socket EOF");
144: }
145:
146: if (log.isDebugEnabled())
147: log.debug("End handleConnection()");
148: }
149: }
|