001: /*
002: Copyright (C) 2004 David Bucciarelli (davibu@interfree.it)
003:
004: This program is free software; you can redistribute it and/or
005: modify it under the terms of the GNU General Public License
006: as published by the Free Software Foundation; either version 2
007: of the License, or (at your option) any later version.
008:
009: This program is distributed in the hope that it will be useful,
010: but WITHOUT ANY WARRANTY; without even the implied warranty of
011: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: GNU General Public License for more details.
013:
014: You should have received a copy of the GNU General Public License
015: along with this program; if not, write to the Free Software
016: Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */
018:
019: package org.homedns.dade.jcgrid.util;
020:
021: import java.io.*;
022: import java.net.*;
023: import java.util.zip.*;
024:
025: import org.apache.log4j.*;
026:
027: import org.homedns.dade.jcgrid.message.*;
028:
029: public class GridMessageGZIPChannel implements GridMessageChannel {
030: private final static String className = GridMessageGZIPChannel.class
031: .getName();
032: private static Logger log = Logger.getLogger(className);
033: private static Logger logDetail = Logger.getLogger("DETAIL."
034: + className);
035:
036: private Socket socket;
037: private InputStream is;
038: private OutputStream os;
039:
040: public GridMessageGZIPChannel(Socket s) throws IOException {
041: if (log.isDebugEnabled())
042: log.debug("Start GridMessageGZIPChannel(" + s + ")");
043:
044: socket = s;
045:
046: is = socket.getInputStream();
047: os = socket.getOutputStream();
048:
049: if (log.isDebugEnabled())
050: log.debug("End GridMessageGZIPChannel()");
051: }
052:
053: private int readPacket(byte[] buf) throws Exception {
054: if (log.isDebugEnabled())
055: log.debug("Start readPacket(" + buf + ")");
056:
057: int base = 0;
058: while (buf.length - base > 0) {
059: int res = is.read(buf, base, buf.length - base);
060:
061: if (res == -1) {
062: if (log.isDebugEnabled())
063: log.debug("End readPacket(-1)");
064:
065: return -1;
066: }
067:
068: base += res;
069: }
070:
071: if (log.isDebugEnabled())
072: log.debug("End readPacket(" + buf.length + ")");
073:
074: return buf.length;
075: }
076:
077: public GridMessage recv() throws Exception {
078: if (log.isDebugEnabled())
079: log.debug("Start recv()");
080:
081: byte[] bufSize = new byte[4];
082: if (this .readPacket(bufSize) != 4)
083: throw new Exception(
084: "Error reading a message header (size field)");
085: int size = (bufSize[0] & 0xff) | ((bufSize[1] & 0xff) << 8)
086: | ((bufSize[2] & 0xff) << 16)
087: | ((bufSize[3] & 0xff) << 24);
088: if (log.isDebugEnabled())
089: log.debug(" Packet size: " + size);
090:
091: byte[] bufData = new byte[size];
092: if (this .readPacket(bufData) != size)
093: throw new Exception(
094: "Error reading a message (payload field)");
095:
096: ByteArrayInputStream bis = new ByteArrayInputStream(bufData);
097: GZIPInputStream gis = new GZIPInputStream(bis);
098: ObjectInputStream ois = new ObjectInputStream(gis);
099: GridMessage msg = (GridMessage) ois.readObject();
100:
101: if (log.isDebugEnabled())
102: log.debug("End recv(" + msg + ")");
103:
104: return msg;
105: }
106:
107: public GridMessage recv(int timeout) throws Exception {
108: if (log.isDebugEnabled())
109: log.debug("Start recv(" + timeout + ")");
110:
111: socket.setSoTimeout(timeout);
112: GridMessage msg = this .recv();
113: socket.setSoTimeout(0);
114:
115: if (log.isDebugEnabled())
116: log.debug("End recv(" + msg + ")");
117:
118: return msg;
119: }
120:
121: public void send(GridMessage msg) throws Exception {
122: if (log.isDebugEnabled())
123: log.debug("Start send(" + msg + ")");
124:
125: ByteArrayOutputStream bos = new ByteArrayOutputStream();
126: ObjectOutputStream oos = new ObjectOutputStream(bos);
127: oos.writeObject(msg);
128: oos.close();
129: byte[] obj = bos.toByteArray();
130:
131: if (log.isDebugEnabled())
132: log.debug(" Array size: " + obj.length);
133:
134: // Compress data
135:
136: bos = new ByteArrayOutputStream();
137: GZIPOutputStream gos = new GZIPOutputStream(bos);
138: gos.write(obj);
139: gos.close();
140: obj = bos.toByteArray();
141:
142: if (log.isDebugEnabled())
143: log.debug(" Array compressed size: " + obj.length);
144:
145: // Send header (size) and data
146:
147: byte[] buf = new byte[4];
148: buf[0] = (byte) (obj.length & 0x000000ff);
149: buf[1] = (byte) ((obj.length & 0x0000ff00) >> 8);
150: buf[2] = (byte) ((obj.length & 0x00ff0000) >> 16);
151: buf[3] = (byte) ((obj.length & 0xff000000) >> 24);
152: os.write(buf);
153:
154: os.write(obj);
155: os.flush();
156:
157: if (log.isDebugEnabled())
158: log.debug("End send()");
159: }
160:
161: public void close() throws IOException {
162: if (log.isDebugEnabled())
163: log.debug("Start close()");
164:
165: is.close();
166: os.close();
167: socket.close();
168:
169: if (log.isDebugEnabled())
170: log.debug("End close()");
171: }
172: }
|