001: /**
002: * com.mckoi.database.jdbcserver.JDBCProcessor 22 Jul 2000
003: *
004: * Mckoi SQL Database ( http://www.mckoi.com/database )
005: * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License
009: * Version 2 as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License Version 2 for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * Version 2 along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: * Change Log:
021: *
022: *
023: */package com.mckoi.database.jdbcserver;
024:
025: import com.mckoi.database.global.ObjectTransfer;
026: import com.mckoi.database.jdbc.StreamableObjectPart;
027: import com.mckoi.database.jdbc.ProtocolConstants;
028: import com.mckoi.database.jdbc.MSQLException;
029: import com.mckoi.database.jdbc.DatabaseCallBack;
030: import com.mckoi.database.jdbc.DatabaseInterface;
031: import com.mckoi.database.jdbc.QueryResponse;
032: import com.mckoi.database.jdbc.ResultPart;
033: import com.mckoi.database.jdbc.SQLQuery;
034: import com.mckoi.debug.*;
035: import com.mckoi.util.ByteArrayUtil;
036:
037: import java.sql.SQLException;
038: import java.io.*;
039:
040: /**
041: * This processes JDBC commands from a JDBC client and dispatches the commands
042: * to the database. This is a state based class. There is a single processor
043: * for each JDBC client connected. This class is designed to be flexible
044: * enough to handle packet based protocols as well as stream based
045: * protocols.
046: *
047: * @author Tobias Downer
048: */
049:
050: abstract class JDBCProcessor implements ProtocolConstants {
051:
052: /**
053: * The version of the server protocol.
054: */
055: private static final int SERVER_VERSION = 1;
056:
057: /**
058: * The current state we are in. 0 indicates we haven't logged in yet. 100
059: * indicates we are logged in.
060: */
061: private int state;
062:
063: /**
064: * Number of authentications tried.
065: */
066: private int authentication_tries;
067:
068: /**
069: * The interface to the database.
070: */
071: private DatabaseInterface db_interface;
072:
073: /**
074: * An object the debug information can be logged to.
075: */
076: private DebugLogger debug;
077:
/**
 * Creates a processor for a newly connected client. The processor starts in
 * state 0 with no authentication attempts recorded.
 *
 * @param db_interface the interface used to talk to the database
 * @param logger the logger that debug information is written to
 */
JDBCProcessor(DatabaseInterface db_interface, DebugLogger logger) {
    // Fresh connection: nothing received yet, no failed logins.
    this.authentication_tries = 0;
    this.state = 0;
    this.db_interface = db_interface;
    this.debug = logger;
}
087:
088: /**
089: * The database call back method that sends database events back to the
090: * client.
091: */
092: private DatabaseCallBack db_call_back = new DatabaseCallBack() {
093: public void databaseEvent(int event_type, String event_message) {
094: try {
095: // Format the call back and send the event.
096: ByteArrayOutputStream bout = new ByteArrayOutputStream();
097: DataOutputStream dout = new DataOutputStream(bout);
098: dout.writeInt(event_type);
099: dout.writeUTF(event_message);
100: sendEvent(bout.toByteArray());
101: } catch (IOException e) {
102: debug.write(Lvl.ERROR, this , "IO Error: "
103: + e.getMessage());
104: debug.writeException(e);
105: }
106: }
107: };
108:
109: protected static void printByteArray(byte[] array) {
110: System.out.println("Length: " + array.length);
111: for (int i = 0; i < array.length; ++i) {
112: System.out.print(array[i]);
113: System.out.print(", ");
114: }
115: }
116:
117: /**
118: * Processes a single JDBCCommand from the client. The command comes in as
119: * a byte[] array and the response is written out as a byte[] array. If
120: * it returns 'null' then it means the connection has been closed.
121: */
122: byte[] processJDBCCommand(byte[] command) throws IOException {
123:
124: // printByteArray(command);
125:
126: if (state == 0) {
127: // State 0 means we looking for the header...
128: int magic = ByteArrayUtil.getInt(command, 0);
129: // The driver version number
130: int maj_ver = ByteArrayUtil.getInt(command, 4);
131: int min_ver = ByteArrayUtil.getInt(command, 8);
132:
133: byte[] ack_command = new byte[4 + 1 + 4 + 1];
134: // Send back an acknowledgement and the version number of the server
135: ByteArrayUtil.setInt(ACKNOWLEDGEMENT, ack_command, 0);
136: ack_command[4] = 1;
137: ByteArrayUtil.setInt(SERVER_VERSION, ack_command, 5);
138: ack_command[9] = 0;
139:
140: // Set to the next state.
141: state = 4;
142:
143: // Return the acknowledgement
144: return ack_command;
145:
146: // // We accept drivers equal or less than 1.00 currently.
147: // if ((maj_ver == 1 && min_ver == 0) || maj_ver == 0) {
148: // // Go to next state.
149: // state = 4;
150: // return single(ACKNOWLEDGEMENT);
151: // }
152: // else {
153: // // Close the connection if driver invalid.
154: // close();
155: // }
156: //
157: // return null;
158: }
159:
160: else if (state == 4) {
161: // State 4 means we looking for username and password...
162: ByteArrayInputStream bin = new ByteArrayInputStream(command);
163: DataInputStream din = new DataInputStream(bin);
164: String default_schema = din.readUTF();
165: String username = din.readUTF();
166: String password = din.readUTF();
167:
168: try {
169: boolean good = db_interface.login(default_schema,
170: username, password, db_call_back);
171: if (good == false) {
172: // Close after 12 tries.
173: if (authentication_tries >= 12) {
174: close();
175: } else {
176: ++authentication_tries;
177: return single(USER_AUTHENTICATION_FAILED);
178: }
179: } else {
180: state = 100;
181: return single(USER_AUTHENTICATION_PASSED);
182: }
183: } catch (SQLException e) {
184: }
185: return null;
186:
187: }
188:
189: else if (state == 100) {
190: // Process the query
191: return processQuery(command);
192: }
193:
194: else {
195: throw new Error("Illegal state: " + state);
196: }
197:
198: }
199:
200: /**
201: * Returns the state of the connection. 0 = not logged in yet. 1 = logged
202: * in.
203: */
204: int getState() {
205: return state;
206: }
207:
208: /**
209: * Convenience, returns a single 4 byte array with the given int encoded
210: * into it.
211: */
212: private byte[] single(int val) {
213: byte[] buf = new byte[4];
214: ByteArrayUtil.setInt(val, buf, 0);
215: return buf;
216: }
217:
218: /**
219: * Creates a response that represents an SQL exception failure.
220: */
221: private byte[] exception(int dispatch_id, SQLException e)
222: throws IOException {
223:
224: int code = e.getErrorCode();
225: String msg = e.getMessage();
226: if (msg == null) {
227: msg = "NULL exception message";
228: }
229: String server_msg = "";
230: String stack_trace = "";
231:
232: if (e instanceof MSQLException) {
233: MSQLException me = (MSQLException) e;
234: server_msg = me.getServerErrorMsg();
235: stack_trace = me.getServerErrorStackTrace();
236: } else {
237: StringWriter writer = new StringWriter();
238: e.printStackTrace(new PrintWriter(writer));
239: stack_trace = writer.toString();
240: }
241:
242: ByteArrayOutputStream bout = new ByteArrayOutputStream();
243: DataOutputStream dout = new DataOutputStream(bout);
244: dout.writeInt(dispatch_id);
245: dout.writeInt(EXCEPTION);
246: dout.writeInt(code);
247: dout.writeUTF(msg);
248: dout.writeUTF(stack_trace);
249:
250: return bout.toByteArray();
251:
252: }
253:
254: /**
255: * Creates a response that indicates a simple success of an operation with
256: * the given dispatch id.
257: */
258: private byte[] simpleSuccess(int dispatch_id) throws IOException {
259: byte[] buf = new byte[8];
260: ByteArrayUtil.setInt(dispatch_id, buf, 0);
261: ByteArrayUtil.setInt(SUCCESS, buf, 4);
262: return buf;
263: }
264:
265: /**
266: * Processes a query on the byte[] array and returns the result.
267: */
268: private byte[] processQuery(byte[] command) throws IOException {
269:
270: byte[] result;
271:
272: // The first int is the command.
273: int ins = ByteArrayUtil.getInt(command, 0);
274:
275: // Otherwise must be a dispatch type request.
276: // The second is the dispatch id.
277: int dispatch_id = ByteArrayUtil.getInt(command, 4);
278:
279: if (dispatch_id == -1) {
280: throw new Error("Special case dispatch id of -1 in query");
281: }
282:
283: if (ins == RESULT_SECTION) {
284: result = resultSection(dispatch_id, command);
285: } else if (ins == QUERY) {
286: result = queryCommand(dispatch_id, command);
287: } else if (ins == PUSH_STREAMABLE_OBJECT_PART) {
288: result = pushStreamableObjectPart(dispatch_id, command);
289: } else if (ins == DISPOSE_RESULT) {
290: result = disposeResult(dispatch_id, command);
291: } else if (ins == STREAMABLE_OBJECT_SECTION) {
292: result = streamableObjectSection(dispatch_id, command);
293: } else if (ins == DISPOSE_STREAMABLE_OBJECT) {
294: result = disposeStreamableObject(dispatch_id, command);
295: } else if (ins == CLOSE) {
296: close();
297: result = null;
298: } else {
299: throw new Error("Command (" + ins + ") not understood.");
300: }
301:
302: return result;
303:
304: }
305:
306: /**
307: * Disposes of this processor.
308: */
309: void dispose() {
310: try {
311: db_interface.dispose();
312: } catch (Throwable e) {
313: debug.writeException(Lvl.ERROR, e);
314: }
315: }
316:
317: // ---------- JDBC primitive commands ----------
318:
319: /**
320: * Executes a query and returns the header for the result in the response.
321: * This keeps track of all result sets because sections of the result are
322: * later queries via the 'RESULT_SECTION' command.
323: * <p>
324: * 'dispatch_id' is the number we need to respond with.
325: */
326: private byte[] queryCommand(int dispatch_id, byte[] command)
327: throws IOException {
328:
329: // Read the query from the command.
330: ByteArrayInputStream bin = new ByteArrayInputStream(command, 8,
331: command.length - 8);
332: DataInputStream din = new DataInputStream(bin);
333: SQLQuery query = SQLQuery.readFrom(din);
334:
335: try {
336: // Do the query
337: QueryResponse response = db_interface.execQuery(query);
338:
339: // Prepare the stream to output the response to,
340: ByteArrayOutputStream bout = new ByteArrayOutputStream();
341: DataOutputStream dout = new DataOutputStream(bout);
342:
343: dout.writeInt(dispatch_id);
344: dout.writeInt(SUCCESS);
345:
346: // The response sends the result id, the time the query took, the
347: // total row count, and description of each column in the result.
348: dout.writeInt(response.getResultID());
349: dout.writeInt(response.getQueryTimeMillis());
350: dout.writeInt(response.getRowCount());
351: int col_count = response.getColumnCount();
352: dout.writeInt(col_count);
353: for (int i = 0; i < col_count; ++i) {
354: response.getColumnDescription(i).writeTo(dout);
355: }
356:
357: return bout.toByteArray();
358:
359: } catch (SQLException e) {
360: // debug.writeException(e);
361: return exception(dispatch_id, e);
362: }
363:
364: }
365:
366: /**
367: * Pushes a part of a streamable object onto the server.
368: * <p>
369: * 'dispatch_id' is the number we need to respond with.
370: */
371: private byte[] pushStreamableObjectPart(int dispatch_id,
372: byte[] command) throws IOException {
373: byte type = command[8];
374: long object_id = ByteArrayUtil.getLong(command, 9);
375: long object_length = ByteArrayUtil.getLong(command, 17);
376: int length = ByteArrayUtil.getInt(command, 25);
377: byte[] ob_buf = new byte[length];
378: System.arraycopy(command, 29, ob_buf, 0, length);
379: long offset = ByteArrayUtil.getLong(command, 29 + length);
380:
381: try {
382: // Pass this through to the underlying database interface.
383: db_interface.pushStreamableObjectPart(type, object_id,
384: object_length, ob_buf, offset, length);
385:
386: // Return operation success.
387: return simpleSuccess(dispatch_id);
388:
389: } catch (SQLException e) {
390: return exception(dispatch_id, e);
391: }
392:
393: }
394:
395: /**
396: * Responds with a part of the result set of a query made via the 'QUERY'
397: * command.
398: * <p>
399: * 'dispatch_id' is the number we need to respond with.
400: */
401: private byte[] resultSection(int dispatch_id, byte[] command)
402: throws IOException {
403:
404: int result_id = ByteArrayUtil.getInt(command, 8);
405: int row_number = ByteArrayUtil.getInt(command, 12);
406: int row_count = ByteArrayUtil.getInt(command, 16);
407:
408: try {
409: // Get the result part...
410: ResultPart block = db_interface.getResultPart(result_id,
411: row_number, row_count);
412:
413: ByteArrayOutputStream bout = new ByteArrayOutputStream();
414: DataOutputStream dout = new DataOutputStream(bout);
415:
416: dout.writeInt(dispatch_id);
417: dout.writeInt(SUCCESS);
418:
419: // Send the contents of the result set.
420: // HACK - Work out column count by dividing number of entries in block
421: // by number of rows.
422: int col_count = block.size() / row_count;
423: dout.writeInt(col_count);
424: int bsize = block.size();
425: for (int index = 0; index < bsize; ++index) {
426: ObjectTransfer.writeTo(dout, block.elementAt(index));
427: }
428:
429: return bout.toByteArray();
430: } catch (SQLException e) {
431: return exception(dispatch_id, e);
432: }
433: }
434:
435: /**
436: * Returns a section of a streamable object.
437: * <p>
438: * 'dispatch_id' is the number we need to respond with.
439: */
440: private byte[] streamableObjectSection(int dispatch_id,
441: byte[] command) throws IOException {
442: int result_id = ByteArrayUtil.getInt(command, 8);
443: long streamable_object_id = ByteArrayUtil.getLong(command, 12);
444: long offset = ByteArrayUtil.getLong(command, 20);
445: int length = ByteArrayUtil.getInt(command, 28);
446:
447: try {
448: StreamableObjectPart ob_part = db_interface
449: .getStreamableObjectPart(result_id,
450: streamable_object_id, offset, length);
451:
452: ByteArrayOutputStream bout = new ByteArrayOutputStream();
453: DataOutputStream dout = new DataOutputStream(bout);
454:
455: dout.writeInt(dispatch_id);
456: dout.writeInt(SUCCESS);
457:
458: byte[] buf = ob_part.getContents();
459: dout.writeInt(buf.length);
460: dout.write(buf, 0, buf.length);
461:
462: return bout.toByteArray();
463: } catch (SQLException e) {
464: return exception(dispatch_id, e);
465: }
466:
467: }
468:
469: /**
470: * Disposes of a streamable object.
471: * <p>
472: * 'dispatch_id' is the number we need to respond with.
473: */
474: private byte[] disposeStreamableObject(int dispatch_id,
475: byte[] command) throws IOException {
476: int result_id = ByteArrayUtil.getInt(command, 8);
477: long streamable_object_id = ByteArrayUtil.getLong(command, 12);
478:
479: try {
480: // Pass this through to the underlying database interface.
481: db_interface.disposeStreamableObject(result_id,
482: streamable_object_id);
483:
484: // Return operation success.
485: return simpleSuccess(dispatch_id);
486:
487: } catch (SQLException e) {
488: return exception(dispatch_id, e);
489: }
490: }
491:
492: /**
493: * Disposes of a result set we queries via the 'QUERY' command.
494: * <p>
495: * 'dispatch_id' is the number we need to respond with.
496: */
497: private byte[] disposeResult(int dispatch_id, byte[] command)
498: throws IOException {
499:
500: // Get the result id.
501: int result_id = ByteArrayUtil.getInt(command, 8);
502:
503: try {
504: // Dispose the table.
505: db_interface.disposeResult(result_id);
506: // Return operation success.
507: return simpleSuccess(dispatch_id);
508: } catch (SQLException e) {
509: return exception(dispatch_id, e);
510: }
511: }
512:
513: // ---------- Abstract methods ----------
514:
515: /**
516: * Sends an event to the client. This is used to notify the client of
517: * trigger events, etc.
518: * <p>
519: * SECURITY ISSUE: This is always invoked by the DatabaseDispatcher. We
520: * have to be careful that this method isn't allowed to block. Otherwise
521: * the DatabaseDispatcher thread will be out of operation. Unfortunately
522: * assuring this may not be possible until Java has non-blocking IO, or we
523: * use datagrams for transmission. I know for sure that the TCP
524: * implementation is vunrable. If the client doesn't 'read' what we are
525: * sending then this'll block when the buffers become full.
526: */
527: public abstract void sendEvent(byte[] event_msg) throws IOException;
528:
529: /**
530: * Closes the connection with the client.
531: */
532: public abstract void close() throws IOException;
533:
534: /**
535: * Returns true if the connection to the client is closed.
536: */
537: public abstract boolean isClosed() throws IOException;
538:
539: // ---------- Finalize ----------
540:
541: public final void finalize() throws Throwable {
542: super .finalize();
543: try {
544: dispose();
545: } catch (Throwable e) { /* ignore */
546: }
547: }
548:
549: }
|