001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2005 Emic Networks
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Marc Herbert
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.common.protocol;
021:
022: import java.io.ByteArrayInputStream;
023: import java.io.ByteArrayOutputStream;
024: import java.io.IOException;
025: import java.io.ObjectInputStream;
026: import java.io.ObjectOutputStream;
027: import java.io.Serializable;
028: import java.math.BigDecimal;
029: import java.math.BigInteger;
030: import java.sql.Clob;
031: import java.sql.SQLException;
032: import java.sql.Timestamp;
033:
034: import org.continuent.sequoia.common.exceptions.NotImplementedException;
035: import org.continuent.sequoia.common.stream.LongUTFDataInputStream;
036: import org.continuent.sequoia.common.stream.LongUTFDataOutputStream;
037:
038: /**
039: * This class defines Serializers for SQL Data: per type serialization +
040: * deserialization methods and information wrapped in one object. Serializers
041: * are implemented as singletons for efficiency.
042: *
043: * @author <a href="mailto:Marc.Herbert@continuent.com">Marc Herbert </a>
044: * @author <a href="mailto:Gilles.Rayrat@continuent.com">Gilles Rayrat </a>
045: * @version 1.0
046: */
047: public final class SQLDataSerialization {
048:
049: /**
050: * Unsupported types listed in {@link TypeTag}won't be added in the near
051: * future, except maybe java.net.URL for JDBC 3.0
052: */
053: /**
054: * CLOB support should be easy to base on BLOB implementation once we figure
055: * out the encoding issues.
056: */
057:
058: private static final Serializer JAVA_STRING = new StringSerializer();
059: private static final Serializer MATH_BIGDECIMAL = new BigDecimalBytesSerializer();
060: private static final Serializer JAVA_BOOLEAN = new BooleanSerializer();
061: private static final Serializer JAVA_INTEGER = new IntegerSerializer();
062: private static final Serializer JAVA_LONG = new LongSerializer();
063: private static final Serializer JAVA_FLOAT = new FloatSerializer();
064: private static final Serializer JAVA_DOUBLE = new DoubleSerializer();
065: private static final Serializer JAVA_BYTES = new BytesSerializer();
066:
067: private static final Serializer SQL_DATE = new DateSerializer();
068: private static final Serializer SQL_TIME = new TimeSerializer();
069: private static final Serializer SQL_TIMESTAMP = new TimestampSerializer();
070:
071: // CLOB: TODO
072: private static final Serializer SQL_BLOB = new BlobSerializer();
073:
074: private static final Serializer JAVA_SERIALIZABLE = new JavaSerializableSerializer();
075:
076: /** Serializer returned for nulls. Public static final so we can == */
077: private static final Serializer UNKNOWN_TYPE = new UndefinedSerializer();
078:
079: private static final int STREAM_BUF_SIZE = 65536;
080:
081: // java.net.URL: TODO
082:
083: /** Abstract class hiding type-specific serialization methods and information */
084: public abstract static class Serializer {
085: protected TypeTag typeTag;
086:
087: /**
088: * Special value: non-existing serializer of null references.
089: *
090: * @return true if undefined
091: */
092: public boolean isUndefined() {
093: return this == UNKNOWN_TYPE;
094: }
095:
096: /**
097: * @return the corresponding TypeTag
098: */
099: public TypeTag getTypeTag() {
100: return typeTag;
101: }
102:
103: /**
104: * Serialize the object to the stream. Warning: the caller must ensure that
105: * Serializer subtype and Object subtype are compatible.
106: *
107: * @param obj object to send
108: * @param output Output stream
109: * @throws IOException stream error
110: * @throws ClassCastException wrong serializer for this object type
111: */
112:
113: public abstract void sendToStream(Object obj,
114: LongUTFDataOutputStream output) throws IOException,
115: ClassCastException;
116:
117: /**
118: * De-serialize an object from the stream. Warning: the caller must ensure
119: * that Serializer subtype and the incoming object are compatible.
120: *
121: * @param input Input stream
122: * @return the object received from the stream
123: * @throws IOException stream error
124: */
125: public abstract Object receiveFromStream(
126: LongUTFDataInputStream input) throws IOException;
127:
128: }
129:
130: /**
131: * Returns the de/serializer appropriate for the given TypeTag, or for the
132: * type of the given SQL object if argument is not a TypeTag (TypeTag already
133: * knows how to serialize itself).
134: *
135: * @param sqlObjOrTypeTag a typetag or a sample SQL object of the type of
136: * interest
137: * @return appropriate serialization + deserialization methods. Returns the
138: * UNKNOWN_TYPE serializer on "null" arg.
139: * @throws NotImplementedException if we don't know how to serialize objects
140: * such as the given one. Set with a default message but caller has
141: * generally more useful information and should catch and replace
142: * (not even chain) this NotImplementedException by a better one.
143: * @throws IllegalArgumentException if we gave a wrong TypeTag
144: */
145: public static Serializer getSerializer(Object sqlObjOrTypeTag)
146: throws NotImplementedException, IllegalArgumentException {
147: return getSerializerImpl(sqlObjOrTypeTag);
148: }
149:
150: /** @see #getSerializer(Object) */
151: public static Serializer getSerializer(TypeTag t)
152: throws IllegalArgumentException {
153: try {
154: return getSerializerImpl(t);
155: } catch (NotImplementedException nie) {
156: // since we passed a TypeTag it's really impossible to come here
157: IllegalArgumentException ipe = new IllegalArgumentException(
158: "Internal bug: there should be a serializer available for every"
159: + " existing TypeTag, including:" + t);
160: ipe.initCause(nie);
161: throw ipe;
162: }
163: }
164:
165: private static Serializer getSerializerImpl(Object sqlObjOrTypeTag)
166: throws NotImplementedException, IllegalArgumentException {
167: /*
168: * Default values that never match anything: we just need any reference that
169: * is both non-SQL and non-null.
170: */
171: TypeTag tag = TypeTag.CONTROLLER_READY;
172: Object obj = JAVA_STRING;
173:
174: /**
175: * Now let's get rid of all these nasty type issues for good by casting once
176: * for all.
177: */
178: if (sqlObjOrTypeTag instanceof TypeTag)
179: tag = (TypeTag) sqlObjOrTypeTag;
180: else
181: obj = sqlObjOrTypeTag;
182:
183: if (obj == null || TypeTag.UNDEFINED.equals(tag))
184: return UNKNOWN_TYPE;
185:
186: /**
187: * THE big switch on (type). "instanceof" is used on the serialization side,
188: * "TypeTag.equals()" is used on the de-serialization side. We could for
189: * performance split this method into two different methods (with the added
190: * burden of keeping them perfectly synchronized)
191: *
192: * @see TypeTag
193: */
194: // STRING
195: if (obj instanceof String || TypeTag.STRING.equals(tag))
196: return JAVA_STRING;
197:
198: // BIGDECIMAL
199: if (obj instanceof BigDecimal || TypeTag.BIGDECIMAL.equals(tag))
200: return MATH_BIGDECIMAL;
201:
202: // BOOLEAN
203: if (obj instanceof Boolean || TypeTag.BOOLEAN.equals(tag))
204: return JAVA_BOOLEAN;
205:
206: // INTEGER
207: if (obj instanceof Integer || TypeTag.INTEGER.equals(tag))
208: return JAVA_INTEGER;
209:
210: // LONG
211: if (obj instanceof Long || TypeTag.LONG.equals(tag))
212: return JAVA_LONG;
213:
214: // FLOAT
215: if (obj instanceof Float || TypeTag.FLOAT.equals(tag))
216: return JAVA_FLOAT;
217:
218: // DOUBLE
219: if (obj instanceof Double || TypeTag.DOUBLE.equals(tag))
220: return JAVA_DOUBLE;
221:
222: // BYTE ARRAY
223: if (obj instanceof byte[] || TypeTag.BYTE_ARRAY.equals(tag))
224: return JAVA_BYTES;
225:
226: // DATE
227: if (obj instanceof java.sql.Date
228: || TypeTag.SQL_DATE.equals(tag))
229: return SQL_DATE;
230:
231: // TIME
232: if (obj instanceof java.sql.Time
233: || TypeTag.SQL_TIME.equals(tag))
234: return SQL_TIME;
235:
236: // TIMESTAMP
237: if (obj instanceof Timestamp
238: || TypeTag.SQL_TIMESTAMP.equals(tag))
239: return SQL_TIMESTAMP;
240:
241: // CLOB: TODO
242: if (obj instanceof Clob || TypeTag.CLOB.equals(tag))
243: throw new NotImplementedException(
244: "Clob serialization not yet implemented");
245:
246: // BLOB
247: if (obj instanceof java.sql.Blob || TypeTag.BLOB.equals(tag))
248: return SQL_BLOB;
249:
250: // java.net.URL: TODO
251:
252: // Serializable Java object, MUST be last!
253: if (sqlObjOrTypeTag instanceof Serializable
254: || TypeTag.JAVA_SERIALIZABLE.equals(tag))
255: return JAVA_SERIALIZABLE;
256:
257: if (sqlObjOrTypeTag instanceof TypeTag)
258: throw new IllegalArgumentException(
259: "Internal error: getSerializer() misused with unknown TypeTag argument:"
260: + tag);
261:
262: // An alternative could be to tag only the problematic column, return a
263: // "do-nothing" serializer and send the rest of the result set anyway.
264:
265: // Should be replaced by caller, see javadoc above
266: throw new NotImplementedException(
267: "Unable to serialize unknown type "
268: + sqlObjOrTypeTag.getClass() + " of object "
269: + sqlObjOrTypeTag);
270: }
271:
272: /*
273: * These classes define one serializer per type
274: */
275:
276: // STRING
277: private static final class StringSerializer extends Serializer {
278: {
279: typeTag = TypeTag.STRING;
280: }
281:
282: public void sendToStream(Object obj,
283: LongUTFDataOutputStream output) throws IOException {
284: output.writeLongUTF((String) obj);
285: }
286:
287: public Object receiveFromStream(LongUTFDataInputStream input)
288: throws IOException {
289: return input.readLongUTF();
290:
291: }
292: }
293:
294: // we serialize this by sending the value as 4-bytes packs inside ints,
295: // then by sending the scale as an integer
296: private static final class BigDecimalBytesSerializer extends
297: Serializer {
298: {
299: typeTag = TypeTag.BIGDECIMAL;
300: }
301:
302: public void sendToStream(Object obj,
303: LongUTFDataOutputStream output) throws IOException {
304: // A BigDecimal is a BigInteger called 'value' and a scale
305: // To serialize it, we first convert the value to a byte array using
306: // BigInteger.toByteArray(). Then we serialize it using integers (for
307: // perfomance purpose)
308: // NOTE: we cannot directly use the int array reprensentation of the
309: // BigIntegers because there is no public accessor for it (!!!)
310:
311: // The array of bytes returned by .toByteArray() is the two's-complement
312: // representation of the BigInteger in big-endian byte-order.
313: // But to save everyone's time and sweat, we actually split the
314: // BigInteger into its sign and absolute value, avoiding the useless cost
315: // of computing the two's complement for both the sender and the receiver.
316:
317: // To send this array of bytes in integers, we have to align the bytes.
318: // This is done by padding the first bytes
319: // For example, let's take the byte array AA BB CC DD EE (hexa values)
320: // We will need two integers for these five bytes:
321: // i1, the first integer, will be padded until byte array tail is aligned,
322: // so i1 will be equal to '00 00 00 AA' while i2 will be 'BB CC DD EE'
323:
324: // Number to serialize
325: BigDecimal toBeSerialized = (BigDecimal) obj;
326:
327: // 1. Send unscaled, absolute value:
328: // 1.1 Send byte array size
329: byte[] byteArray = toBeSerialized.unscaledValue().abs()
330: .toByteArray();
331: output.writeInt(byteArray.length);
332:
333: // 1.2 Compute head-padding. The padding is done on the beginning of the
334: // array. Zeros are send before first "real" byte in order to align the
335: // array on integers
336: int idx = 0, word = 0;
337: int padding = byteArray.length % 4;
338: if (padding > 0) {
339: // This should be hard to read so:
340: // bytes are shifted so that last byte is the least-significant byte of
341: // the integer 'word'. More materially:
342: // if padding is 1, no shift -> byte[0] is put at the tail of the int
343: // if padding is 2, shift first byte from 8 while 2nd has no shift
344: // if padding is 3, shift 1st byte from 16, 2nd from 8, none for 3rd
345: for (idx = 0; idx < padding; idx++) {
346: // 0xFF is needed because of sign extension
347: word |= (byteArray[idx] & 0xFF) << (8 * (padding
348: - idx - 1));
349: }
350: // let's write this padded bytes-as-integer
351: output.writeInt(word);
352: }
353: // 1.3 Send the rest of the byte array 4 bytes by 4 bytes in an int
354: // we start from the first aligned byte
355: for (; idx < byteArray.length; idx += 4) {
356: word = (byteArray[idx] & 0xFF) << 24
357: | (byteArray[idx + 1] & 0xFF) << 16
358: | (byteArray[idx + 2] & 0xFF) << 8
359: | byteArray[idx + 3] & 0xFF;
360: output.writeInt(word);
361: }
362:
363: // 1.4 Send sign as an int
364: output.writeInt(toBeSerialized.signum());
365:
366: // 2. Send scale
367: output.writeInt(toBeSerialized.scale());
368: }
369:
370: public Object receiveFromStream(LongUTFDataInputStream input)
371: throws IOException {
372: // new method - Deserialize int value then scale
373: // 1. Read intVal:
374: // 1.1 Read byte array length
375: int byteArrayLength = input.readInt();
376: byte[] byteArray = new byte[byteArrayLength];
377: // 1.2 Compute padding
378: int idx = 0, wordRead = 0;
379: int padding = byteArrayLength % 4;
380: // If there is a padding, we must read the first 'padding' bytes
381: // so we are aligned for the rest of the bytes
382: if (padding > 0) {
383: wordRead = input.readInt();
384: for (idx = 0; idx < padding; idx++) {
385: byteArray[idx] = (byte) ((wordRead >> (8 * (padding
386: - idx - 1))) & 0xFF);
387: }
388: }
389: // 1.3 Read the byte array from integers
390: // we start from the first aligned byte
391: for (; idx < byteArrayLength; idx += 4) {
392: wordRead = input.readInt();
393: byteArray[idx] = (byte) ((wordRead >> 24) & 0xFF);
394: byteArray[idx + 1] = (byte) ((wordRead >> 16) & 0xFF);
395: byteArray[idx + 2] = (byte) ((wordRead >> 8) & 0xFF);
396: byteArray[idx + 3] = (byte) (wordRead & 0xFF);
397: }
398: BigInteger intVal = new BigInteger(byteArray);
399:
400: // 1.4 read sign as an int
401: if (input.readInt() < 0)
402: intVal = intVal.negate();
403:
404: // 2. Read scale
405: int scale = input.readInt();
406:
407: return new BigDecimal(intVal, scale);
408: }
409: }
410:
411: // BOOLEAN
412: private static final class BooleanSerializer extends Serializer {
413: {
414: typeTag = TypeTag.BOOLEAN;
415: }
416:
417: public void sendToStream(Object obj,
418: LongUTFDataOutputStream output) throws IOException {
419: output.writeBoolean(((Boolean) obj).booleanValue());
420: }
421:
422: public Object receiveFromStream(LongUTFDataInputStream input)
423: throws IOException {
424: return new Boolean(input.readBoolean());
425: }
426: }
427:
428: // INTEGER
429: private static final class IntegerSerializer extends Serializer {
430: {
431: typeTag = TypeTag.INTEGER;
432: }
433:
434: public void sendToStream(Object obj,
435: LongUTFDataOutputStream output) throws IOException {
436: /**
437: * let's also accept Short, see PostgreSQL bug explained here
438: *
439: * @see org.continuent.sequoia.driver.DriverResultSet#initSerializers()
440: */
441: output.writeInt(((Number) obj).intValue());
442: }
443:
444: public Object receiveFromStream(LongUTFDataInputStream input)
445: throws IOException {
446: return new Integer(input.readInt());
447: }
448: }
449:
450: // LONG
451: private static final class LongSerializer extends Serializer {
452: {
453: typeTag = TypeTag.LONG;
454: }
455:
456: public void sendToStream(Object obj,
457: LongUTFDataOutputStream output) throws IOException {
458: output.writeLong(((Long) obj).longValue());
459: }
460:
461: public Object receiveFromStream(LongUTFDataInputStream input)
462: throws IOException {
463: return new Long(input.readLong());
464: }
465: }
466:
467: // FLOAT
468: private static final class FloatSerializer extends Serializer {
469: {
470: typeTag = TypeTag.FLOAT;
471: }
472:
473: public void sendToStream(Object obj,
474: LongUTFDataOutputStream output) throws IOException {
475: output.writeFloat(((Float) obj).floatValue());
476: }
477:
478: public Object receiveFromStream(LongUTFDataInputStream input)
479: throws IOException {
480: return new Float(input.readFloat());
481: }
482: }
483:
484: // DOUBLE
485: private static final class DoubleSerializer extends Serializer {
486: {
487: typeTag = TypeTag.DOUBLE;
488: }
489:
490: public void sendToStream(Object obj,
491: LongUTFDataOutputStream output) throws IOException {
492: output.writeDouble(((Double) obj).doubleValue());
493: }
494:
495: public Object receiveFromStream(LongUTFDataInputStream input)
496: throws IOException {
497: return new Double(input.readDouble());
498: }
499: }
500:
501: // BYTE ARRAY
502: private static final class BytesSerializer extends Serializer {
503: {
504: typeTag = TypeTag.BYTE_ARRAY;
505: }
506:
507: public void sendToStream(Object obj,
508: LongUTFDataOutputStream output) throws IOException {
509: byte[] b = (byte[]) obj;
510: output.writeInt(b.length);
511: output.write(b);
512: }
513:
514: public Object receiveFromStream(LongUTFDataInputStream input)
515: throws IOException {
516: int len = input.readInt();
517: byte[] b = new byte[len];
518: input.readFully(b);
519: return b;
520: }
521: }
522:
523: // DATE
524: private static final class DateSerializer extends Serializer {
525: {
526: typeTag = TypeTag.SQL_DATE;
527: }
528:
529: public void sendToStream(Object obj,
530: LongUTFDataOutputStream output) throws IOException {
531: output.writeLong(((java.sql.Date) obj).getTime());
532: }
533:
534: public Object receiveFromStream(LongUTFDataInputStream input)
535: throws IOException {
536: return new java.sql.Date(input.readLong());
537: }
538: }
539:
540: // TIME
541: private static final class TimeSerializer extends Serializer {
542: {
543: typeTag = TypeTag.SQL_TIME;
544: }
545:
546: public void sendToStream(Object obj,
547: LongUTFDataOutputStream output) throws IOException {
548: output.writeInt((int) ((java.sql.Time) obj).getTime());
549: }
550:
551: public Object receiveFromStream(LongUTFDataInputStream input)
552: throws IOException {
553: return new java.sql.Time(input.readInt());
554: }
555: }
556:
557: // TIMESTAMP
558: private static final class TimestampSerializer extends Serializer {
559: {
560: typeTag = TypeTag.SQL_TIMESTAMP;
561: }
562:
563: public void sendToStream(Object obj,
564: LongUTFDataOutputStream output) throws IOException {
565: Timestamp ts = (Timestamp) obj;
566: // put the milliseconds trick/CPU load on the driver side
567: output.writeLong(ts.getTime());
568: output.writeInt(ts.getNanos());
569: }
570:
571: public Object receiveFromStream(LongUTFDataInputStream input)
572: throws IOException {
573: long tsWithMilli = input.readLong();
574: // we don't want the milliseconds twice
575: Timestamp ts = new Timestamp((tsWithMilli / 1000) * 1000);
576: ts.setNanos(input.readInt());
577: return ts;
578: }
579: }
580:
581: // CLOB: TODO
582:
583: // BLOB
584: private static final class BlobSerializer extends Serializer {
585: {
586: typeTag = TypeTag.BLOB;
587: }
588:
589: public void sendToStream(Object obj,
590: LongUTFDataOutputStream output) throws IOException {
591: java.sql.Blob blob = (java.sql.Blob) obj;
592: try {
593: // Be very careful to be compatible with JAVA_BYTES.sendToStream(),
594: // since we use JAVA_BYTES.receiveFromStream on the other side.
595: // We don't use it on this side to save memory.
596:
597: if (blob.length() > Integer.MAX_VALUE)
598: // FIXME: this is currently corrupting protocol with driver
599: throw new IOException("Blobs bigger than "
600: + Integer.MAX_VALUE + " are not supported");
601:
602: // send the size of the byte array
603: output.writeInt((int) blob.length());
604:
605: byte[] tempBuffer = new byte[STREAM_BUF_SIZE];
606: java.io.InputStream input = blob.getBinaryStream();
607: int nbRead;
608: while (true) {
609: nbRead = input.read(tempBuffer);
610: if (-1 == nbRead)
611: break;
612: output.write(tempBuffer, 0, nbRead);
613: }
614: } catch (SQLException e) {
615: // Exceptions for Blobs is unfortunately tricky because we can't know in
616: // advance if a java array will be big enough (2^31) to hold them.
617: throw (IOException) new IOException(e
618: .getLocalizedMessage()).initCause(e);
619: }
620: }
621:
622: public Object receiveFromStream(LongUTFDataInputStream input)
623: throws IOException {
624: byte[] b = (byte[]) JAVA_BYTES.receiveFromStream(input);
625: return new org.continuent.sequoia.common.protocol.ByteArrayBlob(
626: b);
627: }
628: }
629:
630: // JAVA_SERIALIZABLE
631: private static final class JavaSerializableSerializer extends
632: Serializer {
633: {
634: typeTag = TypeTag.JAVA_SERIALIZABLE;
635: }
636:
637: public void sendToStream(Object obj,
638: LongUTFDataOutputStream output) throws IOException {
639: ByteArrayOutputStream baos = new ByteArrayOutputStream();
640: ObjectOutputStream oos = new ObjectOutputStream(baos);
641: oos.writeObject(obj);
642: oos.close();
643:
644: // send first the size of the byte array
645: output.writeInt(baos.size());
646: baos.writeTo(output);
647: baos.close();
648: }
649:
650: public Object receiveFromStream(LongUTFDataInputStream input)
651: throws IOException {
652: byte[] b = (byte[]) JAVA_BYTES.receiveFromStream(input);
653: ObjectInputStream ois = new ObjectInputStream(
654: new ByteArrayInputStream(b));
655: try {
656: Object obj = ois.readObject();
657: return obj;
658: } catch (ClassNotFoundException e) {
659: ClassCastException ioe = new ClassCastException(
660: "Class of deserialized object not found");
661: ioe.initCause(e);
662: throw ioe;
663: }
664: }
665: }
666:
667: // UNKNOWN TYPE
668: private static final class UndefinedSerializer extends Serializer {
669: {
670: typeTag = TypeTag.UNDEFINED;
671: }
672:
673: public void sendToStream(Object obj,
674: LongUTFDataOutputStream output) {
675: throw new RuntimeException(
676: "Internal bug: tried to send using the UNDEFINED serializer");
677: }
678:
679: public Object receiveFromStream(LongUTFDataInputStream input)
680: throws ClassCastException {
681: throw new RuntimeException(
682: "Internal bug: tried to receive using the UNDEFINED deserializer");
683: }
684: }
685:
686: }
|