001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.server.pg;
007:
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.net.Socket;
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;

import org.h2.constant.SysProperties;
import org.h2.engine.ConnectionInfo;
import org.h2.jdbc.JdbcConnection;
import org.h2.util.IOUtils;
import org.h2.util.JdbcUtils;
import org.h2.util.ObjectUtils;
import org.h2.util.ScriptReader;
039:
040: /**
041: * One server thread is opened for each client.
042: */
043: public class PgServerThread implements Runnable {
044: private static final int TYPE_STRING = Types.VARCHAR;
045: private PgServer server;
046: private Socket socket;
047: private Connection conn;
048: private boolean stop;
049: private DataInputStream dataInRaw;
050: private DataInputStream dataIn;
051: private OutputStream out;
052: private int messageType;
053: private ByteArrayOutputStream outBuffer;
054: private DataOutputStream dataOut;
055: private Thread thread;
056: private boolean initDone;
057: private String userName;
058: private String databaseName;
059: private int processId;
060: private String clientEncoding = "UTF-8";
061: private String dateStyle = "ISO";
062: private HashMap prepared = new HashMap();
063: private HashMap portals = new HashMap();
064: private HashSet types = new HashSet();
065:
066: PgServerThread(Socket socket, PgServer server) {
067: this .server = server;
068: this .socket = socket;
069: }
070:
071: public void run() {
072: try {
073: server.log("Connect");
074: InputStream ins = socket.getInputStream();
075: out = socket.getOutputStream();
076: dataInRaw = new DataInputStream(ins);
077: while (!stop) {
078: process();
079: out.flush();
080: }
081: } catch (EOFException e) {
082: // more or less normal disconnect
083: } catch (Exception e) {
084: error("process", e);
085: server.logError(e);
086: } finally {
087: server.log("Disconnect");
088: close();
089: }
090: }
091:
092: private String readString() throws IOException {
093: ByteArrayOutputStream buff = new ByteArrayOutputStream();
094: while (true) {
095: int x = dataIn.read();
096: if (x <= 0) {
097: break;
098: }
099: buff.write(x);
100: }
101: return new String(buff.toByteArray(), getEncoding());
102: }
103:
104: private int readInt() throws IOException {
105: return dataIn.readInt();
106: }
107:
108: private int readShort() throws IOException {
109: return dataIn.readShort();
110: }
111:
112: private byte readByte() throws IOException {
113: return dataIn.readByte();
114: }
115:
116: private void readFully(byte[] buff) throws IOException {
117: dataIn.readFully(buff);
118: }
119:
120: private void error(String message, Exception e) {
121: if (e != null) {
122: server.logError(e);
123: }
124: }
125:
126: private void process() throws IOException {
127: int x;
128: if (initDone) {
129: x = dataInRaw.read();
130: if (x < 0) {
131: stop = true;
132: return;
133: }
134: } else {
135: x = 0;
136: }
137: int len = dataInRaw.readInt();
138: len -= 4;
139: byte[] data = new byte[len];
140: dataInRaw.readFully(data, 0, len);
141: dataIn = new DataInputStream(new ByteArrayInputStream(data, 0,
142: len));
143: switch (x) {
144: case 0:
145: server.log("Init");
146: int version = readInt();
147: if (version == 80877102) {
148: server.log("CancelRequest (not supported)");
149: server.log(" pid: " + readInt());
150: server.log(" key: " + readInt());
151: error("CancelRequest", null);
152: } else if (version == 80877103) {
153: server.log("SSLRequest");
154: out.write('N');
155: } else {
156: server.log("StartupMessage");
157: server.log(" version " + version + " ("
158: + (version >> 16) + "." + (version & 0xff)
159: + ")");
160: while (true) {
161: String param = readString();
162: if (param.length() == 0) {
163: break;
164: }
165: String value = readString();
166: if ("user".equals(param)) {
167: this .userName = value;
168: } else if ("database".equals(param)) {
169: this .databaseName = value;
170: } else if ("client_encoding".equals(param)) {
171: clientEncoding = value;
172: } else if ("DateStyle".equals(param)) {
173: dateStyle = value;
174: }
175: // server.log(" param " + param + "=" + value);
176: }
177: sendAuthenticationCleartextPassword();
178: initDone = true;
179: }
180: break;
181: case 'p': {
182: server.log("PasswordMessage");
183: String password = readString();
184: try {
185: ConnectionInfo ci = new ConnectionInfo(databaseName);
186: String baseDir = server.getBaseDir();
187: if (baseDir == null) {
188: baseDir = SysProperties.getBaseDir();
189: }
190: if (baseDir != null) {
191: ci.setBaseDir(baseDir);
192: }
193: if (server.getIfExists()) {
194: ci.setProperty("IFEXISTS", "TRUE");
195: }
196: ci.setProperty("MODE", "PostgreSQL");
197: ci.setOriginalURL("jdbc:h2:" + databaseName
198: + ";MODE=PostgreSQL");
199: ci.setUserName(userName);
200: ci.setProperty("PASSWORD", password);
201: ci.readPasswords();
202: conn = new JdbcConnection(ci, false);
203: // can not do this because when called inside
204: // DriverManager.getConnection, a deadlock occurs
205: // conn = DriverManager.getConnection(url, userName, password);
206: initDb();
207: sendAuthenticationOk();
208: } catch (SQLException e) {
209: e.printStackTrace();
210: stop = true;
211: }
212: break;
213: }
214: case 'P': {
215: server.log("Parse");
216: Prepared p = new Prepared();
217: p.name = readString();
218: p.sql = getSQL(readString());
219: int count = readShort();
220: p.paramType = new int[count];
221: for (int i = 0; i < count; i++) {
222: int type = readInt();
223: checkType(type);
224: p.paramType[i] = type;
225: }
226: try {
227: p.prep = conn.prepareStatement(p.sql);
228: prepared.put(p.name, p);
229: sendParseComplete();
230: } catch (SQLException e) {
231: sendErrorResponse(e);
232: }
233: break;
234: }
235: case 'B': {
236: server.log("Bind");
237: Portal portal = new Portal();
238: portal.name = readString();
239: String prepName = readString();
240: Prepared prep = (Prepared) prepared.get(prepName);
241: if (prep == null) {
242: sendErrorResponse("Portal not found");
243: break;
244: }
245: portal.sql = prep.sql;
246: portal.prep = prep.prep;
247: portals.put(portal.name, portal);
248: int formatCodeCount = readShort();
249: int[] formatCodes = new int[formatCodeCount];
250: for (int i = 0; i < formatCodeCount; i++) {
251: formatCodes[i] = readShort();
252: }
253: int paramCount = readShort();
254: for (int i = 0; i < paramCount; i++) {
255: int paramLen = readInt();
256: byte[] d2 = new byte[paramLen];
257: readFully(d2);
258: try {
259: setParameter(portal.prep, i, d2, formatCodes);
260: } catch (SQLException e) {
261: sendErrorResponse(e);
262: }
263: }
264: int resultCodeCount = readShort();
265: portal.resultColumnFormat = new int[resultCodeCount];
266: for (int i = 0; i < resultCodeCount; i++) {
267: portal.resultColumnFormat[i] = readShort();
268: }
269: sendBindComplete();
270: break;
271: }
272: case 'D': {
273: char type = (char) readByte();
274: String name = readString();
275: server.log("Describe");
276: PreparedStatement prep;
277: if (type == 'S') {
278: Prepared p = (Prepared) prepared.get(name);
279: if (p == null) {
280: sendErrorResponse("Prepared not found: " + name);
281: }
282: prep = p.prep;
283: sendParameterDescription(p);
284: } else if (type == 'P') {
285: Portal p = (Portal) portals.get(name);
286: if (p == null) {
287: sendErrorResponse("Portal not found: " + name);
288: }
289: prep = p.prep;
290: try {
291: ResultSetMetaData meta = prep.getMetaData();
292: sendRowDescription(meta);
293: } catch (SQLException e) {
294: sendErrorResponse(e);
295: }
296: } else {
297: error("expected S or P, got " + type, null);
298: sendErrorResponse("expected S or P");
299: }
300: break;
301: }
302: case 'E': {
303: String name = readString();
304: server.log("Execute");
305: Portal p = (Portal) portals.get(name);
306: if (p == null) {
307: sendErrorResponse("Portal not found: " + name);
308: break;
309: }
310: int maxRows = readShort();
311: PreparedStatement prep = p.prep;
312: server.log(p.sql);
313: try {
314: prep.setMaxRows(maxRows);
315: boolean result = prep.execute();
316: if (result) {
317: try {
318: ResultSet rs = prep.getResultSet();
319: ResultSetMetaData meta = rs.getMetaData();
320: sendRowDescription(meta);
321: while (rs.next()) {
322: sendDataRow(p.resultColumnFormat, rs);
323: }
324: sendCommandComplete(p.sql, 0);
325: } catch (SQLException e) {
326: sendErrorResponse(e);
327: }
328: } else {
329: sendCommandComplete(p.sql, prep.getUpdateCount());
330: }
331: } catch (SQLException e) {
332: sendErrorResponse(e);
333: }
334: break;
335: }
336: case 'S': {
337: server.log("Sync");
338: sendReadyForQuery();
339: break;
340: }
341: case 'Q': {
342: server.log("Query");
343: String query = readString();
344: ScriptReader reader = new ScriptReader(new StringReader(
345: query));
346: while (true) {
347: Statement stat = null;
348: try {
349: String s = reader.readStatement();
350: if (s == null) {
351: break;
352: }
353: s = getSQL(s);
354: stat = conn.createStatement();
355: boolean result = stat.execute(s);
356: if (result) {
357: ResultSet rs = stat.getResultSet();
358: ResultSetMetaData meta = rs.getMetaData();
359: sendRowDescription(meta);
360: while (rs.next()) {
361: sendDataRow(null, rs);
362: }
363: sendCommandComplete(s, 0);
364: } else {
365: sendCommandComplete(s, stat.getUpdateCount());
366: }
367: } catch (SQLException e) {
368: sendErrorResponse(e);
369: } finally {
370: JdbcUtils.closeSilently(stat);
371: }
372: }
373: sendReadyForQuery();
374: break;
375: }
376: case 'X': {
377: server.log("Terminate");
378: close();
379: break;
380: }
381: default:
382: error("Unsupported: " + x + " (" + (char) x + ")", null);
383: break;
384: }
385: }
386:
387: private void checkType(int type) {
388: if (types.contains(ObjectUtils.getInteger(type))) {
389: error("Unsupported type: " + type, null);
390: }
391: }
392:
393: private String getSQL(String s) {
394: String lower = s.toLowerCase();
395: if (lower.startsWith("show max_identifier_length")) {
396: s = "CALL 63";
397: } else if (lower.startsWith("set client_encoding to")) {
398: s = "set DATESTYLE ISO";
399: }
400: // s = StringUtils.replaceAll(s, "i.indkey[ia.attnum-1]", "0");
401: if (server.getLog()) {
402: server.log(s + ";");
403: }
404: return s;
405: }
406:
407: private void sendCommandComplete(String sql, int updateCount)
408: throws IOException {
409: startMessage('C');
410: sql = sql.trim().toUpperCase();
411: // TODO remove remarks at the beginning
412: String tag;
413: if (sql.startsWith("INSERT")) {
414: tag = "INSERT 0 " + updateCount;
415: } else if (sql.startsWith("DELETE")) {
416: tag = "DELETE " + updateCount;
417: } else if (sql.startsWith("UPDATE")) {
418: tag = "UPDATE " + updateCount;
419: } else if (sql.startsWith("SELECT") || sql.startsWith("CALL")) {
420: tag = "SELECT";
421: } else if (sql.startsWith("BEGIN")) {
422: tag = "BEGIN";
423: } else {
424: error("check command tag: " + sql, null);
425: tag = "UPDATE " + updateCount;
426: }
427: writeString(tag);
428: sendMessage();
429: }
430:
431: private void sendDataRow(int[] formatCodes, ResultSet rs)
432: throws IOException {
433: try {
434: int columns = rs.getMetaData().getColumnCount();
435: String[] values = new String[columns];
436: for (int i = 0; i < columns; i++) {
437: values[i] = rs.getString(i + 1);
438: }
439: startMessage('D');
440: writeShort(columns);
441: for (int i = 0; i < columns; i++) {
442: String s = values[i];
443: if (s == null) {
444: writeInt(-1);
445: } else {
446: // TODO write Binary data
447: byte[] d2 = s.getBytes(getEncoding());
448: writeInt(d2.length);
449: write(d2);
450: }
451: }
452: sendMessage();
453: } catch (SQLException e) {
454: sendErrorResponse(e);
455: }
456: }
457:
458: private String getEncoding() {
459: if ("UNICODE".equals(clientEncoding)) {
460: return "UTF-8";
461: }
462: return clientEncoding;
463: }
464:
465: private void setParameter(PreparedStatement prep, int i, byte[] d2,
466: int[] formatCodes) throws SQLException {
467: boolean text = (i >= formatCodes.length)
468: || (formatCodes[i] == 0);
469: String s;
470: try {
471: if (text) {
472: s = new String(d2, getEncoding());
473: } else {
474: server.logError(new SQLException(
475: "Binary format not supported"));
476: s = new String(d2, getEncoding());
477: }
478: } catch (Exception e) {
479: error("conversion error", e);
480: s = null;
481: }
482: // if(server.getLog()) {
483: // server.log(" " + i + ": " + s);
484: // }
485: prep.setString(i + 1, s);
486: }
487:
488: private void sendErrorResponse(SQLException e) throws IOException {
489: error("SQLException", e);
490: startMessage('E');
491: write('S');
492: writeString("ERROR");
493: write('C');
494: writeString(e.getSQLState());
495: write('M');
496: writeString(e.getMessage());
497: write('D');
498: writeString(e.toString());
499: write(0);
500: sendMessage();
501: }
502:
503: private void sendParameterDescription(Prepared p)
504: throws IOException {
505: try {
506: PreparedStatement prep = p.prep;
507: ParameterMetaData meta = prep.getParameterMetaData();
508: int count = meta.getParameterCount();
509: startMessage('t');
510: writeShort(count);
511: for (int i = 0; i < count; i++) {
512: int type;
513: if (p.paramType != null && p.paramType[i] != 0) {
514: type = p.paramType[i];
515: } else {
516: type = TYPE_STRING;
517: }
518: checkType(type);
519: writeInt(type);
520: }
521: sendMessage();
522: } catch (SQLException e) {
523: sendErrorResponse(e);
524: }
525: }
526:
527: private void sendNoData() throws IOException {
528: startMessage('n');
529: sendMessage();
530: }
531:
532: private void sendRowDescription(ResultSetMetaData meta)
533: throws IOException {
534: try {
535: if (meta == null) {
536: sendNoData();
537: } else {
538: int columns = meta.getColumnCount();
539: int[] types = new int[columns];
540: int[] precision = new int[columns];
541: String[] names = new String[columns];
542: for (int i = 0; i < columns; i++) {
543: names[i] = meta.getColumnName(i + 1);
544: int type = meta.getColumnType(i + 1);
545: precision[i] = meta.getColumnDisplaySize(i + 1);
546: checkType(type);
547: types[i] = type;
548: }
549: startMessage('T');
550: writeShort(columns);
551: for (int i = 0; i < columns; i++) {
552: writeString(names[i].toLowerCase());
553: writeInt(0); // object ID
554: writeShort(0); // attribute number of the column
555: writeInt(types[i]); // data type
556: writeShort(getTypeSize(types[i], precision[i])); // pg_type.typlen
557: writeInt(getModifier(types[i])); // pg_attribute.atttypmod
558: writeShort(0); // text
559: }
560: sendMessage();
561: }
562: } catch (SQLException e) {
563: sendErrorResponse(e);
564: }
565: }
566:
567: private int getTypeSize(int type, int precision) {
568: switch (type) {
569: case Types.VARCHAR:
570: return Math.max(255, precision + 10);
571: default:
572: return precision + 4;
573: }
574: }
575:
576: private int getModifier(int type) {
577: return -1;
578: }
579:
580: private void sendErrorResponse(String message) throws IOException {
581: error("Exception: " + message, null);
582: startMessage('E');
583: write('S');
584: writeString("ERROR");
585: write('C');
586: writeString("08P01"); // PROTOCOL VIOLATION
587: write('M');
588: writeString(message);
589: sendMessage();
590: }
591:
592: private void sendParseComplete() throws IOException {
593: startMessage('1');
594: sendMessage();
595: }
596:
597: private void sendBindComplete() throws IOException {
598: startMessage('2');
599: sendMessage();
600: }
601:
602: private void initDb() throws SQLException {
603: Statement stat = null;
604: ResultSet rs = null;
605: Reader r = null;
606: try {
607: rs = conn.getMetaData().getTables(null, "PG_CATALOG",
608: "PG_VERSION", null);
609: boolean tableFound = rs.next();
610: stat = conn.createStatement();
611: if (tableFound) {
612: rs = stat
613: .executeQuery("SELECT VERSION FROM PG_CATALOG.PG_VERSION");
614: if (rs.next()) {
615: if (rs.getInt(1) == 1) {
616: // already installed
617: stat
618: .execute("set search_path = PUBLIC, pg_catalog");
619: return;
620: }
621: }
622: }
623: r = new InputStreamReader(PgServerThread.class
624: .getResourceAsStream("pg_catalog.sql"));
625: ScriptReader reader = new ScriptReader(
626: new BufferedReader(r));
627: while (true) {
628: String sql = reader.readStatement();
629: if (sql == null) {
630: break;
631: }
632: stat.execute(sql);
633: }
634: reader.close();
635:
636: rs = stat
637: .executeQuery("SELECT OID FROM PG_CATALOG.PG_TYPE");
638: while (rs.next()) {
639: types.add(ObjectUtils.getInteger(rs.getInt(1)));
640: }
641: } finally {
642: JdbcUtils.closeSilently(stat);
643: JdbcUtils.closeSilently(rs);
644: IOUtils.closeSilently(r);
645: }
646: }
647:
648: public void close() {
649: try {
650: stop = true;
651: JdbcUtils.closeSilently(conn);
652: if (socket != null) {
653: socket.close();
654: }
655: server.log("Close");
656: } catch (Exception e) {
657: server.logError(e);
658: }
659: conn = null;
660: socket = null;
661: server.remove(this );
662: }
663:
664: private void sendAuthenticationCleartextPassword()
665: throws IOException {
666: startMessage('R');
667: writeInt(3);
668: sendMessage();
669: }
670:
671: private void sendAuthenticationOk() throws IOException {
672: startMessage('R');
673: writeInt(0);
674: sendMessage();
675: sendParameterStatus("client_encoding", clientEncoding);
676: sendParameterStatus("DateStyle", dateStyle);
677: sendParameterStatus("integer_datetimes", "off");
678: sendParameterStatus("is_superuser", "off");
679: sendParameterStatus("server_encoding", "SQL_ASCII");
680: sendParameterStatus("server_version", "8.1.4");
681: sendParameterStatus("session_authorization", userName);
682: sendParameterStatus("standard_conforming_strings", "off");
683: sendParameterStatus("TimeZone", "CET"); // TODO
684: sendBackendKeyData();
685: sendReadyForQuery();
686: }
687:
688: private void sendReadyForQuery() throws IOException {
689: startMessage('Z');
690: char c;
691: try {
692: if (conn.getAutoCommit()) {
693: c = 'I'; // idle
694: } else {
695: c = 'T'; // in a transaction block
696: }
697: } catch (SQLException e) {
698: c = 'E'; // failed transaction block
699: }
700: write((byte) c);
701: sendMessage();
702: }
703:
704: private void sendBackendKeyData() throws IOException {
705: startMessage('K');
706: writeInt(processId);
707: writeInt(processId);
708: sendMessage();
709: }
710:
711: private void writeString(String s) throws IOException {
712: write(s.getBytes(getEncoding()));
713: write(0);
714: }
715:
716: private void writeInt(int i) throws IOException {
717: dataOut.writeInt(i);
718: }
719:
720: private void writeShort(int i) throws IOException {
721: dataOut.writeShort(i);
722: }
723:
724: private void write(byte[] data) throws IOException {
725: dataOut.write(data);
726: }
727:
728: private void write(int b) throws IOException {
729: dataOut.write(b);
730: }
731:
732: private void startMessage(int messageType) {
733: this .messageType = messageType;
734: outBuffer = new ByteArrayOutputStream();
735: dataOut = new DataOutputStream(outBuffer);
736: }
737:
738: private void sendMessage() throws IOException {
739: dataOut.flush();
740: byte[] buff = outBuffer.toByteArray();
741: int len = buff.length;
742: dataOut = new DataOutputStream(out);
743: dataOut.write(messageType);
744: dataOut.writeInt(len + 4);
745: dataOut.write(buff);
746: dataOut.flush();
747: }
748:
749: private void sendParameterStatus(String param, String value)
750: throws IOException {
751: startMessage('S');
752: writeString(param);
753: writeString(value);
754: sendMessage();
755: }
756:
757: public void setThread(Thread thread) {
758: this .thread = thread;
759: }
760:
761: public Thread getThread() {
762: return thread;
763: }
764:
765: public void setProcessId(int id) {
766: this .processId = id;
767: }
768:
769: private static class Prepared {
770: String name;
771: String sql;
772: PreparedStatement prep;
773: int[] paramType;
774: }
775:
776: private static class Portal {
777: String name;
778: String sql;
779: int[] resultColumnFormat;
780: PreparedStatement prep;
781: }
782:
783: }
|