/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Diego Malpica.
 */

package org.continuent.sequoia.controller.backend.result;

import java.io.IOException;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;

import org.continuent.sequoia.common.exceptions.NotImplementedException;
import org.continuent.sequoia.common.exceptions.driver.protocol.BackendDriverException;
import org.continuent.sequoia.common.protocol.Field;
import org.continuent.sequoia.common.protocol.SQLDataSerialization;
import org.continuent.sequoia.common.protocol.TypeTag;
import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
import org.continuent.sequoia.controller.core.ControllerConstants;
import org.continuent.sequoia.controller.requests.AbstractRequest;

/**
 * A <code>ControllerResultSet</code> is a lightweight ResultSet for the
 * controller side. It only contains row data and column metadata. The real
 * ResultSet is constructed by the driver (DriverResultSet object) on the
 * client side from the ControllerResultSet information.
 *
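 * <p>
 * A minimal controller-side usage sketch. The <code>request</code>,
 * <code>stmt</code>, <code>cache</code> and <code>output</code> names below
 * are illustrative assumptions, not members of this class:
 *
 * <pre>
 * ControllerResultSet crs = new ControllerResultSet(request, stmt.getResultSet(),
 *     cache, stmt, false);
 * crs.initSerializers(); // mandatory before sendToStream()
 * crs.sendToStream(output); // warnings, metadata and the first batch of rows
 * </pre>
 *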
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @version 1.0
 */
public class ControllerResultSet extends AbstractResult implements Serializable {
    private static final long serialVersionUID = 4109773059200535129L;

    /** The results */
    private ArrayList data = null;
    /** The fields */
    private Field[] fields = null;
    /** Cursor name for this ResultSet (not used yet) */
    private String cursorName = null;
    /** Fetch size if we need to fetch only a subset of the ResultSet */
    private int fetchSize = 0;
    /** Backend ResultSet. We need to hold a ref to it when streaming. */
    private transient ResultSet dbResultSet = null;
    /**
     * Temporary reference to the Statement when we are built from a dbResultSet.
     * Maybe this should be a local variable.
     */
    private transient Statement owningStatement = null;
    /** True if the underlying database ResultSet is closed */
    private boolean dbResultSetClosed = true;
    /** True if there is still more data to fetch from dbResultSet */
    private boolean hasMoreData = false;
    /** Maximum number of rows remaining to fetch */
    private int maxRows = 0;
    /** Pointers to column-specific de/serializers */
    private SQLDataSerialization.Serializer[] serializers;
    /** SQL warning chain attached to this ResultSet */
    private SQLWarning warnings = null;

    /**
     * Build a Sequoia ResultSet from a database-specific ResultSet. The metadata
     * can be retrieved from the MetadataCache if provided. If a metadata cache is
     * provided but the data is not in the cache, the MetadataCache is updated
     * accordingly. The remaining code is a straightforward copy of both metadata
     * and data. Used for actual queries, as opposed to metadata ResultSets.
     * <p>
     * The statement used to execute the query will be closed when the ResultSet
     * has been completely copied or when the ResultSet is closed while in
     * streaming mode.
     *
     * @param request Request to which this ResultSet belongs
     * @param rs The database-specific ResultSet
     * @param metadataCache MetadataCache (null if none)
     * @param s Statement used to get rs
     * @param isPartOfMultipleResults true if this ResultSet is part of a call to
     *          a query that returns multiple ResultSets
     * @throws SQLException if an error occurs
     */
    public ControllerResultSet(AbstractRequest request, java.sql.ResultSet rs,
            MetadataCache metadataCache, Statement s, boolean isPartOfMultipleResults)
            throws SQLException {
        this.owningStatement = s;
        try {
            if (rs == null)
                throw new SQLException(
                        "Cannot build a ControllerResultSet with a null java.sql.ResultSet");

            // This is already a result coming from another controller.
            // if (rs instanceof org.continuent.sequoia.driver.ResultSet)
            // return (org.continuent.sequoia.driver.ResultSet) rs;

            // Build the ResultSet metadata
            if ((metadataCache != null) && !isPartOfMultipleResults)
                fields = metadataCache.getMetadata(request);

            if (fields == null) { // Metadata cache miss or part of multiple results
                // Build the fields from the MetaData
                java.sql.ResultSetMetaData metaData = rs.getMetaData();
                fields = ControllerConstants.CONTROLLER_FACTORY
                        .getResultSetMetaDataFactory()
                        .copyResultSetMetaData(metaData, metadataCache);
                if ((metadataCache != null) && !isPartOfMultipleResults)
                    metadataCache.addMetadata(request, fields);
            }
            // Copy the warnings
            warnings = null;
            if (request.getRetrieveSQLWarnings()) {
                warnings = rs.getWarnings();
            }

            // Build the ResultSet data
            data = new ArrayList();
            if (rs.next()) { // not empty RS
                cursorName = request.getCursorName();
                fetchSize = request.getFetchSize();
                maxRows = request.getMaxRows();
                if (maxRows == 0)
                    maxRows = Integer.MAX_VALUE; // Infinite number of rows

                // Note that fetchData updates the data field
                dbResultSet = rs;
                fetchData();
                if (hasMoreData && (cursorName == null)) {
                    // hashCode() is not guaranteed to be injective in theory,
                    // but returns the address of the object in practice.
                    cursorName = String.valueOf(dbResultSet.hashCode());
                }
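
                // In the streaming case (hasMoreData == true) the backend ResultSet
                // stays open (see fetchData()) and the cursor name above is sent to
                // the driver (see sendToStream()) so that later fetch requests can
                // refer to this open ResultSet.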
            } else { // empty RS
                hasMoreData = false;
                dbResultSet = null;
                dbResultSetClosed = true;
                rs.close();
                if (owningStatement != null) {
                    try {
                        owningStatement.close();
                    } catch (SQLException ignore) {
                    }
                    owningStatement = null;
                }
            }
        } catch (SQLException e) {
            throw (SQLException) new SQLException(
                    "Error while building Sequoia ResultSet (" + e.getLocalizedMessage() + ")",
                    e.getSQLState(), e.getErrorCode()).initCause(e);
        }
    }

    /**
     * Creates a new <code>ControllerResultSet</code> object from already built
     * data. Used for ResultSets holding metadata.
     *
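     * <p>
     * A sketch of how a metadata ResultSet could be assembled, assuming the
     * <code>fields</code> array has already been built elsewhere (names are
     * illustrative only):
     *
     * <pre>
     * ArrayList rows = new ArrayList();
     * rows.add(new Object[]{"mytable", "TABLE"}); // one row, values in column order
     * ControllerResultSet metadataRs = new ControllerResultSet(fields, rows);
     * </pre>
     *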
     * @param fields ResultSet metadata fields
     * @param data ResultSet data (an ArrayList of Object[] representing row
     *          content)
     */
    public ControllerResultSet(Field[] fields, ArrayList data) {
        if (data == null)
            throw new IllegalArgumentException(
                    "Cannot build a ControllerResultSet with null data ArrayList");

        this.fields = fields;
        this.data = data;
        warnings = null;
    }

    /**
     * Closes the database ResultSet to release the resource and garbage collect
     * data.
     */
    public void closeResultSet() {
        if ((dbResultSet != null) && !dbResultSetClosed) {
            try {
                dbResultSet.close();
            } catch (SQLException ignore) {
            }
            dbResultSet = null; // to allow GC to work properly

            // TODO: explain how owningStatement could be not null since we set it to
            // null at end of constructor ??
            if (owningStatement != null) {
                try {
                    owningStatement.close();
                } catch (SQLException ignore) {
                }
                owningStatement = null;
            }
        }
    }

    /**
     * Sets the fetch size and calls fetchData().
     *
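     * <p>
     * A sketch of how a streaming ResultSet might be drained, assuming an open
     * <code>DriverBufferedOutputStream output</code> and a positive fetch size
     * (both are illustrative assumptions):
     *
     * <pre>
     * while (crs.hasMoreData()) {
     *     crs.fetchData(fetchSize); // refills getData() with the next batch
     *     crs.sendRowsToStream(output); // forwards the batch and the hasMoreData flag
     * }
     * </pre>
     *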
     * @param fetchSizeParam the number of rows to fetch
     * @throws SQLException if an error occurs
     * @see #fetchData()
     */
    public void fetchData(int fetchSizeParam) throws SQLException {
        this.fetchSize = fetchSizeParam;
        fetchData();
        if (!hasMoreData) {
            if (owningStatement != null) {
                try {
                    owningStatement.close();
                } catch (SQLException ignore) {
                }
                owningStatement = null;
            }
        }
    }

    /**
     * Fetch the next rows of data from dbResultSet according to the fetchSize and
     * maxRows parameters. This method directly updates the data and hasMoreData
     * fields returned by the getData() and hasMoreData() accessors.
     *
     * @throws SQLException from the backend or if dbResultSet is closed. Maybe we
     *           should use a different type internally.
     */
    public void fetchData() throws SQLException {
        if (dbResultSet == null)
            throw new SQLException("Backend ResultSet is closed");

        Object[] row;
        // We directly update the data field

        // Re-use the existing ArrayList with the same size: more efficient in the
        // usual case (constant fetchSize)
        data.clear();
        int toFetch;
        if (fetchSize > 0) {
            toFetch = fetchSize < maxRows ? fetchSize : maxRows;
            // Instead of remembering how much we sent, it's simpler to decrease how
            // much we still may send.
            maxRows -= toFetch;
        } else {
            toFetch = maxRows;
        }
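
        // Worked example (numbers assumed purely for illustration): with
        // fetchSize = 100 and maxRows = 250, this call plans to fetch
        // min(100, 250) = 100 rows and leaves a quota of 150 rows in maxRows for
        // later calls; with fetchSize = 0, the whole remaining quota is fetched
        // in one go.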
        int nbColumn = fields.length;
        Object object;
        do {
            row = new Object[nbColumn];
            for (int i = 0; i < nbColumn; i++) {
                object = ControllerConstants.CONTROLLER_FACTORY.getBackendObjectConverter()
                        .convertResultSetObject(dbResultSet, i, fields[i]);
                row[i] = object;
            }
            data.add(row);
            toFetch--;
            hasMoreData = dbResultSet.next();
        } while (hasMoreData && (toFetch > 0));

        if (hasMoreData && (fetchSize > 0) && (maxRows > 0)) { // More data to fetch later on
            maxRows += toFetch;
            dbResultSetClosed = false;
        } else {
            hasMoreData = false;
            dbResultSet.close();
            if (owningStatement != null)
                owningStatement.close();
            dbResultSet = null;
            dbResultSetClosed = true;
        }
    }

    /**
     * Get the name of the SQL cursor used by this ResultSet.
     *
     * @return the ResultSet's SQL cursor name.
     */
    public String getCursorName() {
        return cursorName;
    }

    /**
     * Returns the data value.
     *
     * @return Returns the data.
     */
    public ArrayList getData() {
        return data;
    }

    /**
     * Returns the fields value.
     *
     * @return Returns the fields.
     */
    public Field[] getFields() {
        return fields;
    }

    /**
     * Returns the hasMoreData value.
     *
     * @return Returns the hasMoreData.
     */
    public boolean hasMoreData() {
        return hasMoreData;
    }

    //
    // Serialization
    //

    /**
     * Serialize this ResultSet on the output stream by sending only the
     * parameters needed to reconstruct the <code>DriverResultSet</code> on the
     * driver side. The caller MUST have called #initSerializers() before. This
     * method MUST mirror the following deserialization method:
     * {@link org.continuent.sequoia.driver.DriverResultSet#DriverResultSet(org.continuent.sequoia.driver.Connection)}
     *
     * @param output destination stream
     * @throws IOException if a network error occurs
     */
    public void sendToStream(
            org.continuent.sequoia.common.stream.DriverBufferedOutputStream output)
            throws IOException {
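        // Wire layout written by this method (it must stay in sync with the
        // DriverResultSet deserialization referenced above):
        //   1. boolean warning flag, then the serialized warning chain if true
        //   2. int column count, followed by each Field
        //   3. the COL_TYPES type tag
        //   4. int row count of this batch
        //   5. one column type tag per column (only when the batch is not empty)
        //   6. the row batch and the hasMoreData flag (see sendRowsToStream)
        //   7. the cursor name, only when more data remains to be streamed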
        // Serialize the SQL warning chain first (in case of result streaming,
        // results must be the last ones to be sent)
        if (warnings != null) {
            output.writeBoolean(true);
            new BackendDriverException(warnings).sendToStream(output);
        } else
            output.writeBoolean(false);

        int nbOfColumns = fields.length;
        int nbOfRows = data.size();
        // Serialize column information
        output.writeInt(nbOfColumns);
        for (int f = 0; f < nbOfColumns; f++)
            this.fields[f].sendToStream(output);

        TypeTag.COL_TYPES.sendToStream(output);

        // This could be just a boolean, see next line. But there is no real need
        // for change.
        output.writeInt(nbOfRows);

        // Send the Java column types. We need to do it only once: not for every row!
        if (nbOfRows > 0) {
            if (null == this.serializers)
                throw new IllegalStateException(
                        "Bug: forgot to initialize serializers of a non empty ControllerResultSet");

            for (int col = 0; col < nbOfColumns; col++)
                serializers[col].getTypeTag().sendToStream(output);
        }

        // Finally send the actual data
        sendRowsToStream(output);

        if (this.hasMoreData) { // Send the cursor name for further references
            output.writeLongUTF(this.cursorName);
        }
        output.flush();
    }

    /**
     * Initialize serializers based on the analysis of the actual Java objects of
     * the ResultSet to send (typically issued by the backend driver's readObject()
     * method). MUST be called before #sendToStream().
     *
     * @throws NotImplementedException in case we don't know how to serialize
     *           something
     */
    public void initSerializers() throws NotImplementedException {
        /* we don't expect the column types of "this" result set to change */
        if (this.serializers != null)
            return;

        if (data.size() == 0)
            return;

        final int nbOfColumns = fields.length;
        this.serializers = new SQLDataSerialization.Serializer[nbOfColumns];

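        // For each column, scan the rows top-down until a non-null value reveals
        // the Java type to serialize; when the whole column is null, fall back to
        // the JDBC type reported by the backend metadata (see the end of the loop).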
        for (int col = 0; col < nbOfColumns; col++) {
            int rowIdx = -1;
            while (serializers[col] == null) {
                rowIdx++;

                // We browsed the whole column and found nothing but NULLs
                if (rowIdx >= data.size()) // ? || rowIdx > 100)
                    break;

                final Object[] row = (Object[]) data.get(rowIdx);
                final Object sqlObj = row[col];

                /*
                 * If SQL was NULL, we only have a null reference and can't do much
                 * with it. Move down to the next row.
                 */
                if (sqlObj == null)
                    continue;

                try {
                    serializers[col] = SQLDataSerialization.getSerializer(sqlObj);
                } catch (NotImplementedException nie) {
                    if (sqlObj instanceof Short) {
                        /**
                         * This is a workaround for a bug in (at least) PostgreSQL's driver.
                         * This bug was only recently fixed: June 8, 2005, in version 1.75 of
                         * source file pgjdbc/org/postgresql/jdbc2/AbstractJdbc2ResultSet.java
                         * http://jdbc.postgresql.org/development/cvs.html.
                         * <p>
                         * It seems this java.lang.Short bug happens with multiple DBMS:
                         * http://archives.postgresql.org/pgsql-jdbc/2005-07/threads.php#00382
                         */

                        // FIXME: we should probably convert to Integer sooner in
                        // backendObjectConverter. Or just set a different serializer.
                        // Unfortunately we have no access to any logger at this point.
                        // TODO: append the following SQLWarning() to this ResultSet:
                        // "Buggy backend driver returns a java.lang.Short"
                        // + " for column number " + col + ", converting to Integer"
                        serializers[col] = SQLDataSerialization.getSerializer(TypeTag.INTEGER);

                    } // no known serialization workaround
                    else {
                        NotImplementedException betterNIE = new NotImplementedException(
                                "Backend driver gave an object of an unsupported java type: "
                                        + sqlObj.getClass().getName() + ", at column number "
                                        + col + " of name " + fields[col].getFieldName());
                        /**
                         * No need for this, see
                         * {@link SQLDataSerialization#getSerializer(Object)}
                         */
                        // betterNIE.initCause(nie);
                        throw betterNIE;
                    }
                }

            } // while (serializers[col] == null)

            if (serializers[col] == null) // we found nothing
            {
                // TODO: add the following SQLWarning() to this ResultSet:
                // "The whole column number " + col + " was null"

                /**
                 * The whole column is null. Fall back on the JDBC type provided by the
                 * backend's metaData.getColumnType(), hoping it's right. Since we are
                 * sending just nulls, a wrong typing should not do much harm anyway.
                 *
                 * @see org.continuent.sequoia.controller.virtualdatabase.ControllerResultSet#ControllerResultSet(AbstractRequest,
                 *      java.sql.ResultSet, MetadataCache, Statement)
                 */
                /**
                 * We could (should?) also use {@link Field#getColumnClassName()} and do
                 * some reflection instead. This depends on the behaviour and quality of
                 * the JDBC drivers of the backends we want to support.
                 */

                final TypeTag javaObjectType = TypeTag
                        .jdbcToJavaObjectType(fields[col].getSqlType());

                if (!TypeTag.TYPE_ERROR.equals(javaObjectType))
                    serializers[col] = SQLDataSerialization.getSerializer(javaObjectType);
                else {
                    // Set the undefined serializer (we promise not to use it)
                    serializers[col] = SQLDataSerialization.getSerializer(null);

                    // TODO: add the following SQLWarning() to this ResultSet:
                    // "The whole column number " + col + " was null"
                    // **AND** there was an unknown JDBC type number

                    // throw new NotImplementedException(
                    // "Could not guess type of column number " + (col + 1)
                    // + ". Whole column is null and backend provides "
                    // + "unknown JDBC type number: " + fields[col].getSqlType());
                }

            } // if (serializers[col] == null): we found nothing for this whole column

        } // for (column)
    }

    /**
     * Serialize only rows, not any metadata. Useful for streaming. Called by the
     * controller side. This method MUST mirror the following deserialization
     * method: {@link org.continuent.sequoia.driver.DriverResultSet#receiveRows()}
     *
     * @param output destination stream
     * @throws IOException on stream error
     */
    public void sendRowsToStream(DriverBufferedOutputStream output)
            throws IOException {

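        // Per-batch wire layout: int row count, then for each row a ROW type tag,
        // one boolean null flag per column and the serialized value of every
        // non-null column; a trailing boolean tells the driver whether more rows
        // can still be fetched from this ResultSet.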
        output.writeInt(data.size());

        boolean[] nulls = new boolean[fields.length];

        Iterator rowsIter = this.data.iterator();
        while (rowsIter.hasNext()) {
            Object[] row = (Object[]) rowsIter.next();
            TypeTag.ROW.sendToStream(output);

            // First flag the null values
            for (int col = 0; col < row.length; col++) {
                nulls[col] = (row[col] == null);
                // TODO: we should compress this
                output.writeBoolean(nulls[col]);
            }

            for (int col = 0; col < row.length; col++)
                if (!nulls[col]) // send only non-nulls
                {
                    try {
                        /**
                         * Here we are sure that serializers are initialized because:
                         * <p>
                         * (1) we went through
                         * {@link #sendToStream(DriverBufferedOutputStream)} at least once
                         * before
                         * <p>
                         * (2) and there was a non-empty ResultSet transferred, else we would
                         * not come here again.
                         */
                        serializers[col].sendToStream(row[col], output);
                    } catch (ClassCastException cce1) {
                        ClassCastException cce2 = new ClassCastException("Serializer "
                                + serializers[col] + " failed on Java object: " + row[col]
                                + " found in column: " + col + ", because of unexpected type "
                                + row[col].getClass().getName());
                        cce2.initCause(cce1);
                        throw cce2;
                    }
                } // if !null

        } // while (rows)

        output.writeBoolean(this.hasMoreData);
        output.flush();
    }

}
|