001: /*
002: Copyright (C) 2004 David Bucciarelli (davibu@interfree.it)
003:
004: This program is free software; you can redistribute it and/or
005: modify it under the terms of the GNU General Public License
006: as published by the Free Software Foundation; either version 2
007: of the License, or (at your option) any later version.
008:
009: This program is distributed in the hope that it will be useful,
010: but WITHOUT ANY WARRANTY; without even the implied warranty of
011: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: GNU General Public License for more details.
013:
014: You should have received a copy of the GNU General Public License
015: along with this program; if not, write to the Free Software
016: Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */
018:
019: package org.homedns.dade.jcgrid.server;
020:
021: import java.io.*;
022: import java.net.*;
023: import java.util.*;
024:
025: import org.apache.log4j.*;
026:
027: import org.homedns.dade.jcgrid.*;
028: import org.homedns.dade.jcgrid.vfs.*;
029: import org.homedns.dade.jcgrid.message.*;
030: import org.homedns.dade.jcgrid.util.*;
031:
032: public class ClientHandlerThread extends HandlerThread {
033: public ClientHandlerThread(GridServer server, Socket socket)
034: throws IOException {
035: super (GridNodeConfig.TYPE_CLIENT, server, socket);
036:
037: if (log.isDebugEnabled())
038: log.debug("Start ClientHandlerThread(" + server + ","
039: + socket + ")");
040: if (log.isDebugEnabled())
041: log.debug("End ClientHandlerThread()");
042: }
043:
044: public void pushWorkResult(WorkResult wr) throws Exception {
045: if (log.isDebugEnabled())
046: log.debug("Start pushWorkResult(" + wr + ")");
047:
048: // In order to avoid collision between more workers calling this method
049:
050: synchronized (handlerChannel) {
051: handlerChannel.send(new GridMessageWorkResult(wr));
052: }
053:
054: if (log.isDebugEnabled())
055: log.debug("End pushWorkResult()");
056: }
057:
058: protected void handleMsg(GridMessage msg) throws Exception {
059: if (log.isDebugEnabled())
060: log.debug("Start handleMsg()");
061:
062: if (msg instanceof GridMessageWorkRequest) {
063: WorkRequest wr = ((GridMessageWorkRequest) msg)
064: .getWorkRequest();
065:
066: // Reset the session name (security)
067:
068: wr.setSessionName(sessionName);
069:
070: if (log.isDebugEnabled())
071: log.debug(" Received work request: " + wr);
072:
073: gridServer.pushWorkRequest(wr);
074: } else if (msg instanceof GridMessageVFSSessionBegin) {
075: if (log.isDebugEnabled())
076: log.debug(" Received VFS session begin");
077:
078: if (gridServer.getNodeConfig().getGridConfig().getUseVFS()) {
079: vfsSession clientSession = ((GridMessageVFSSessionBegin) msg)
080: .getVFSSession();
081:
082: // Align sessions
083:
084: gridServer.getVFSSessionPool().syncVFSSession(
085: sessionName, handlerChannel, clientSession);
086:
087: // End file exchange
088:
089: handlerChannel.send(new GridMessageVFSSessionEnd());
090: } else
091: log
092: .warn("Received VFS session begin even if VFS is disabled (session name: "
093: + this .getSessionName() + ")");
094: } else
095: log
096: .warn("Unknown message in ClientHandlerThread.handleMsg(): "
097: + msg);
098:
099: if (log.isDebugEnabled())
100: log.debug("End handleMsg()");
101: }
102:
103: protected void handleDisconnection() throws Exception {
104: if (log.isDebugEnabled())
105: log.debug("Start handleDisconnection()");
106:
107: // Remove any pending work
108:
109: gridServer.removePendingWorkRequests(sessionName);
110:
111: if (gridServer.getNodeConfig().getGridConfig().getUseVFS())
112: gridServer.getVFSSessionPool()
113: .unlockVFSSession(sessionName);
114:
115: if (log.isDebugEnabled())
116: log.debug("End handleDisconnection()");
117: }
118: }
|