001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.server;
007:
008: import java.io.IOException;
009: import java.io.PrintWriter;
010: import java.io.StringWriter;
011: import java.net.Socket;
012: import java.sql.SQLException;
013:
014: import org.h2.command.Command;
015: import org.h2.constant.ErrorCode;
016: import org.h2.constant.SysProperties;
017: import org.h2.engine.ConnectionInfo;
018: import org.h2.engine.Constants;
019: import org.h2.engine.Engine;
020: import org.h2.engine.Session;
021: import org.h2.engine.SessionRemote;
022: import org.h2.expression.Parameter;
023: import org.h2.jdbc.JdbcSQLException;
024: import org.h2.message.Message;
025: import org.h2.result.LocalResult;
026: import org.h2.result.ResultColumn;
027: import org.h2.util.ObjectArray;
028: import org.h2.util.SmallMap;
029: import org.h2.value.Transfer;
030: import org.h2.value.Value;
031:
032: /**
033: * One server thread is opened per client connection.
034: */
035: public class TcpServerThread implements Runnable {
036: private TcpServer server;
037: private Session session;
038: private boolean stop;
039: private Thread thread;
040: private Transfer transfer;
041: private Command commit;
042: private SmallMap cache = new SmallMap(
043: SysProperties.SERVER_CACHED_OBJECTS);
044: private int id;
045:
046: public TcpServerThread(Socket socket, TcpServer server, int id) {
047: this .server = server;
048: this .id = id;
049: transfer = new Transfer(null);
050: transfer.setSocket(socket);
051: }
052:
053: private void log(String s) {
054: server.log(this + " " + s);
055: }
056:
057: public void run() {
058: try {
059: transfer.init();
060: log("Connect");
061: // TODO server: should support a list of allowed databases and a
062: // list of allowed clients
063: try {
064: int version = transfer.readInt();
065: if (!server.allow(transfer.getSocket())) {
066: throw Message
067: .getSQLException(ErrorCode.REMOTE_CONNECTION_NOT_ALLOWED);
068: }
069: if (version != Constants.TCP_DRIVER_VERSION) {
070: throw Message
071: .getSQLException(
072: ErrorCode.DRIVER_VERSION_ERROR_2,
073: new String[] {
074: "" + version,
075: ""
076: + Constants.TCP_DRIVER_VERSION });
077: }
078: String db = transfer.readString();
079: String originalURL = transfer.readString();
080: String baseDir = server.getBaseDir();
081: if (baseDir == null) {
082: baseDir = SysProperties.getBaseDir();
083: }
084: ConnectionInfo ci = new ConnectionInfo(db);
085: if (baseDir != null) {
086: ci.setBaseDir(baseDir);
087: }
088: if (server.getIfExists()) {
089: ci.setProperty("IFEXISTS", "TRUE");
090: }
091: ci.setOriginalURL(originalURL);
092: ci.setUserName(transfer.readString());
093: ci.setUserPasswordHash(transfer.readBytes());
094: ci.setFilePasswordHash(transfer.readBytes());
095: int len = transfer.readInt();
096: for (int i = 0; i < len; i++) {
097: ci.setProperty(transfer.readString(), transfer
098: .readString());
099: }
100: Engine engine = Engine.getInstance();
101: session = engine.getSession(ci);
102: transfer.setSession(session);
103: transfer.writeInt(SessionRemote.STATUS_OK).flush();
104: server.addConnection(id, originalURL, ci.getUserName());
105: log("Connected");
106: } catch (Throwable e) {
107: sendError(e);
108: stop = true;
109: }
110: while (!stop) {
111: try {
112: process();
113: } catch (Throwable e) {
114: sendError(e);
115: }
116: }
117: log("Disconnect");
118: } catch (Throwable e) {
119: server.logError(e);
120: } finally {
121: close();
122: }
123: }
124:
125: private void closeSession() {
126: if (session != null) {
127: try {
128: Command rollback = session.prepareLocal("ROLLBACK");
129: rollback.executeUpdate();
130: session.close();
131: server.removeConnection(id);
132: } catch (Exception e) {
133: server.logError(e);
134: } finally {
135: session = null;
136: }
137: }
138: }
139:
140: public void close() {
141: try {
142: stop = true;
143: closeSession();
144: transfer.close();
145: log("Close");
146: } catch (Exception e) {
147: server.logError(e);
148: }
149: server.remove(this );
150: }
151:
152: private void sendError(Throwable e) {
153: try {
154: SQLException s = Message.convert(e);
155: StringWriter writer = new StringWriter();
156: e.printStackTrace(new PrintWriter(writer));
157: String trace = writer.toString();
158: String message;
159: String sql;
160: if (e instanceof JdbcSQLException) {
161: JdbcSQLException j = (JdbcSQLException) e;
162: message = j.getOriginalMessage();
163: sql = j.getSQL();
164: } else {
165: message = e.getMessage();
166: sql = null;
167: }
168: transfer.writeInt(SessionRemote.STATUS_ERROR).writeString(
169: s.getSQLState()).writeString(message).writeString(
170: sql).writeInt(s.getErrorCode()).writeString(trace)
171: .flush();
172: } catch (IOException e2) {
173: server.logError(e2);
174: // if writing the error does not work, close the connection
175: stop = true;
176: }
177: }
178:
179: private void setParameters(Command command) throws IOException,
180: SQLException {
181: int len = transfer.readInt();
182: ObjectArray params = command.getParameters();
183: for (int i = 0; i < len; i++) {
184: Parameter p = (Parameter) params.get(i);
185: p.setValue(transfer.readValue());
186: }
187: }
188:
189: private void process() throws IOException, SQLException {
190: int operation = transfer.readInt();
191: switch (operation) {
192: case SessionRemote.SESSION_PREPARE: {
193: int id = transfer.readInt();
194: String sql = transfer.readString();
195: Command command = session.prepareLocal(sql);
196: boolean readonly = command.isReadOnly();
197: cache.addObject(id, command);
198: boolean isQuery = command.isQuery();
199: int paramCount = command.getParameters().size();
200: transfer.writeInt(SessionRemote.STATUS_OK).writeBoolean(
201: isQuery).writeBoolean(readonly)
202: .writeInt(paramCount).flush();
203: break;
204: }
205: case SessionRemote.SESSION_CLOSE: {
206: closeSession();
207: transfer.writeInt(SessionRemote.STATUS_OK).flush();
208: close();
209: break;
210: }
211: case SessionRemote.COMMAND_COMMIT: {
212: if (commit == null) {
213: commit = session.prepareLocal("COMMIT");
214: }
215: commit.executeUpdate();
216: transfer.writeInt(SessionRemote.STATUS_OK).flush();
217: break;
218: }
219: case SessionRemote.COMMAND_GET_META_DATA: {
220: int id = transfer.readInt();
221: int objectId = transfer.readInt();
222: Command command = (Command) cache.getObject(id, false);
223: LocalResult result = command.getMetaDataLocal();
224: cache.addObject(objectId, result);
225: int columnCount = result.getVisibleColumnCount();
226: transfer.writeInt(SessionRemote.STATUS_OK).writeInt(
227: columnCount).writeInt(0);
228: for (int i = 0; i < columnCount; i++) {
229: ResultColumn.writeColumn(transfer, result, i);
230: }
231: transfer.flush();
232: break;
233: }
234: case SessionRemote.COMMAND_EXECUTE_QUERY: {
235: int id = transfer.readInt();
236: int objectId = transfer.readInt();
237: int maxRows = transfer.readInt();
238: int fetchSize = transfer.readInt();
239: Command command = (Command) cache.getObject(id, false);
240: setParameters(command);
241: LocalResult result = command.executeQueryLocal(maxRows);
242: cache.addObject(objectId, result);
243: int columnCount = result.getVisibleColumnCount();
244: transfer.writeInt(SessionRemote.STATUS_OK).writeInt(
245: columnCount);
246: int rowCount = result.getRowCount();
247: transfer.writeInt(rowCount);
248: for (int i = 0; i < columnCount; i++) {
249: ResultColumn.writeColumn(transfer, result, i);
250: }
251: int fetch = Math.min(rowCount, fetchSize);
252: for (int i = 0; i < fetch; i++) {
253: sendRow(result);
254: }
255: transfer.flush();
256: break;
257: }
258: case SessionRemote.COMMAND_EXECUTE_UPDATE: {
259: int id = transfer.readInt();
260: Command command = (Command) cache.getObject(id, false);
261: setParameters(command);
262: int updateCount = command.executeUpdate();
263: int status = SessionRemote.STATUS_OK;
264: if (session.isClosed()) {
265: status = SessionRemote.STATUS_CLOSED;
266: }
267: transfer.writeInt(status).writeInt(updateCount)
268: .writeBoolean(session.getAutoCommit());
269: transfer.flush();
270: break;
271: }
272: case SessionRemote.COMMAND_CLOSE: {
273: int id = transfer.readInt();
274: Command command = (Command) cache.getObject(id, true);
275: if (command != null) {
276: command.close();
277: cache.freeObject(id);
278: }
279: break;
280: }
281: case SessionRemote.RESULT_FETCH_ROWS: {
282: int id = transfer.readInt();
283: int count = transfer.readInt();
284: LocalResult result = (LocalResult) cache.getObject(id,
285: false);
286: transfer.writeInt(SessionRemote.STATUS_OK);
287: for (int i = 0; i < count; i++) {
288: sendRow(result);
289: }
290: transfer.flush();
291: break;
292: }
293: case SessionRemote.RESULT_RESET: {
294: int id = transfer.readInt();
295: LocalResult result = (LocalResult) cache.getObject(id,
296: false);
297: result.reset();
298: break;
299: }
300: case SessionRemote.RESULT_CLOSE: {
301: int id = transfer.readInt();
302: LocalResult result = (LocalResult) cache
303: .getObject(id, true);
304: if (result != null) {
305: result.close();
306: cache.freeObject(id);
307: }
308: break;
309: }
310: case SessionRemote.CHANGE_ID: {
311: int oldId = transfer.readInt();
312: int newId = transfer.readInt();
313: Object obj = cache.getObject(oldId, false);
314: cache.freeObject(oldId);
315: cache.addObject(newId, obj);
316: break;
317: }
318: default:
319: server.logInternalError("Unknown operation: " + operation);
320: server.log("Unknown operation: " + operation);
321: closeSession();
322: close();
323: }
324: }
325:
326: private void sendRow(LocalResult result) throws IOException,
327: SQLException {
328: if (result.next()) {
329: transfer.writeBoolean(true);
330: Value[] v = result.currentRow();
331: for (int i = 0; i < result.getVisibleColumnCount(); i++) {
332: transfer.writeValue(v[i]);
333: }
334: } else {
335: transfer.writeBoolean(false);
336: }
337: }
338:
339: public void setThread(Thread thread) {
340: this .thread = thread;
341: }
342:
343: public Thread getThread() {
344: return thread;
345: }
346:
347: }
|