001: /*
002: Copyright (C) 2004 David Bucciarelli (davibu@interfree.it)
003:
004: This program is free software; you can redistribute it and/or
005: modify it under the terms of the GNU General Public License
006: as published by the Free Software Foundation; either version 2
007: of the License, or (at your option) any later version.
008:
009: This program is distributed in the hope that it will be useful,
010: but WITHOUT ANY WARRANTY; without even the implied warranty of
011: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: GNU General Public License for more details.
013:
014: You should have received a copy of the GNU General Public License
015: along with this program; if not, write to the Free Software
016: Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */
018:
019: package org.homedns.dade.jcgrid.worker;
020:
021: import java.io.*;
022: import java.net.*;
023: import java.util.*;
024:
025: import org.apache.log4j.*;
026:
027: import org.homedns.dade.jcgrid.*;
028: import org.homedns.dade.jcgrid.vfs.*;
029: import org.homedns.dade.jcgrid.message.*;
030: import org.homedns.dade.jcgrid.util.*;
031:
032: public class WorkerThread extends Thread {
033: private final static String className = WorkerThread.class
034: .getName();
035: private static Logger log = Logger.getLogger(className);
036: private static Logger logDetail = Logger.getLogger("DETAIL."
037: + className);
038:
039: private GridWorker gridWorker;
040: private GridWorkerFeedback feedback;
041:
042: public WorkerThread(GridWorker gw) throws IOException {
043: if (log.isDebugEnabled())
044: log.debug("Start WorkerThread(" + gw + ")");
045:
046: gridWorker = gw;
047: feedback = null;
048:
049: if (log.isDebugEnabled())
050: log.debug("End WorkerThread()");
051: }
052:
053: public void setWorkerFeedback(GridWorkerFeedback f) {
054: if (logDetail.isDebugEnabled())
055: logDetail.debug("Start setWorkerFeedback(" + f + ")");
056:
057: feedback = f;
058:
059: if (logDetail.isDebugEnabled())
060: logDetail.debug("End setWorkerFeedback()");
061: }
062:
063: public void run() {
064: if (log.isDebugEnabled())
065: log.debug("Start run()");
066:
067: try {
068: if (feedback != null)
069: feedback.start();
070:
071: GridMessageChannel serverChannel = gridWorker
072: .getGridMessageChannel();
073:
074: for (;;) {
075: GridMessage msg = serverChannel.recv();
076:
077: try {
078: if (msg instanceof GridMessageShutdown)
079: break;
080: else if (msg instanceof GridMessagePing)
081: serverChannel.send(new GridMessagePingAck());
082: else if (msg instanceof GridMessageWorkRequest) {
083: GridMessageWorkRequest gmwr = (GridMessageWorkRequest) msg;
084:
085: WorkRequest wr = gmwr.getWorkRequest();
086: if (log.isDebugEnabled())
087: log.debug(" Received work request: " + wr);
088:
089: String sessionName = wr.getSessionName();
090: log.warn("Working for: " + sessionName);
091:
092: if (feedback != null)
093: feedback.beginWorkingFor(sessionName, wr);
094:
095: String sessionPath = null;
096: if (gridWorker.getNodeConfig().getGridConfig()
097: .getUseVFS()) {
098: // Wait for VFS Session
099:
100: GridMessage vfsmsg = serverChannel.recv();
101:
102: if (log.isDebugEnabled())
103: log
104: .debug(" Received VFS session begin");
105:
106: vfsSession clientSession = ((GridMessageVFSSessionBegin) vfsmsg)
107: .getVFSSession();
108:
109: // Align sessions
110:
111: gridWorker.getVFSSessionPool()
112: .syncVFSSession(sessionName,
113: serverChannel,
114: clientSession);
115:
116: // End file exchange
117:
118: serverChannel
119: .send(new GridMessageVFSSessionEnd());
120:
121: sessionPath = gridWorker
122: .getVFSSessionPool().getVFSSession(
123: sessionName).getPath();
124: }
125:
126: // Do the work
127:
128: WorkResult res = gridWorker.getWorker().doWork(
129: wr, sessionPath);
130: if (log.isDebugEnabled())
131: log.debug(" Work result: " + res);
132:
133: if (feedback != null)
134: feedback.endWorkingFor(res);
135:
136: // Return the result
137:
138: serverChannel.send(new GridMessageWorkResult(
139: res));
140: } else
141: log
142: .warn("Error in WorkerThread.run(), received an unknown message: "
143: + msg);
144: } catch (Exception ex) {
145: log.warn("Error in WorkerThread.run.loop()", ex);
146: }
147: }
148: } catch (Exception ex) {
149: log.warn("Error in WorkerThread.run()", ex);
150: }
151:
152: if (log.isDebugEnabled())
153: log.debug("End run()");
154: }
155: }
|