001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.io;
018:
019: /**
020: * The XByteBuffer provides a dual functionality.
021: * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
022: * Two, it can encode and decode packages so that they can be defined and identified
023: * as they come in on a socket.
024: *
025: * @author Filip Hanik
026: * @version $Revision: 1.8 $, $Date: 2004/05/26 16:32:59 $
027: */
028:
029: public class XByteBuffer {
030:
031: public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
032: .getLog(XByteBuffer.class);
033:
034: /**
035: * This is a package header, 7 bytes
036: */
037: public static final byte[] START_DATA = { 70, 76, 84, 50, 48, 48,
038: 50 };
039: /**
040: * This is the package footer, 7 bytes
041: */
042: public static final byte[] END_DATA = { 84, 76, 70, 50, 48, 48, 51 };
043: //A package looks like, always.
044: /**
045: * START_DATA - 7 bytes
046: * SIZE - 4 bytes - size of the data package
047: * DATA - should be as many bytes as the prev SIZE
048: * END_DATA - 7 bytes
049: */
050:
051: /**
052: * Default size on the initial byte buffer
053: */
054: static final int DEF_SIZE = 1024;
055: /**
056: * Default size to extend the buffer with
057: */
058: static final int DEF_EXT = 1024;
059: /**
060: * Variable to hold the data
061: */
062: protected byte[] buf = null;
063: /**
064: * Current length of data in the buffer
065: */
066: protected int bufSize = 0;
067:
068: /**
069: * Constructs a new XByteBuffer
070: * @param size - the initial size of the byte buffer
071: */
072: public XByteBuffer(int size) {
073: buf = new byte[size];
074: }//XByteBuffer
075:
076: /**
077: * Constructs a new XByteBuffer with an initial size of 1024 bytes
078: */
079: public XByteBuffer() {
080: this (DEF_SIZE);
081: }//XByteBuffer
082:
083: /**
084: * Returns the bytes in the buffer, in its exact length
085: */
086: public byte[] getBytes() {
087: byte[] b = new byte[bufSize];
088: System.arraycopy(buf, 0, b, 0, bufSize);
089: return b;
090: }//getBytes
091:
092: /**
093: * Resets the buffer
094: */
095: public void clear() {
096: bufSize = 0;
097: }
098:
099: /**
100: * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
101: * header, false will be returned and the data will be discarded.
102: * @param b - bytes to be appended
103: * @param off - the offset to extract data from
104: * @param len - the number of bytes to append.
105: * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
106: */
107: public boolean append(byte[] b, int off, int len) {
108: if ((off < 0) || (off > b.length) || (len < 0)
109: || ((off + len) > b.length) || ((off + len) < 0)) {
110: throw new IndexOutOfBoundsException();
111: } else if (len == 0) {
112: return false;
113: }//end if
114:
115: int newcount = bufSize + len;
116: if (newcount > buf.length) {
117: byte newbuf[] = new byte[Math
118: .max(buf.length << 1, newcount)];
119: System.arraycopy(buf, 0, newbuf, 0, bufSize);
120: buf = newbuf;
121: }
122: System.arraycopy(b, off, buf, bufSize, len);
123: bufSize = newcount;
124:
125: if (bufSize > START_DATA.length
126: && (firstIndexOf(buf, 0, START_DATA) == -1)) {
127: bufSize = 0;
128: log.error("Discarded the package, invalid header");
129: return false;
130: }
131: return true;
132: }//append
133:
134: /**
135: * Internal mechanism to make a check if a complete package exists
136: * within the buffer
137: * @return - true if a complete package (header,size,data,footer) exists within the buffer
138: */
139: public int countPackages() {
140: int cnt = 0;
141: int pos = START_DATA.length;
142: int start = 0;
143:
144: while (start < bufSize) {
145: //first check start header
146: int index = this .firstIndexOf(buf, start, START_DATA);
147: //if the header (START_DATA) isn't the first thing or
148: //the buffer isn't even 10 bytes
149: if (index != start || ((bufSize - start) < 10))
150: break;
151: //then get the size 4 bytes
152: int size = toInt(buf, pos);
153: //now the total buffer has to be long enough to hold
154: //START_DATA.length+4+size+END_DATA.length
155: pos = start + START_DATA.length + 4 + size;
156: if ((pos + END_DATA.length) > bufSize)
157: break;
158: //and finally check the footer of the package END_DATA
159: int newpos = firstIndexOf(buf, pos, END_DATA);
160: //mismatch, there is no package
161: if (newpos != pos)
162: break;
163: //increase the packet count
164: cnt++;
165: //reset the values
166: start = pos + END_DATA.length;
167: pos = start + START_DATA.length;
168: }//while
169: return cnt;
170: }//getSize
171:
172: /**
173: * Method to check if a package exists in this byte buffer.
174: * @return - true if a complete package (header,size,data,footer) exists within the buffer
175: */
176: public boolean doesPackageExist() {
177: return (countPackages() > 0);
178: }//doesPackageExist
179:
180: /**
181: * Extracts the message bytes from a package.
182: * If no package exists, a IllegalStateException will be thrown.
183: * @param clearFromBuffer - if true, the package will be removed from the byte buffer
184: * @return - returns the actual message bytes (header, size and footer not included).
185: */
186: public byte[] extractPackage(boolean clearFromBuffer)
187: throws java.io.IOException {
188: int psize = countPackages();
189: if (psize == 0)
190: throw new java.lang.IllegalStateException(
191: "No package exists in XByteBuffer");
192: int size = toInt(buf, START_DATA.length);
193: byte[] data = new byte[size];
194: System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
195: if (clearFromBuffer) {
196: int totalsize = START_DATA.length + 4 + size
197: + END_DATA.length;
198: bufSize = bufSize - totalsize;
199: System.arraycopy(buf, totalsize, buf, 0, bufSize);
200: }
201: java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(
202: data);
203: java.util.zip.GZIPInputStream gin = new java.util.zip.GZIPInputStream(
204: bin);
205: byte[] tmp = new byte[1024];
206: byte[] result = new byte[0];
207: int length = gin.read(tmp);
208: while (length > 0) {
209: byte[] tmpdata = result;
210: result = new byte[result.length + length];
211: System.arraycopy(tmpdata, 0, result, 0, tmpdata.length);
212: System.arraycopy(tmp, 0, result, tmpdata.length, length);
213: length = gin.read(tmp);
214: }
215: gin.close();
216: return result;
217: }//extractPackage
218:
219: /**
220: * Convert four bytes to an int
221: * @param b - the byte array containing the four bytes
222: * @param off - the offset
223: * @return the integer value constructed from the four bytes
224: * @exception java.lang.ArrayIndexOutOfBoundsException
225: */
226: public static int toInt(byte[] b, int off) {
227: return (((int) b[off + 3]) & 0xFF)
228: + ((((int) b[off + 2]) & 0xFF) << 8)
229: + ((((int) b[off + 1]) & 0xFF) << 16)
230: + ((((int) b[off + 0]) & 0xFF) << 24);
231: }//toInt
232:
233: /**
234: * Convert eight bytes to a long
235: * @param b - the byte array containing the four bytes
236: * @param off - the offset
237: * @return the long value constructed from the eight bytes
238: * @exception java.lang.ArrayIndexOutOfBoundsException
239: */
240: public static long toLong(byte[] b, int off) {
241: return (((long) b[off + 7]) & 0xFF)
242: + ((((long) b[off + 6]) & 0xFF) << 8)
243: + ((((long) b[off + 5]) & 0xFF) << 16)
244: + ((((long) b[off + 4]) & 0xFF) << 24)
245: + ((((long) b[off + 3]) & 0xFF) << 32)
246: + ((((long) b[off + 2]) & 0xFF) << 40)
247: + ((((long) b[off + 1]) & 0xFF) << 48)
248: + ((((long) b[off + 0]) & 0xFF) << 56);
249: }//toInt
250:
251: /**
252: * Converts an integer to four bytes
253: * @param n - the integer
254: * @return - four bytes in an array
255: */
256: public static byte[] toBytes(int n) {
257: byte[] b = new byte[4];
258: b[3] = (byte) (n);
259: n >>>= 8;
260: b[2] = (byte) (n);
261: n >>>= 8;
262: b[1] = (byte) (n);
263: n >>>= 8;
264: b[0] = (byte) (n);
265: return b;
266: } //toBytes
267:
268: /**
269: * Converts an long to eight bytes
270: * @param n - the long
271: * @return - eight bytes in an array
272: */
273: public static byte[] toBytes(long n) {
274: byte[] b = new byte[8];
275: b[7] = (byte) (n);
276: n >>>= 8;
277: b[6] = (byte) (n);
278: n >>>= 8;
279: b[5] = (byte) (n);
280: n >>>= 8;
281: b[4] = (byte) (n);
282: n >>>= 8;
283: b[3] = (byte) (n);
284: n >>>= 8;
285: b[2] = (byte) (n);
286: n >>>= 8;
287: b[1] = (byte) (n);
288: n >>>= 8;
289: b[0] = (byte) (n);
290: return b;
291: } //toBytes
292:
293: /**
294: * Similar to a String.IndexOf, but uses pure bytes
295: * @param src - the source bytes to be searched
296: * @param srcOff - offset on the source buffer
297: * @param find - the string to be found within src
298: * @return - the index of the first matching byte. -1 if the find array is not found
299: */
300: public static int firstIndexOf(byte[] src, int srcOff, byte[] find) {
301: int result = -1;
302: if (find.length > src.length)
303: return result;
304: if (find.length == 0 || src.length == 0)
305: return result;
306: if (srcOff >= src.length)
307: throw new java.lang.ArrayIndexOutOfBoundsException();
308: boolean found = false;
309: int srclen = src.length;
310: int findlen = find.length;
311: byte first = find[0];
312: int pos = srcOff;
313: while (!found) {
314: //find the first byte
315: while (pos < srclen) {
316: if (first == src[pos])
317: break;
318: pos++;
319: } //while
320: if (pos >= srclen)
321: return -1;
322:
323: //we found the first character
324: //match the rest of the bytes - they have to match
325: if ((srclen - pos) < findlen)
326: return -1;
327: //assume it does exist
328: found = true;
329: for (int i = 1; ((i < findlen) && found); i++)
330: found = found && (find[i] == src[pos + i]);
331: if (found)
332: result = pos;
333: else if ((srclen - pos) < findlen)
334: return -1; //no more matches possible
335: else
336: pos++;
337: } //while
338: return result;
339: } //firstIndexOf
340:
341: /**
342: * Creates a complete data package
343: * @param indata - the message data to be contained within the package
344: * @return - a full package (header,size,data,footer)
345: */
346: public static byte[] createDataPackage(byte[] indata)
347: throws java.io.IOException {
348: java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream(
349: indata.length / 2);
350: java.util.zip.GZIPOutputStream gout = new java.util.zip.GZIPOutputStream(
351: bout);
352: gout.write(indata);
353: gout.flush();
354: gout.close();
355: byte[] data = bout.toByteArray();
356: byte[] result = new byte[START_DATA.length + 4 + data.length
357: + END_DATA.length];
358: System.arraycopy(START_DATA, 0, result, 0, START_DATA.length);
359: System.arraycopy(toBytes(data.length), 0, result,
360: START_DATA.length, 4);
361: System.arraycopy(data, 0, result, START_DATA.length + 4,
362: data.length);
363: System.arraycopy(END_DATA, 0, result, START_DATA.length + 4
364: + data.length, END_DATA.length);
365: return result;
366: }//createDataPackage
367:
368: public static void main(String[] args) throws Exception {
369: log.info("Before=" + Integer.MAX_VALUE);
370: byte[] d = toBytes(Integer.MAX_VALUE);
371: log.info("After=" + toInt(d, 0));
372:
373: log.info("Before=" + Long.MAX_VALUE);
374: d = toBytes(Long.MAX_VALUE);
375: log.info("After=" + toLong(d, 0));
376:
377: log.info("Before=" + 4564564);
378: d = toBytes((long) 4564564);
379: log.info("After=" + toLong(d, 0));
380:
381: byte[] d1 = createDataPackage(new byte[] { 1 });
382: byte[] d2 = createDataPackage(new byte[] { 2 });
383: byte[] d3 = createDataPackage(new byte[] { 3 });
384: byte[] test = new byte[d1.length + d2.length + d3.length + 5];
385: System.arraycopy(d1, 0, test, 0, d1.length);
386: System.arraycopy(d2, 0, test, d1.length, d2.length);
387: System.arraycopy(d3, 0, test, d2.length + d1.length, d3.length);
388: printBuf(d1);
389: printBuf(d2);
390: printBuf(d3);
391: printBuf(test);
392: XByteBuffer b = new XByteBuffer();
393: b.append(test, 0, test.length);
394: int s = b.countPackages();
395: log.info("Nr of packages=" + s);
396: while (s > 0) {
397: d = b.extractPackage(true);
398: System.out.print("Package d1=");
399: printBuf(d);
400: s--;
401: }//while
402:
403: }
404:
405: public static void printBuf(byte[] b) {
406: StringBuffer buf = new StringBuffer();
407: for (int i = 0; i < b.length; i++) {
408: buf.append(b[i] + " ");
409: }
410: log.info(buf);
411: }
412:
413: }//class
|