001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.value;
007:
008: import java.io.BufferedInputStream;
009: import java.io.BufferedOutputStream;
010: import java.io.BufferedWriter;
011: import java.io.DataInputStream;
012: import java.io.DataOutputStream;
013: import java.io.IOException;
014: import java.io.InputStream;
015: import java.io.OutputStreamWriter;
016: import java.io.Reader;
017: import java.io.Writer;
018: import java.math.BigDecimal;
019: import java.net.Socket;
020: import java.sql.Date;
021: import java.sql.ResultSet;
022: import java.sql.ResultSetMetaData;
023: import java.sql.SQLException;
024: import java.sql.Time;
025: import java.sql.Timestamp;
026:
027: import org.h2.constant.ErrorCode;
028: import org.h2.constant.SysProperties;
029: import org.h2.engine.Constants;
030: import org.h2.engine.SessionInterface;
031: import org.h2.message.Message;
032: import org.h2.message.TraceSystem;
033: import org.h2.tools.SimpleResultSet;
034: import org.h2.util.ExactUTF8InputStreamReader;
035: import org.h2.util.IOUtils;
036: import org.h2.util.StringCache;
037:
038: /**
039: * The transfer class is used to send and receive Value objects.
040: * It is used on both the client side, and on the server side.
041: */
042: public class Transfer {
043:
044: private static final int BUFFER_SIZE = 16 * 1024;
045: private static final int LOB_MAGIC = 0x1234;
046:
047: private SessionInterface session;
048: protected Socket socket;
049: protected DataInputStream in;
050: protected DataOutputStream out;
051:
052: public Transfer(SessionInterface session) {
053: this .session = session;
054: }
055:
056: public void setSocket(Socket s) {
057: socket = s;
058: }
059:
060: public void init() throws IOException {
061: in = new DataInputStream(new BufferedInputStream(socket
062: .getInputStream(), Transfer.BUFFER_SIZE));
063: out = new DataOutputStream(new BufferedOutputStream(socket
064: .getOutputStream(), Transfer.BUFFER_SIZE));
065: }
066:
067: public void flush() throws IOException {
068: out.flush();
069: }
070:
071: public Transfer writeBoolean(boolean x) throws IOException {
072: out.writeByte((byte) (x ? 1 : 0));
073: return this ;
074: }
075:
076: public boolean readBoolean() throws IOException {
077: return in.readByte() == 1;
078: }
079:
080: public Transfer writeByte(byte x) throws IOException {
081: out.writeByte(x);
082: return this ;
083: }
084:
085: public byte readByte() throws IOException {
086: return in.readByte();
087: }
088:
089: public Transfer writeInt(int i) throws IOException {
090: out.writeInt(i);
091: return this ;
092: }
093:
094: public int readInt() throws IOException {
095: return in.readInt();
096: }
097:
098: public Transfer writeLong(long i) throws IOException {
099: out.writeLong(i);
100: return this ;
101: }
102:
103: public long readLong() throws IOException {
104: return in.readLong();
105: }
106:
107: public Transfer writeDouble(double i) throws IOException {
108: out.writeDouble(i);
109: return this ;
110: }
111:
112: public Transfer writeFloat(float i) throws IOException {
113: out.writeFloat(i);
114: return this ;
115: }
116:
117: public double readDouble() throws IOException {
118: return in.readDouble();
119: }
120:
121: public float readFloat() throws IOException {
122: return in.readFloat();
123: }
124:
125: public Transfer writeString(String s) throws IOException {
126: if (s == null) {
127: out.writeInt(-1);
128: } else {
129: int len = s.length();
130: out.writeInt(len);
131: for (int i = 0; i < len; i++) {
132: out.writeChar(s.charAt(i));
133: }
134: }
135: return this ;
136: }
137:
138: public String readString() throws IOException {
139: int len = in.readInt();
140: if (len == -1) {
141: return null;
142: }
143: // TODO optimize: StringBuffer is synchronized, maybe use a char array
144: // (but that means more memory)
145: StringBuffer buff = new StringBuffer(len);
146: for (int i = 0; i < len; i++) {
147: buff.append(in.readChar());
148: }
149: String s = buff.toString();
150: s = StringCache.get(s);
151: return s;
152: }
153:
154: public Transfer writeBytes(byte[] data) throws IOException {
155: if (data == null) {
156: writeInt(-1);
157: } else {
158: writeInt(data.length);
159: out.write(data);
160: }
161: return this ;
162: }
163:
164: public byte[] readBytes() throws IOException {
165: int len = readInt();
166: if (len == -1) {
167: return null;
168: }
169: byte[] b = new byte[len];
170: in.readFully(b);
171: return b;
172: }
173:
174: public void close() {
175: if (socket != null) {
176: try {
177: out.flush();
178: if (socket != null) {
179: socket.close();
180: }
181: } catch (IOException e) {
182: TraceSystem.traceThrowable(e);
183: } finally {
184: socket = null;
185: }
186: }
187: }
188:
189: public void writeValue(Value v) throws IOException, SQLException {
190: int type = v.getType();
191: writeInt(type);
192: switch (type) {
193: case Value.NULL:
194: break;
195: case Value.BYTES:
196: case Value.JAVA_OBJECT:
197: writeBytes(v.getBytesNoCopy());
198: break;
199: case Value.UUID: {
200: ValueUuid uuid = (ValueUuid) v;
201: writeLong(uuid.getHigh());
202: writeLong(uuid.getLow());
203: break;
204: }
205: case Value.BOOLEAN:
206: writeBoolean(v.getBoolean().booleanValue());
207: break;
208: case Value.BYTE:
209: writeByte(v.getByte());
210: break;
211: case Value.TIME:
212: writeLong(v.getTimeNoCopy().getTime());
213: break;
214: case Value.DATE:
215: writeLong(v.getDateNoCopy().getTime());
216: break;
217: case Value.TIMESTAMP: {
218: Timestamp ts = v.getTimestampNoCopy();
219: writeLong(ts.getTime());
220: writeInt(ts.getNanos());
221: break;
222: }
223: case Value.DECIMAL:
224: writeString(v.getString());
225: break;
226: case Value.DOUBLE:
227: writeDouble(v.getDouble());
228: break;
229: case Value.FLOAT:
230: writeFloat(v.getFloat());
231: break;
232: case Value.INT:
233: writeInt(v.getInt());
234: break;
235: case Value.LONG:
236: writeLong(v.getLong());
237: break;
238: case Value.SHORT:
239: writeInt(v.getShort());
240: break;
241: case Value.STRING:
242: case Value.STRING_IGNORECASE:
243: case Value.STRING_FIXED:
244: writeString(v.getString());
245: break;
246: case Value.BLOB: {
247: long length = v.getPrecision();
248: if (SysProperties.CHECK && length < 0) {
249: throw Message.getInternalError("length: " + length);
250: }
251: writeLong(length);
252: InputStream in = v.getInputStream();
253: long written = IOUtils.copyAndCloseInput(in, out);
254: if (SysProperties.CHECK && written != length) {
255: throw Message.getInternalError("length:" + length
256: + " written:" + written);
257: }
258: writeInt(LOB_MAGIC);
259: break;
260: }
261: case Value.CLOB: {
262: long length = v.getPrecision();
263: if (SysProperties.CHECK && length < 0) {
264: throw Message.getInternalError("length: " + length);
265: }
266: writeLong(length);
267: Reader reader = v.getReader();
268: // below, writer.flush needs to be called to ensure the buffer is written
269: // but, this will also flush the output stream, and this slows things down
270: // so construct an output stream that will ignore this chained flush call
271: java.io.OutputStream out2 = new java.io.FilterOutputStream(
272: out) {
273: public void flush() {
274: }
275: };
276: Writer writer = new BufferedWriter(new OutputStreamWriter(
277: out2, Constants.UTF8));
278: long written = IOUtils.copyAndCloseInput(reader, writer);
279: if (SysProperties.CHECK && written != length) {
280: throw Message.getInternalError("length:" + length
281: + " written:" + written);
282: }
283: writer.flush();
284: writeInt(LOB_MAGIC);
285: break;
286: }
287: case Value.ARRAY: {
288: Value[] list = ((ValueArray) v).getList();
289: writeInt(list.length);
290: for (int i = 0; i < list.length; i++) {
291: writeValue(list[i]);
292: }
293: break;
294: }
295: case Value.RESULT_SET: {
296: ResultSet rs = ((ValueResultSet) v).getResultSet();
297: rs.beforeFirst();
298: ResultSetMetaData meta = rs.getMetaData();
299: int columnCount = meta.getColumnCount();
300: writeInt(columnCount);
301: for (int i = 0; i < columnCount; i++) {
302: writeString(meta.getColumnName(i + 1));
303: writeInt(meta.getColumnType(i + 1));
304: writeInt(meta.getPrecision(i + 1));
305: writeInt(meta.getScale(i + 1));
306: }
307: while (rs.next()) {
308: writeBoolean(true);
309: for (int i = 0; i < columnCount; i++) {
310: int t = DataType.convertSQLTypeToValueType(meta
311: .getColumnType(i + 1));
312: Value val = DataType.readValue(session, rs, i + 1,
313: t);
314: writeValue(val);
315: }
316: }
317: writeBoolean(false);
318: rs.beforeFirst();
319: break;
320: }
321: default:
322: throw Message.getInternalError("type=" + type);
323: }
324: }
325:
326: public Value readValue() throws IOException, SQLException {
327: int type = readInt();
328: switch (type) {
329: case Value.NULL:
330: return ValueNull.INSTANCE;
331: case Value.BYTES:
332: return ValueBytes.getNoCopy(readBytes());
333: case Value.UUID:
334: return ValueUuid.get(readLong(), readLong());
335: case Value.JAVA_OBJECT:
336: return ValueJavaObject.getNoCopy(readBytes());
337: case Value.BOOLEAN:
338: return ValueBoolean.get(readBoolean());
339: case Value.BYTE:
340: return ValueByte.get(readByte());
341: case Value.DATE:
342: return ValueDate.getNoCopy(new Date(readLong()));
343: case Value.TIME:
344: return ValueTime.getNoCopy(new Time(readLong()));
345: case Value.TIMESTAMP: {
346: Timestamp ts = new Timestamp(readLong());
347: ts.setNanos(readInt());
348: return ValueTimestamp.getNoCopy(ts);
349: }
350: case Value.DECIMAL:
351: return ValueDecimal.get(new BigDecimal(readString()));
352: case Value.DOUBLE:
353: return ValueDouble.get(readDouble());
354: case Value.FLOAT:
355: return ValueFloat.get(readFloat());
356: case Value.INT:
357: return ValueInt.get(readInt());
358: case Value.LONG:
359: return ValueLong.get(readLong());
360: case Value.SHORT:
361: return ValueShort.get((short) readInt());
362: case Value.STRING:
363: return ValueString.get(readString());
364: case Value.STRING_IGNORECASE:
365: return ValueStringIgnoreCase.get(readString());
366: case Value.STRING_FIXED:
367: return ValueStringFixed.get(readString());
368: case Value.BLOB: {
369: long length = readLong();
370: ValueLob v = ValueLob.createBlob(in, length, session
371: .getDataHandler());
372: if (readInt() != LOB_MAGIC) {
373: throw Message
374: .getSQLException(ErrorCode.CONNECTION_BROKEN);
375: }
376: return v;
377: }
378: case Value.CLOB: {
379: long length = readLong();
380: ValueLob v = ValueLob.createClob(
381: new ExactUTF8InputStreamReader(in), length, session
382: .getDataHandler());
383: if (readInt() != LOB_MAGIC) {
384: throw Message
385: .getSQLException(ErrorCode.CONNECTION_BROKEN);
386: }
387: return v;
388: }
389: case Value.ARRAY: {
390: int len = readInt();
391: Value[] list = new Value[len];
392: for (int i = 0; i < len; i++) {
393: list[i] = readValue();
394: }
395: return ValueArray.get(list);
396: }
397: case Value.RESULT_SET: {
398: SimpleResultSet rs = new SimpleResultSet();
399: int columns = readInt();
400: for (int i = 0; i < columns; i++) {
401: rs.addColumn(readString(), readInt(), readInt(),
402: readInt());
403: }
404: while (true) {
405: if (!readBoolean()) {
406: break;
407: }
408: Object[] o = new Object[columns];
409: for (int i = 0; i < columns; i++) {
410: o[i] = readValue().getObject();
411: }
412: rs.addRow(o);
413: }
414: return ValueResultSet.get(rs);
415: }
416: default:
417: throw Message.getInternalError("type=" + type);
418: }
419: }
420:
421: public Socket getSocket() {
422: return socket;
423: }
424:
425: public void setSession(SessionInterface session) {
426: this.session = session;
427: }
428:
429: }
|