001: package org.jgroups.protocols;
002:
003: import org.jgroups.Event;
004: import org.jgroups.Global;
005: import org.jgroups.Header;
006: import org.jgroups.Message;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.Streamable;
009:
010: import java.io.*;
011: import java.util.Properties;
012: import java.util.zip.DataFormatException;
013: import java.util.zip.Deflater;
014: import java.util.zip.Inflater;
015:
016: /**
017: * Compresses the payload of a message. Goal is to reduce the number of messages sent across the wire.
018: * Should ideally be layered somewhere above a fragmentation protocol (e.g. FRAG).
019: * @author Bela Ban
020: * @version $Id: COMPRESS.java,v 1.12.2.1 2007/04/27 08:03:50 belaban Exp $
021: */
022: public class COMPRESS extends Protocol {
023: Deflater[] deflater_pool = null;
024: Inflater[] inflater_pool = null;
025:
026: /** Values are from 0-9 (0=no compression, 9=best compression) */
027: int compression_level = Deflater.BEST_COMPRESSION; // this is 9
028:
029: /** Minimal payload size of a message (in bytes) for compression to kick in */
030: long min_size = 500;
031:
032: /** Number of inflaters/deflaters, for concurrency, increase this to the max number of concurrent requests possible */
033: int pool_size = 2;
034:
035: private int inflater_index = 0;
036: private int deflater_index = 0;
037:
038: final static String name = "COMPRESS";
039:
040: public String getName() {
041: return name;
042: }
043:
044: private final int getInflaterIndex() {
045: synchronized (this ) {
046: int retval = inflater_index++;
047: if (inflater_index >= pool_size) {
048: inflater_index = 0;
049: }
050: return retval;
051: }
052: }
053:
054: private final int getDeflaterIndex() {
055: synchronized (this ) {
056: int retval = deflater_index++;
057: if (deflater_index >= pool_size) {
058: deflater_index = 0;
059: }
060: return retval;
061: }
062: }
063:
064: public void init() throws Exception {
065: deflater_pool = new Deflater[pool_size];
066: for (int i = 0; i < deflater_pool.length; i++) {
067: deflater_pool[i] = new Deflater(compression_level);
068: }
069: inflater_pool = new Inflater[pool_size];
070: for (int i = 0; i < inflater_pool.length; i++) {
071: inflater_pool[i] = new Inflater();
072: }
073: }
074:
075: public void destroy() {
076: for (int i = 0; i < deflater_pool.length; i++) {
077: Deflater deflater = deflater_pool[i];
078: deflater.end();
079: }
080: for (int i = 0; i < inflater_pool.length; i++) {
081: Inflater inflater = inflater_pool[i];
082: inflater.end();
083: }
084: }
085:
086: public boolean setProperties(Properties props) {
087: String str;
088:
089: super .setProperties(props);
090: str = props.getProperty("compression_level");
091: if (str != null) {
092: compression_level = Integer.parseInt(str);
093: props.remove("compression_level");
094: }
095:
096: str = props.getProperty("min_size");
097: if (str != null) {
098: min_size = Long.parseLong(str);
099: props.remove("min_size");
100: }
101:
102: str = props.getProperty("pool_size");
103: if (str != null) {
104: pool_size = Integer.parseInt(str);
105: if (pool_size <= 0) {
106: log.warn("pool_size must be > 0, setting it to 1");
107: pool_size = 1;
108: }
109: props.remove("pool_size");
110: }
111:
112: if (props.size() > 0) {
113: log.error("the following properties are not recognized: "
114: + props);
115: return false;
116: }
117: return true;
118: }
119:
120: /**
121: * We compress the payload if it is larger than <code>min_size</code>. In this case we add a header containing
122: * the original size before compression. Otherwise we add no header.<br/>
123: * Note that we compress either the entire buffer (if offset/length are not used), or a subset (if offset/length
124: * are used)
125: * @param evt
126: */
127: public void down(Event evt) {
128: if (evt.getType() == Event.MSG) {
129: Message msg = (Message) evt.getArg();
130: int length = msg.getLength(); // takes offset/length (if set) into account
131: if (length >= min_size) {
132: byte[] payload = msg.getRawBuffer(); // here we get the ref so we can avoid copying
133: byte[] compressed_payload = new byte[length];
134: int compressed_size;
135: int tmp_index = getDeflaterIndex();
136: Deflater deflater = deflater_pool[tmp_index]; // must be guaranteed to be non-null !
137: synchronized (deflater) {
138: deflater.reset();
139: deflater.setInput(payload, msg.getOffset(), length);
140: deflater.finish();
141: deflater.deflate(compressed_payload);
142: compressed_size = deflater.getTotalOut();
143: }
144: byte[] new_payload = new byte[compressed_size];
145: System.arraycopy(compressed_payload, 0, new_payload, 0,
146: compressed_size);
147: msg.setBuffer(new_payload);
148: msg.putHeader(name, new CompressHeader(length));
149: if (log.isTraceEnabled())
150: log.trace("compressed payload from " + length
151: + " bytes to " + compressed_size
152: + " bytes (inflater #" + tmp_index + ")");
153: }
154: }
155: passDown(evt);
156: }
157:
158: /**
159: * If there is no header, we pass the message up. Otherwise we uncompress the payload to its original size.
160: * @param evt
161: */
162: public void up(Event evt) {
163: if (evt.getType() == Event.MSG) {
164: Message msg = (Message) evt.getArg();
165: CompressHeader hdr = (CompressHeader) msg
166: .removeHeader(name);
167: if (hdr != null) {
168: byte[] compressed_payload = msg.getRawBuffer();
169: if (compressed_payload != null
170: && compressed_payload.length > 0) {
171: int original_size = hdr.original_size;
172: byte[] uncompressed_payload = new byte[original_size];
173: int tmp_index = getInflaterIndex();
174: Inflater inflater = inflater_pool[tmp_index];
175: synchronized (inflater) {
176: inflater.reset();
177: inflater.setInput(compressed_payload, msg
178: .getOffset(), msg.getLength());
179: try {
180: inflater.inflate(uncompressed_payload);
181: if (log.isTraceEnabled())
182: log.trace("uncompressed "
183: + compressed_payload.length
184: + " bytes to " + original_size
185: + " bytes (deflater #"
186: + tmp_index + ")");
187: msg.setBuffer(uncompressed_payload);
188: } catch (DataFormatException e) {
189: if (log.isErrorEnabled())
190: log.error("exception on uncompression",
191: e);
192: }
193: }
194: }
195: }
196: }
197: passUp(evt);
198: }
199:
200: public static class CompressHeader extends Header implements
201: Streamable {
202: int original_size = 0;
203:
204: public CompressHeader() {
205: super ();
206: }
207:
208: public CompressHeader(int s) {
209: original_size = s;
210: }
211:
212: public long size() {
213: return Global.INT_SIZE;
214: }
215:
216: public void writeExternal(ObjectOutput out) throws IOException {
217: out.writeInt(original_size);
218: }
219:
220: public void readExternal(ObjectInput in) throws IOException,
221: ClassNotFoundException {
222: original_size = in.readInt();
223: }
224:
225: public void writeTo(DataOutputStream out) throws IOException {
226: out.writeInt(original_size);
227: }
228:
229: public void readFrom(DataInputStream in) throws IOException,
230: IllegalAccessException, InstantiationException {
231: original_size = in.readInt();
232: }
233: }
234: }
|