001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.io;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.ObjectInputStream;
025: import java.io.ObjectOutputStream;
026: import java.io.Serializable;
027: import java.nio.ByteBuffer;
028:
029: /**
030: * The XByteBuffer provides a dual functionality.
031: * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
032: * Two, it can encode and decode packages so that they can be defined and identified
033: * as they come in on a socket.
034: * <br>
035: * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>
036: * <br/>
037: * Transfer package:
038: * <ul>
039: * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
040: * <li><b>SIZE</b> - 4 bytes - size of the data package</li>
041: * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>
042: * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI>
043: * </ul>
044: * @author Filip Hanik
045: * @version $Revision: 467173 $, $Date: 2006-10-24 01:12:17 +0200 (mar., 24 oct. 2006) $
046: */
047: public class XByteBuffer {
048:
049: public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
050: .getLog(XByteBuffer.class);
051:
052: /**
053: * This is a package header, 7 bytes (FLT2002)
054: */
055: public static final byte[] START_DATA = { 70, 76, 84, 50, 48, 48,
056: 50 };
057:
058: /**
059: * This is the package footer, 7 bytes (TLF2003)
060: */
061: public static final byte[] END_DATA = { 84, 76, 70, 50, 48, 48, 51 };
062:
063: /**
064: * Default size on the initial byte buffer
065: */
066: private static final int DEF_SIZE = 2048;
067:
068: /**
069: * Default size to extend the buffer with
070: */
071: private static final int DEF_EXT = 1024;
072:
073: /**
074: * Variable to hold the data
075: */
076: protected byte[] buf = null;
077:
078: /**
079: * Current length of data in the buffer
080: */
081: protected int bufSize = 0;
082:
083: /**
084: * Flag for discarding invalid packages
085: * If this flag is set to true, and append(byte[],...) is called,
086: * the data added will be inspected, and if it doesn't start with
087: * <code>START_DATA</code> it will be thrown away.
088: *
089: */
090: protected boolean discard = true;
091:
092: /**
093: * Constructs a new XByteBuffer
094: * @param size - the initial size of the byte buffer
095: * @todo use a pool of byte[] for performance
096: */
097: public XByteBuffer(int size, boolean discard) {
098: buf = new byte[size];
099: this .discard = discard;
100: }
101:
102: public XByteBuffer(byte[] data, boolean discard) {
103: this (data, data.length + 128, discard);
104: }
105:
106: public XByteBuffer(byte[] data, int size, boolean discard) {
107: int length = Math.max(data.length, size);
108: buf = new byte[length];
109: System.arraycopy(data, 0, buf, 0, data.length);
110: bufSize = data.length;
111: this .discard = discard;
112: }
113:
114: public int getLength() {
115: return bufSize;
116: }
117:
118: public void setLength(int size) {
119: if (size > buf.length)
120: throw new ArrayIndexOutOfBoundsException(
121: "Size is larger than existing buffer.");
122: bufSize = size;
123: }
124:
125: public void trim(int length) {
126: if ((bufSize - length) < 0)
127: throw new ArrayIndexOutOfBoundsException(
128: "Can't trim more bytes than are available. length:"
129: + bufSize + " trim:" + length);
130: bufSize -= length;
131: }
132:
133: public void reset() {
134: bufSize = 0;
135: }
136:
137: public byte[] getBytesDirect() {
138: return this .buf;
139: }
140:
141: /**
142: * Returns the bytes in the buffer, in its exact length
143: */
144: public byte[] getBytes() {
145: byte[] b = new byte[bufSize];
146: System.arraycopy(buf, 0, b, 0, bufSize);
147: return b;
148: }
149:
150: /**
151: * Resets the buffer
152: */
153: public void clear() {
154: bufSize = 0;
155: }
156:
157: /**
158: * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
159: * header, false will be returned and the data will be discarded.
160: * @param b - bytes to be appended
161: * @param off - the offset to extract data from
162: * @param len - the number of bytes to append.
163: * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
164: */
165: public boolean append(ByteBuffer b, int len) {
166: int newcount = bufSize + len;
167: if (newcount > buf.length) {
168: expand(newcount);
169: }
170: b.get(buf, bufSize, len);
171:
172: bufSize = newcount;
173:
174: if (discard) {
175: if (bufSize > START_DATA.length
176: && (firstIndexOf(buf, 0, START_DATA) == -1)) {
177: bufSize = 0;
178: log.error("Discarded the package, invalid header");
179: return false;
180: }
181: }
182: return true;
183:
184: }
185:
186: public boolean append(byte i) {
187: int newcount = bufSize + 1;
188: if (newcount > buf.length) {
189: expand(newcount);
190: }
191: buf[bufSize] = i;
192: bufSize = newcount;
193: return true;
194: }
195:
196: public boolean append(boolean i) {
197: int newcount = bufSize + 1;
198: if (newcount > buf.length) {
199: expand(newcount);
200: }
201: XByteBuffer.toBytes(i, buf, bufSize);
202: bufSize = newcount;
203: return true;
204: }
205:
206: public boolean append(long i) {
207: int newcount = bufSize + 8;
208: if (newcount > buf.length) {
209: expand(newcount);
210: }
211: XByteBuffer.toBytes(i, buf, bufSize);
212: bufSize = newcount;
213: return true;
214: }
215:
216: public boolean append(int i) {
217: int newcount = bufSize + 4;
218: if (newcount > buf.length) {
219: expand(newcount);
220: }
221: XByteBuffer.toBytes(i, buf, bufSize);
222: bufSize = newcount;
223: return true;
224: }
225:
226: public boolean append(byte[] b, int off, int len) {
227: if ((off < 0) || (off > b.length) || (len < 0)
228: || ((off + len) > b.length) || ((off + len) < 0)) {
229: throw new IndexOutOfBoundsException();
230: } else if (len == 0) {
231: return false;
232: }
233:
234: int newcount = bufSize + len;
235: if (newcount > buf.length) {
236: expand(newcount);
237: }
238: System.arraycopy(b, off, buf, bufSize, len);
239: bufSize = newcount;
240:
241: if (discard) {
242: if (bufSize > START_DATA.length
243: && (firstIndexOf(buf, 0, START_DATA) == -1)) {
244: bufSize = 0;
245: log.error("Discarded the package, invalid header");
246: return false;
247: }
248: }
249: return true;
250: }
251:
252: public void expand(int newcount) {
253: //don't change the allocation strategy
254: byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
255: System.arraycopy(buf, 0, newbuf, 0, bufSize);
256: buf = newbuf;
257: }
258:
259: public int getCapacity() {
260: return buf.length;
261: }
262:
263: /**
264: * Internal mechanism to make a check if a complete package exists
265: * within the buffer
266: * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
267: */
268: public int countPackages() {
269: return countPackages(false);
270: }
271:
272: public int countPackages(boolean first) {
273: int cnt = 0;
274: int pos = START_DATA.length;
275: int start = 0;
276:
277: while (start < bufSize) {
278: //first check start header
279: int index = XByteBuffer
280: .firstIndexOf(buf, start, START_DATA);
281: //if the header (START_DATA) isn't the first thing or
282: //the buffer isn't even 14 bytes
283: if (index != start || ((bufSize - start) < 14))
284: break;
285: //next 4 bytes are compress flag not needed for count packages
286: //then get the size 4 bytes
287: int size = toInt(buf, pos);
288: //now the total buffer has to be long enough to hold
289: //START_DATA.length+4+size+END_DATA.length
290: pos = start + START_DATA.length + 4 + size;
291: if ((pos + END_DATA.length) > bufSize)
292: break;
293: //and finally check the footer of the package END_DATA
294: int newpos = firstIndexOf(buf, pos, END_DATA);
295: //mismatch, there is no package
296: if (newpos != pos)
297: break;
298: //increase the packet count
299: cnt++;
300: //reset the values
301: start = pos + END_DATA.length;
302: pos = start + START_DATA.length;
303: //we only want to verify that we have at least one package
304: if (first)
305: break;
306: }
307: return cnt;
308: }
309:
310: /**
311: * Method to check if a package exists in this byte buffer.
312: * @return - true if a complete package (header,options,size,data,footer) exists within the buffer
313: */
314: public boolean doesPackageExist() {
315: return (countPackages(true) > 0);
316: }
317:
318: /**
319: * Extracts the message bytes from a package.
320: * If no package exists, a IllegalStateException will be thrown.
321: * @param clearFromBuffer - if true, the package will be removed from the byte buffer
322: * @return - returns the actual message bytes (header, compress,size and footer not included).
323: */
324: public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
325: int psize = countPackages(true);
326: if (psize == 0) {
327: throw new java.lang.IllegalStateException(
328: "No package exists in XByteBuffer");
329: }
330: int size = toInt(buf, START_DATA.length);
331: XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,
332: false);
333: xbuf.setLength(size);
334: System.arraycopy(buf, START_DATA.length + 4, xbuf
335: .getBytesDirect(), 0, size);
336: if (clearFromBuffer) {
337: int totalsize = START_DATA.length + 4 + size
338: + END_DATA.length;
339: bufSize = bufSize - totalsize;
340: System.arraycopy(buf, totalsize, buf, 0, bufSize);
341: }
342: return xbuf;
343:
344: }
345:
346: public ChannelData extractPackage(boolean clearFromBuffer)
347: throws java.io.IOException {
348: XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
349: ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
350: return cdata;
351: }
352:
353: /**
354: * Creates a complete data package
355: * @param indata - the message data to be contained within the package
356: * @param compressed - compression flag for the indata buffer
357: * @return - a full package (header,size,data,footer)
358: *
359: */
360: public static byte[] createDataPackage(ChannelData cdata) {
361: // return createDataPackage(cdata.getDataPackage());
362: //avoid one extra byte array creation
363: int dlength = cdata.getDataPackageLength();
364: int length = getDataPackageLength(dlength);
365: byte[] data = new byte[length];
366: int offset = 0;
367: System
368: .arraycopy(START_DATA, 0, data, offset,
369: START_DATA.length);
370: offset += START_DATA.length;
371: toBytes(dlength, data, START_DATA.length);
372: offset += 4;
373: cdata.getDataPackage(data, offset);
374: offset += dlength;
375: System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
376: offset += END_DATA.length;
377: return data;
378: }
379:
380: public static byte[] createDataPackage(byte[] data, int doff,
381: int dlength, byte[] buffer, int bufoff) {
382: if ((buffer.length - bufoff) > getDataPackageLength(dlength)) {
383: throw new ArrayIndexOutOfBoundsException(
384: "Unable to create data package, buffer is too small.");
385: }
386: System.arraycopy(START_DATA, 0, buffer, bufoff,
387: START_DATA.length);
388: toBytes(data.length, buffer, bufoff + START_DATA.length);
389: System.arraycopy(data, doff, buffer, bufoff + START_DATA.length
390: + 4, dlength);
391: System.arraycopy(END_DATA, 0, buffer, bufoff
392: + START_DATA.length + 4 + data.length, END_DATA.length);
393: return buffer;
394: }
395:
396: public static int getDataPackageLength(int datalength) {
397: int length = START_DATA.length + //header length
398: 4 + //data length indicator
399: datalength + //actual data length
400: END_DATA.length; //footer length
401: return length;
402:
403: }
404:
405: public static byte[] createDataPackage(byte[] data) {
406: int length = getDataPackageLength(data.length);
407: byte[] result = new byte[length];
408: return createDataPackage(data, 0, data.length, result, 0);
409: }
410:
411: // public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {
412: // int pkglen = getDataPackageLength(dlength);
413: // if ( buf.getCapacity() < pkglen ) buf.expand(pkglen);
414: // createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
415: // }
416:
417: /**
418: * Convert four bytes to an int
419: * @param b - the byte array containing the four bytes
420: * @param off - the offset
421: * @return the integer value constructed from the four bytes
422: * @exception java.lang.ArrayIndexOutOfBoundsException
423: */
424: public static int toInt(byte[] b, int off) {
425: return (((int) b[off + 3]) & 0xFF)
426: + ((((int) b[off + 2]) & 0xFF) << 8)
427: + ((((int) b[off + 1]) & 0xFF) << 16)
428: + ((((int) b[off + 0]) & 0xFF) << 24);
429: }
430:
431: /**
432: * Convert eight bytes to a long
433: * @param b - the byte array containing the four bytes
434: * @param off - the offset
435: * @return the long value constructed from the eight bytes
436: * @exception java.lang.ArrayIndexOutOfBoundsException
437: */
438: public static long toLong(byte[] b, int off) {
439: return (((long) b[off + 7]) & 0xFF)
440: + ((((long) b[off + 6]) & 0xFF) << 8)
441: + ((((long) b[off + 5]) & 0xFF) << 16)
442: + ((((long) b[off + 4]) & 0xFF) << 24)
443: + ((((long) b[off + 3]) & 0xFF) << 32)
444: + ((((long) b[off + 2]) & 0xFF) << 40)
445: + ((((long) b[off + 1]) & 0xFF) << 48)
446: + ((((long) b[off + 0]) & 0xFF) << 56);
447: }
448:
449: /**
450: * Converts an integer to four bytes
451: * @param n - the integer
452: * @return - four bytes in an array
453: * @deprecated use toBytes(boolean,byte[],int)
454: */
455: public static byte[] toBytes(boolean bool) {
456: byte[] b = new byte[1];
457: return toBytes(bool, b, 0);
458:
459: }
460:
461: public static byte[] toBytes(boolean bool, byte[] data, int offset) {
462: data[offset] = (byte) (bool ? 1 : 0);
463: return data;
464: }
465:
466: /**
467: *
468: * @param <any> long
469: * @return use
470: */
471: public static boolean toBoolean(byte[] b, int offset) {
472: return b[offset] != 0;
473: }
474:
475: /**
476: * Converts an integer to four bytes
477: * @param n - the integer
478: * @return - four bytes in an array
479: * @deprecated use toBytes(int,byte[],int)
480: */
481: public static byte[] toBytes(int n) {
482: return toBytes(n, new byte[4], 0);
483: }
484:
485: public static byte[] toBytes(int n, byte[] b, int offset) {
486: b[offset + 3] = (byte) (n);
487: n >>>= 8;
488: b[offset + 2] = (byte) (n);
489: n >>>= 8;
490: b[offset + 1] = (byte) (n);
491: n >>>= 8;
492: b[offset + 0] = (byte) (n);
493: return b;
494: }
495:
496: /**
497: * Converts an long to eight bytes
498: * @param n - the long
499: * @return - eight bytes in an array
500: * @deprecated use toBytes(long,byte[],int)
501: */
502: public static byte[] toBytes(long n) {
503: return toBytes(n, new byte[8], 0);
504: }
505:
506: public static byte[] toBytes(long n, byte[] b, int offset) {
507: b[offset + 7] = (byte) (n);
508: n >>>= 8;
509: b[offset + 6] = (byte) (n);
510: n >>>= 8;
511: b[offset + 5] = (byte) (n);
512: n >>>= 8;
513: b[offset + 4] = (byte) (n);
514: n >>>= 8;
515: b[offset + 3] = (byte) (n);
516: n >>>= 8;
517: b[offset + 2] = (byte) (n);
518: n >>>= 8;
519: b[offset + 1] = (byte) (n);
520: n >>>= 8;
521: b[offset + 0] = (byte) (n);
522: return b;
523: }
524:
525: /**
526: * Similar to a String.IndexOf, but uses pure bytes
527: * @param src - the source bytes to be searched
528: * @param srcOff - offset on the source buffer
529: * @param find - the string to be found within src
530: * @return - the index of the first matching byte. -1 if the find array is not found
531: */
532: public static int firstIndexOf(byte[] src, int srcOff, byte[] find) {
533: int result = -1;
534: if (find.length > src.length)
535: return result;
536: if (find.length == 0 || src.length == 0)
537: return result;
538: if (srcOff >= src.length)
539: throw new java.lang.ArrayIndexOutOfBoundsException();
540: boolean found = false;
541: int srclen = src.length;
542: int findlen = find.length;
543: byte first = find[0];
544: int pos = srcOff;
545: while (!found) {
546: //find the first byte
547: while (pos < srclen) {
548: if (first == src[pos])
549: break;
550: pos++;
551: }
552: if (pos >= srclen)
553: return -1;
554:
555: //we found the first character
556: //match the rest of the bytes - they have to match
557: if ((srclen - pos) < findlen)
558: return -1;
559: //assume it does exist
560: found = true;
561: for (int i = 1; ((i < findlen) && found); i++)
562: found = found && (find[i] == src[pos + i]);
563: if (found)
564: result = pos;
565: else if ((srclen - pos) < findlen)
566: return -1; //no more matches possible
567: else
568: pos++;
569: }
570: return result;
571: }
572:
573: public static Serializable deserialize(byte[] data)
574: throws IOException, ClassNotFoundException,
575: ClassCastException {
576: return deserialize(data, 0, data.length);
577: }
578:
579: public static Serializable deserialize(byte[] data, int offset,
580: int length) throws IOException, ClassNotFoundException,
581: ClassCastException {
582: return deserialize(data, offset, length, null);
583: }
584:
585: public static int invokecount = 0;
586:
587: public static Serializable deserialize(byte[] data, int offset,
588: int length, ClassLoader[] cls) throws IOException,
589: ClassNotFoundException, ClassCastException {
590: synchronized (XByteBuffer.class) {
591: invokecount++;
592: }
593: Object message = null;
594: if (cls == null)
595: cls = new ClassLoader[0];
596: if (data != null) {
597: InputStream instream = new ByteArrayInputStream(data,
598: offset, length);
599: ObjectInputStream stream = null;
600: stream = (cls.length > 0) ? new ReplicationStream(instream,
601: cls) : new ObjectInputStream(instream);
602: message = stream.readObject();
603: instream.close();
604: stream.close();
605: }
606: if (message == null) {
607: return null;
608: } else if (message instanceof Serializable)
609: return (Serializable) message;
610: else {
611: throw new ClassCastException(
612: "Message has the wrong class. It should implement Serializable, instead it is:"
613: + message.getClass().getName());
614: }
615: }
616:
617: /**
618: * Serializes a message into cluster data
619: * @param msg ClusterMessage
620: * @param compress boolean
621: * @return
622: * @throws IOException
623: */
624: public static byte[] serialize(Serializable msg) throws IOException {
625: ByteArrayOutputStream outs = new ByteArrayOutputStream();
626: ObjectOutputStream out = new ObjectOutputStream(outs);
627: out.writeObject(msg);
628: out.flush();
629: byte[] data = outs.toByteArray();
630: return data;
631: }
632:
633: public void setDiscard(boolean discard) {
634: this .discard = discard;
635: }
636:
637: public boolean getDiscard() {
638: return discard;
639: }
640:
641: }
|