0001: /*
0002: * Copyright (c) 1998 - 2005 Versant Corporation
0003: * All rights reserved. This program and the accompanying materials
0004: * are made available under the terms of the Eclipse Public License v1.0
0005: * which accompanies this distribution, and is available at
0006: * http://www.eclipse.org/legal/epl-v10.html
0007: *
0008: * Contributors:
0009: * Versant Corporation - initial API and implementation
0010: */
0011: package com.versant.core.jdbc;
0012:
0013: import com.versant.core.common.Debug;
0014: import com.versant.core.common.OID;
0015: import com.versant.core.common.QueryResultContainer;
0016: import com.versant.core.common.State;
0017: import com.versant.core.metadata.*;
0018: import com.versant.core.server.*;
0019: import com.versant.core.jdbc.metadata.*;
0020: import com.versant.core.jdbc.query.JdbcCompiledQuery;
0021: import com.versant.core.jdbc.query.JdbcJDOQLCompiler;
0022: import com.versant.core.jdbc.query.SqlStruct;
0023: import com.versant.core.jdbc.sql.exp.SelectExp;
0024: import com.versant.core.jdbc.sql.exp.SqlExp;
0025: import com.versant.core.jdbc.sql.SqlDriver;
0026:
0027: import com.versant.core.jdbc.ejbql.JdbcQueryResultEJBQL;
0028:
0029: import java.math.BigDecimal;
0030: import java.math.BigInteger;
0031: import java.sql.*;
0032: import java.util.*;
0033:
0034: import com.versant.core.common.BindingSupportImpl;
0035: import com.versant.core.storagemanager.RunningQuery;
0036: import com.versant.core.storagemanager.ExecuteQueryReturn;
0037: import com.versant.core.storagemanager.ApplicationContext;
0038: import com.versant.core.jdo.QueryDetails;
0039:
0040: /**
0041: * The results of a JDBC JDOQL or SQL query.
0042: */
0043: public class JdbcQueryResult implements RunningQuery,
0044: ExecuteQueryReturn {
0045:
0046: private FetchInfo fetchInfo = new FetchInfo();
0047:
0048: protected PreparedStatement ps;
0049: private ResultSet rs;
0050: private Connection con;
0051:
0052: private ClassMetaData cmd;
0053: protected final JdbcStorageManager sm;
0054: private FetchGroup fetchGroup;
0055: private boolean includeSubclasses;
0056: private boolean scrollable;
0057: /**
0058: * The rs column position.
0059: */
0060: private int pos;
0061: private int oidCols;
0062: private JdbcOID lastOID;
0063:
0064: public String sql;
0065:
0066: /**
0067: * A map of {@link ColFHKey} to {@link ColFieldHolder}. A ColFieldHolder is
0068: * added to the map once some or all of its results have been processed.
0069: */
0070: private Map colHMap = new HashMap();
0071: /**
0072: * The compiled query for this result.
0073: */
0074: protected JdbcCompiledQuery cq;
0075: /**
0076: * used for creating queries for collecion fields.
0077: */
0078: private JdbcJDOQLCompiler qCompiler;
0079:
0080: private ModelMetaData jmd;
0081: private Object[] params;
0082: private boolean forUpdate;
0083: private int nextCount;
0084:
0085: private final Object[] singleResult = new Object[1];
0086: private boolean isCrossJoined;
0087:
0088: /**
0089: * No iteration has been done on the query.
0090: */
0091: private static final int STATUS_NOT_STARTED = 0;
0092: /**
0093: * This query is being iterated over.
0094: */
0095: private static final int STATUS_BUSY = 1;
0096: /**
0097: * This query has been iterated over to the end.
0098: */
0099: private static final int STATUS_FINISHED = 2;
0100:
0101: /**
0102: * Container used for caching the results
0103: */
0104: CachedQueryResult qRCache;
0105: /**
0106: * The state of the result.
0107: *
0108: * @see #STATUS_NOT_STARTED
0109: * @see #STATUS_BUSY
0110: * @see #STATUS_FINISHED
0111: */
0112: private int status;
0113: /**
0114: * This is a flag to indicate if the results might be cached or may be retrieved from cache.
0115: * A true might change to false but a false may not become a true.
0116: */
0117: private boolean isCacheble;
0118:
0119: public JdbcQueryResult next;
0120: public JdbcQueryResult prev;
0121:
0122: public JdbcQueryResult(JdbcStorageManager sm, JdbcCompiledQuery cq,
0123: Object[] params, boolean cachable) {
0124: this .sm = sm;
0125: this .cq = cq;
0126: this .params = params;
0127: isCacheble = cachable && cq.isCacheble();
0128: }
0129:
0130: public void init(ClassMetaData cmd, boolean scrollable) {
0131: this .cmd = cmd;
0132: if (cmd != null) {
0133: this .fetchGroup = cmd.fetchGroups[cq.getFetchGroupIndex()];
0134: this .oidCols = ((JdbcClass) cmd.storeClass).table.pkSimpleColumnCount;
0135: isCrossJoined = cq.isCrossJoinAllowed()
0136: && fetchGroup.crossJoinedCollectionField != null;
0137: } else {
0138: isCrossJoined = false;
0139: }
0140:
0141: this .jmd = sm.getJmd();
0142: this .includeSubclasses = cq.isIncludeSubclasses();
0143: this .scrollable = scrollable;
0144: }
0145:
0146: public RunningQuery getRunningQuery() {
0147: return this ;
0148: }
0149:
0150: public int getRelativeResultCount() {
0151: return nextCount;
0152: }
0153:
0154: public void resetRelativeResultCount() {
0155: nextCount = 0;
0156: }
0157:
0158: public Object[] getParams() {
0159: return params;
0160: }
0161:
0162: public void setParams(Object[] params) {
0163: this .params = params;
0164: }
0165:
0166: public FgDs getFgDs() {
0167: return ((JdbcFetchGroup) fetchGroup.storeFetchGroup).getFgDs(
0168: includeSubclasses, false);
0169: }
0170:
0171: private OID getLastOID() {
0172: return lastOID;
0173: }
0174:
0175: public void cancel() {
0176: try {
0177: ps.cancel();
0178: } catch (NullPointerException e) {
0179: //ignore
0180: } catch (SQLException e) {
0181: throw BindingSupportImpl.getInstance().invalidOperation(
0182: e.getMessage(), e);
0183: }
0184: }
0185:
0186: public CompiledQuery getCompiledQuery() {
0187: return cq;
0188: }
0189:
0190: public JdbcCompiledQuery getJdbcCompiledQuery() {
0191: return cq;
0192: }
0193:
0194: public boolean isClosed() {
0195: return ps == null;
0196: }
0197:
0198: /**
0199: * Close these results.
0200: */
0201: public void close() {
0202: if (colHMap != null) {
0203: Collection col = colHMap.values();
0204: for (Iterator iterator = col.iterator(); iterator.hasNext();) {
0205: ColFieldHolder holder = (ColFieldHolder) iterator
0206: .next();
0207: if (holder != null) {
0208: holder.close();
0209: }
0210: }
0211: colHMap.clear();
0212: }
0213:
0214: if (ps == null)
0215: return;
0216: if (rs != null) {
0217: try {
0218: rs.close();
0219: } catch (SQLException e) {
0220: // ignore
0221: }
0222: rs = null;
0223: }
0224: if (isEJBQLHack()) {
0225:
0226: ((JdbcQueryResultEJBQL) this ).closeImp();
0227:
0228: } else {
0229: try {
0230: ps.close();
0231: ps = null;
0232: } catch (SQLException e) {
0233: throw sm.handleException(e);
0234: }
0235: }
0236:
0237: setNonCacheble();
0238: status = STATUS_FINISHED;
0239:
0240: qRCache = null;
0241: params = null;
0242: //cq = null;
0243: }
0244:
0245: /**
0246: * Are there more results? Calling this advances to the next result if
0247: * there is one and returns true otherwise it returns false.
0248: */
0249: public boolean next(int skip) {
0250: pos = 1;
0251: /**
0252: * If a crossJoin was used then we may be on the next row already.
0253: */
0254: try {
0255: if (isCrossJoined) {
0256: if (skip == 0 && fetchInfo.onNextRow) {
0257: fetchInfo.onNextRow = false;
0258: return true;
0259: }
0260: OID compareTo = null;
0261: if (!fetchInfo.onNextRow && !fetchInfo.onValidRow) {
0262: /**
0263: * Not already on a next row
0264: */
0265: if (!rs.next()) {
0266: fetchInfo.onValidRow = false;
0267: return false;
0268: }
0269: fetchInfo.onValidRow = true;
0270: if (skip == 0) {
0271: return true;
0272: }
0273: }
0274:
0275: /**
0276: * If already on next with a skip greater that zero
0277: */
0278: compareTo = fetchInfo.nextOid;
0279: if (compareTo == null) {
0280: compareTo = getResultOID();
0281: pos = 1;
0282: }
0283: for (; skip >= 0;) {
0284: /**
0285: * Skip over the amount of same oids.
0286: */
0287: if (!rs.next()) {
0288: return false;
0289: }
0290: if (!getResultOID().equals(compareTo)) {
0291: //found a next batch
0292: skip--;
0293: }
0294: //update to the lastOid
0295: compareTo = lastOID;
0296: }
0297: fetchInfo.nextOid = lastOID;
0298: return true;
0299: } else {
0300: if (scrollable) {
0301: return rs.relative(skip + 1);
0302: } else {
0303: for (int i = 0; i < skip; i++)
0304: if (!rs.next())
0305: return false;
0306: return rs.next();
0307: }
0308: }
0309: } catch (SQLException e) {
0310: throw sm.handleException(e);
0311: }
0312: }
0313:
0314: public boolean isRandomAccess() {
0315: return scrollable;
0316: }
0317:
0318: public int getResultCount() {
0319: if (isNotStarted()) {
0320: updateCacheble();
0321: executeQueryImp();
0322: if (isCacheble()) {
0323: qRCache = new CachedQueryResult();
0324: }
0325: }
0326:
0327: try {
0328: if (!rs.last())
0329: return 0;
0330: return rs.getRow();
0331: } catch (SQLException e) {
0332: throw sm.handleException(e);
0333: }
0334: }
0335:
0336: public boolean absolute(int index) {
0337: try {
0338: pos = 1;
0339: return rs.absolute(index + 1);
0340: } catch (SQLException e) {
0341: throw sm.handleException(e);
0342: }
0343: }
0344:
0345: /**
0346: * Get the OID for the current result. This may only be called once
0347: * per result and must be called before getResultState.
0348: */
0349: public OID getResultOID() {
0350: pos = 1;
0351: try {
0352: lastOID = (JdbcOID) cmd.createOID(false);
0353: if (cq.isSqlQuery()) {
0354: if (cmd.pkFields == null) {
0355: lastOID.copyKeyFields(rs,
0356: cq.getMappingInfo(rs).dsPkIndex);
0357: } else {
0358: lastOID.copyKeyFields(rs,
0359: cq.getMappingInfo(rs).fields,
0360: cq.getMappingInfo(rs).pkIndexInFieldsArray);
0361: }
0362: } else {
0363: lastOID.copyKeyFields(rs, pos);
0364: pos += oidCols;
0365: }
0366: if (Debug.DEBUG)
0367: nextCount++;
0368: return lastOID;
0369: } catch (SQLException e) {
0370: throw sm.handleException(e);
0371: }
0372: }
0373:
0374: /**
0375: * Get the State for the current result. This may only be called once
0376: * per result and may only be called if getResultOID has been called.
0377: *
0378: * @see #getResultOID
0379: */
0380: public State getResultState(boolean forUpdate,
0381: StateContainer container) {
0382: try {
0383: if (cq.isSqlQuery()) {
0384: return getState(lastOID, cq.getMappingInfo(rs), rs);
0385: } else {
0386: MutableInt mutableInt = new MutableInt();
0387: State state = sm.createStateImp(rs, lastOID,
0388: fetchGroup, forUpdate, pos, mutableInt,
0389: includeSubclasses, container, cq.fgDs, !cq
0390: .isParColFetchEnabled(), cq
0391: .isCrossJoinAllowed(), null);
0392: //must fetch crossJoin results if enabled
0393: if (cq.isCrossJoinAllowed()) {
0394: FetchGroupField cjFGF = cq.fgDs.fg.crossJoinedCollectionField;
0395: if (cjFGF != null) {
0396: JdbcCollectionField colField = (JdbcCollectionField) cjFGF.fmd.storeField;
0397: colField.fetchFrom(rs, lastOID, state,
0398: cq.fgDs.fg.crossJoinedCollectionField,
0399: forUpdate, container, !cq
0400: .isParColFetchEnabled(),
0401: mutableInt.value, fetchInfo, sm);
0402: }
0403: }
0404: return state;
0405: }
0406: } catch (SQLException e) {
0407: throw sm.handleException(e);
0408: }
0409: }
0410:
0411: private static State getState(OID oid,
0412: JdbcCompiledQuery.MappingInfo mi, ResultSet rs)
0413: throws SQLException {
0414: ClassMetaData cmd = mi.cmd;
0415: if (mi.discrIndex != 0) {
0416: Object classId = ((JdbcClass) cmd.storeClass).classIdCol
0417: .get(rs, mi.discrIndex);
0418: if (rs.wasNull()) {
0419: throw BindingSupportImpl
0420: .getInstance()
0421: .objectNotFound(
0422: "No row for "
0423: + cmd.storeClass
0424: + " "
0425: + oid.toSString()
0426: + " OR "
0427: + ((JdbcClass) cmd.storeClass).classIdCol.name
0428: + " is null for row");
0429: }
0430: cmd = ((JdbcClass) cmd.storeClass).findClass(classId);
0431: if (cmd == null) {
0432: throw BindingSupportImpl
0433: .getInstance()
0434: .fatalDatastore(
0435: "Row for OID "
0436: + oid.toSString()
0437: + " is not in the heirachy starting at "
0438: + mi.cmd.storeClass
0439: + " ("
0440: + ((JdbcClass) mi.cmd.storeClass).classIdCol.name
0441: + " for row is " + classId
0442: + ")");
0443: }
0444: }
0445: State state = cmd.createState();
0446: oid.resolve(state);
0447: ((JdbcState) state).copyPass1Fields(rs, mi.fields);
0448: return state;
0449: }
0450:
0451: /**
0452: * Get fetchAmount of data if possible. This assumes that the next cursor is
0453: * correctly positioned.
0454: */
0455: public boolean addNextResult(ApplicationContext context,
0456: QueryResultContainer results, int fetchAmount) {
0457: try {
0458: /**
0459: * If this query contains 'out' params then we will return a Object[]
0460: * with length the same as the amount of 'out' and 'out_cursor' params.
0461: */
0462: if (cq.isSqlQuery()) {
0463: if (cq.isUnique()) {
0464: int[] paramDir = cq.getParamDirection();
0465: Object[] outResult = new Object[cq
0466: .getOutParamCount()];
0467: int outCount = 0;
0468: for (int i = 0; i < paramDir.length; i++) {
0469: if (paramDir[i] == JdbcCompiledQuery.PARAM_OUT) {
0470: outResult[outCount++] = ((CallableStatement) ps)
0471: .getObject(i + 1);
0472: } else if (paramDir[i] == JdbcCompiledQuery.PARAM_OUT_CURSOR) {
0473: //if there is mapping info available then map it to a class
0474: //else create an object[] with all the cols read as getObject
0475: JdbcCompiledQuery.MappingInfo mi = cq
0476: .getMappingInfo(rs);
0477: if (mi.isPkValid()) {
0478: //if this can be resolved to oid-state
0479: throw BindingSupportImpl.getInstance()
0480: .unsupported();
0481: } else {
0482: outResult[outCount++] = fetchAsObjects(mi);
0483: }
0484: }
0485: }
0486: results.addRow(outResult);
0487: return true;
0488: } else {
0489: JdbcCompiledQuery.MappingInfo mi = cq
0490: .getMappingInfo(rs);
0491: if (mi.isPkValid()) {
0492: return fetchManagedTypes(context, fetchAmount,
0493: cq.getFetchGroup(), results);
0494: } else {
0495: int colCount = mi.colCount;
0496: do {
0497: Object[] oa = new Object[colCount];
0498: for (int j = 0; j < colCount; j++) {
0499: oa[j] = rs.getObject(j + 1);
0500:
0501: }
0502: results.addRow(oa);
0503: } while (next(0));
0504: return true;
0505: }
0506: }
0507: }
0508:
0509: ProjectionQueryDecoder decoder = cq.getProjectionDecoder();
0510: /**
0511: * This is a normal query for a instance with no supplied projection
0512: * or aggregate info('result' is null or equal to default).
0513: *
0514: * In this scenario nothing is added to the 'result' as the
0515: * StatesReturned will already contain all the data.
0516: */
0517: if (decoder == null || decoder.isContainsThisOnly()) {
0518: return fetchManagedTypes(context, fetchAmount, cq
0519: .getFetchGroup(), results);
0520: } else {
0521: if (decoder.containsThis()) {
0522: return doContainsThis(fetchAmount, cq
0523: .getFetchGroup(), decoder
0524: .getResultTypeArray(), decoder
0525: .getTypeCodes(), decoder.getFmdArray(),
0526: results);
0527: } else {
0528: return doResultOnly(fetchAmount, decoder
0529: .getResultTypeArray(), decoder
0530: .getFmdArray(), decoder.getTypeCodes(),
0531: results);
0532: }
0533: }
0534: } catch (SQLException e) {
0535: throw sm.handleException(e);
0536: }
0537: }
0538:
0539: private Object fetchAsObjects(JdbcCompiledQuery.MappingInfo mi)
0540: throws SQLException {
0541: int colCount = mi.colCount;
0542: ArrayList rowData = new ArrayList();
0543: do {
0544: Object[] oa = new Object[colCount];
0545: for (int j = 0; j < colCount; j++) {
0546: oa[j] = rs.getObject(j + 1);
0547: }
0548: rowData.add(oa);
0549: } while (next(0));
0550: return rowData;
0551: }
0552:
0553: /**
0554: * Fill the results with managed instances(oid-state).
0555: */
0556: private boolean fetchManagedTypes(ApplicationContext context,
0557: int fetchAmount, FetchGroup fg, QueryResultContainer results) {
0558: OID currentOid = null;
0559: OID prevOid = null;
0560:
0561: if (fetchAmount == -1) {
0562: //get all results
0563: for (;;) {
0564: currentOid = getResultOID();
0565:
0566: //this is used to skip over same oids
0567: if (!currentOid.equals(prevOid)) {
0568: sm.getState(context, currentOid, fg, this ,
0569: results.container);
0570: results.addRow(currentOid);
0571: } else {
0572: fetchInfo.onNextRow = false;
0573: }
0574: prevOid = currentOid;
0575: if (fetchInfo.finished || !next(0))
0576: break;
0577: }
0578: return true;
0579: } else {
0580: for (int i = 0; i < fetchAmount; i++) {
0581: if (fetchInfo.onNextRow && fetchInfo.nextOid != null) {
0582: currentOid = fetchInfo.nextOid;
0583: fetchInfo.onNextRow = false;
0584: fetchInfo.nextOid = null;
0585: } else {
0586: currentOid = getResultOID();
0587: }
0588:
0589: //this is used to skip over same oids
0590: if (!currentOid.equals(prevOid)) {
0591: sm.getState(context, currentOid, fg, this ,
0592: results.container);
0593: results.addRow(currentOid);
0594: } else {
0595: fetchInfo.onNextRow = false;
0596: }
0597:
0598: //if no more then return
0599: if (fetchInfo.finished || !next(0))
0600: return true;
0601: prevOid = currentOid;
0602: }
0603: return false;
0604: }
0605: }
0606:
0607: private boolean doContainsThis(int fetchAmount, FetchGroup fg,
0608: int[] typeArray, int[] typeCodes, Object[] fmdArray,
0609: QueryResultContainer results) throws SQLException {
0610: //this rs column index to start reading
0611: int rsIndex = 1 + cq.getSelectColumnCount();
0612: if (fetchAmount == -1) {
0613: do {
0614: sm.getState(getResultOID(), fg, results.container);
0615: getDataImp(typeArray, fmdArray, typeCodes, rsIndex,
0616: results);
0617: } while (next(0));
0618: return true;
0619: } else {
0620: for (int i = 0; i < fetchAmount; i++) {
0621: sm.getState(getResultOID(), fg, results.container);
0622: getDataImp(typeArray, fmdArray, typeCodes, rsIndex,
0623: results);
0624: if (!next(0))
0625: return true;
0626: }
0627: return false;
0628: }
0629: }
0630:
0631: private boolean doResultOnly(int fetchAmount, int[] typeArray,
0632: Object[] fmdArray, int[] typeCodes,
0633: QueryResultContainer results) throws SQLException {
0634: //this rs column index to start reading
0635: if (fetchAmount == -1) {
0636: do {
0637: getDataImp(typeArray, fmdArray, typeCodes, 1, results);
0638: } while (next(0));
0639: return true;
0640: } else {
0641: for (int i = 0; i < fetchAmount; i++) {
0642: getDataImp(typeArray, fmdArray, typeCodes, 1, results);
0643: if (!next(0))
0644: return true;
0645: }
0646: return false;
0647: }
0648: }
0649:
0650: private void getDataImp(int[] typeArray, Object[] typeObjectArray,
0651: int[] typeCodes, int currentRsIndex,
0652: QueryResultContainer results) throws SQLException {
0653:
0654: Object[] resArray;
0655: if (typeArray.length == 1) {
0656: resArray = singleResult;
0657: } else {
0658: resArray = new Object[typeArray.length];
0659: }
0660:
0661: for (int i = 0; i < typeArray.length; i++) {
0662: int type = typeArray[i];
0663: switch (type) {
0664: case ProjectionQueryDecoder.TYPE_THIS: //this
0665: resArray[i] = lastOID;
0666: break;
0667: case ProjectionQueryDecoder.TYPE_FIELD: //field
0668: currentRsIndex = getFieldData(
0669: (FieldMetaData) typeObjectArray[i], rs,
0670: currentRsIndex, resArray, i);
0671: break;
0672: case ProjectionQueryDecoder.TYPE_AGGRETATE: //aggregate or computed field
0673:
0674: currentRsIndex = getFieldData(typeCodes[i], rs,
0675: currentRsIndex, resArray, i);
0676:
0677: break;
0678: case ProjectionQueryDecoder.TYPE_EXP:
0679: currentRsIndex = getFieldData(typeCodes[i], rs,
0680: currentRsIndex, resArray, i);
0681: break;
0682: case ProjectionQueryDecoder.TYPE_VAR:
0683: ClassMetaData cmd = (ClassMetaData) typeObjectArray[i];
0684: JdbcOID oid = (JdbcOID) cmd.createOID(false);
0685: if (oid.copyKeyFields(rs, currentRsIndex)) {
0686: resArray[i] = oid;
0687: } else {
0688: resArray[i] = null;
0689: }
0690: currentRsIndex += ((JdbcClass) cmd.storeClass).table.pk.length;
0691: break;
0692: default:
0693: throw BindingSupportImpl.getInstance().internal("");
0694: }
0695: }
0696:
0697: if (typeArray.length == 1) {
0698: results.addRow(resArray[0]);
0699: } else {
0700: results.addRow(resArray);
0701: }
0702: }
0703:
0704: private int getFieldData(int typeCode, ResultSet rs, int rsIndex,
0705: Object[] dataRow, int dataIndex) throws SQLException {
0706: switch (typeCode) {
0707: case MDStatics.BYTEW:
0708: dataRow[dataIndex] = new Byte(rs.getByte(rsIndex));
0709: break;
0710: case MDStatics.SHORTW:
0711: dataRow[dataIndex] = new Short(rs.getShort(rsIndex));
0712: break;
0713: case MDStatics.INTW:
0714: dataRow[dataIndex] = new Integer(rs.getInt(rsIndex));
0715: break;
0716: case MDStatics.LONGW:
0717: dataRow[dataIndex] = new Long(rs.getLong(rsIndex));
0718: break;
0719: case MDStatics.FLOATW:
0720: dataRow[dataIndex] = new Float(rs.getFloat(rsIndex));
0721: break;
0722: case MDStatics.DOUBLEW:
0723: dataRow[dataIndex] = new Double(rs.getDouble(rsIndex));
0724: break;
0725: case MDStatics.STRING:
0726: dataRow[dataIndex] = rs.getString(rsIndex);
0727: break;
0728: case MDStatics.BIGDECIMAL:
0729: dataRow[dataIndex] = new BigDecimal(rs.getDouble(rsIndex));
0730: break;
0731: case MDStatics.BIGINTEGER:
0732: dataRow[dataIndex] = new BigInteger(rs.getString(rsIndex));
0733: break;
0734: default:
0735: throw BindingSupportImpl.getInstance().internal(
0736: "Unhandled type '" + typeCode + "'");
0737: }
0738: return rsIndex + 1;
0739: }
0740:
0741: public int getFieldData(FieldMetaData fmd, ResultSet rs,
0742: int firstCol, Object[] dataRow, int dataIndex)
0743: throws SQLException {
0744: JdbcField f = (JdbcField) fmd.storeField;
0745: if (f instanceof JdbcSimpleField) {
0746: JdbcColumn c = ((JdbcSimpleField) f).col;
0747: if (Debug.DEBUG) {
0748: if (!cq.isProjectionQuery()
0749: && !c.name.toUpperCase().equals(
0750: rs.getMetaData()
0751: .getColumnName(firstCol)
0752: .toUpperCase())) {
0753: throw BindingSupportImpl.getInstance().internal(
0754: "Reading the wrong column: \nrs field = "
0755: + rs.getMetaData().getColumnName(
0756: firstCol)
0757: + "\nmetaData field = " + c.name);
0758: }
0759: }
0760: if (c.converter != null) {
0761: dataRow[dataIndex] = c.converter.get(rs, firstCol++, c);
0762: } else {
0763: dataRow[dataIndex] = JdbcUtils.get(rs, firstCol++,
0764: c.javaTypeCode, c.scale);
0765: if (rs.wasNull()) {
0766: dataRow[dataIndex] = null;
0767: }
0768: }
0769: } else if (f instanceof JdbcRefField) {
0770: JdbcRefField rf = (JdbcRefField) f;
0771: JdbcOID oid = (JdbcOID) rf.targetClass.createOID(false);
0772: if (oid.copyKeyFields(rs, firstCol)) {
0773: dataRow[dataIndex] = oid;
0774: } else {
0775: dataRow[dataIndex] = null;
0776: }
0777: firstCol += rf.cols.length;
0778: } else if (f instanceof JdbcPolyRefField) {
0779: dataRow[dataIndex] = JdbcGenericState.getPolyRefOID(f, rs,
0780: firstCol);
0781: firstCol += ((JdbcPolyRefField) f).cols.length;
0782: } else {
0783: throw BindingSupportImpl.getInstance().internal(
0784: "not implemented");
0785: }
0786: return firstCol;
0787: }
0788:
0789: public void doParallelFetch(boolean forUpdate,
0790: StateContainer container) {
0791: this .forUpdate = forUpdate;
0792: try {
0793: if (cq.isParColFetchEnabled()) {
0794: processParallelFetch(cq.fgDs.getJoinStruct(), 1, true,
0795: null, container);
0796: }
0797: } catch (Exception e) {
0798: throw sm.handleException(e);
0799: }
0800: }
0801:
0802: private JdbcJDOQLCompiler getQComp() {
0803: if (qCompiler == null) {
0804: qCompiler = new JdbcJDOQLCompiler(sm);
0805: } else {
0806: qCompiler.reinit();
0807: }
0808: return qCompiler;
0809: }
0810:
0811: /**
0812: * Execute a query for a collection.
0813: */
0814: private void exectureQ(SqlStruct sqlStruct, ColFieldHolder colHolder)
0815: throws SQLException {
0816: sqlStruct
0817: .updateSql(sm.getSqlDriver(), params, forUpdate, false);
0818: String sql = sqlStruct.getSql();
0819: PreparedStatement p2Ps = con.prepareStatement(sql);
0820: sqlStruct.setParamsOnPS(jmd, sm.getSqlDriver(), p2Ps, params,
0821: sql);
0822: colHolder.ps = p2Ps;
0823: colHolder.rs = p2Ps.executeQuery();
0824: }
0825:
0826: private SqlStruct createSqlStruct(ColFieldHolder colHolder) {
0827: final SqlStruct sqlStruct = new SqlStruct();
0828: sqlStruct.jdoqlFilter = "PQ: "
0829: + cq.getQueryDetails().getFilter();
0830:
0831: JdbcJDOQLCompiler qComp = getQComp();
0832: SelectExp root = (SelectExp) qComp.compileParallelFetch(cq
0833: .getQueryDetails());
0834: SelectExp se = colHolder.createSE(sm, root);
0835:
0836: //add the root table pk to the front of the select list
0837: SqlExp e = JdbcColumn.toSqlExp(
0838: ((JdbcClass) cmd.storeClass).table.pk, root,
0839: root.selectList);
0840: root.selectList = e;
0841:
0842: root.appendOrderByExp(se.orderByList);
0843:
0844: JdbcJDOQLCompiler
0845: .doFinalSql(sqlStruct, root, sm.getSqlDriver());
0846: JdbcJDOQLCompiler.compileParams(qComp.getQParser(), sqlStruct);
0847: return sqlStruct;
0848: }
0849:
0850: /**
0851: * This must be called on a root JoinStructure.
0852: */
0853: private void processParallelFetch(JoinStructure rootJs, int level,
0854: boolean valueJoin, ColFieldHolder parent,
0855: StateContainer container) throws SQLException {
0856: if (rootJs == null)
0857: return;
0858: if (!rootJs.rootJoinStructure)
0859: throw BindingSupportImpl.getInstance().internal("");
0860: rootJs.finish();
0861:
0862: List l = rootJs.colJoinStructs;
0863: if (l == null)
0864: return;
0865: for (int i = 0; i < l.size(); i++) {
0866: JoinStructure js2 = (JoinStructure) l.get(i);
0867: ColFHKey key = new ColFHKey(level, valueJoin, js2);
0868: ColFieldHolder colFHolder = (ColFieldHolder) colHMap
0869: .get(key);
0870: if (colFHolder == null) {
0871: colFHolder = new ColFieldHolder(parent, valueJoin, js2);
0872: colHMap.put(key, colFHolder);
0873:
0874: //Leave this here -Jaco-
0875: // SqlStruct sqlStr = compiledQuery.get(key);
0876: // try {
0877: // if (sqlStr == null) {
0878: // sqlStr = createSqlStruct(colFHolder, js2);
0879: // compiledQuery.add(key, sqlStr);
0880: // }
0881: // exectureQ(sqlStr, js2, colFHolder);
0882: // } finally {
0883: // if (sqlStr != null) sqlStr.returnToPool();
0884: // }
0885:
0886: if (level == 1
0887: && js2.parent == rootJs
0888: && js2.fgField == rootJs.fetchGroup.crossJoinedCollectionField
0889: && cq.isCrossJoinAllowed()) {
0890: //crossjoined
0891: colFHolder.valueJs = cq.fgDs.valueJs;
0892: // colFHolder.returnState = JdbcCollectionField.STATUS_CLOSED;
0893: colFHolder.returnState = JdbcCollectionField.STATUS_VALID_ROWS
0894: + JdbcCollectionField.STATUS_DATA_ADDED;
0895: colFHolder.crossJoinedField = true;
0896: } else {
0897: SqlStruct sqlStr = cq.get(key);
0898: if (sqlStr == null) {
0899: sqlStr = createSqlStruct(colFHolder);
0900: exectureQ(sqlStr, colFHolder);
0901: cq.add(key, sqlStr);
0902: } else {
0903: synchronized (sqlStr) {
0904: exectureQ(sqlStr, colFHolder);
0905: }
0906: }
0907: }
0908: }
0909:
0910: if ((colFHolder.returnState & JdbcCollectionField.STATUS_CLOSED) == JdbcCollectionField.STATUS_CLOSED) {
0911: // System.out.println("--- CONTINUE ON STATUS_CLOSED");
0912: continue;
0913: }
0914:
0915: //fetch the coll
0916: if (!colFHolder.crossJoinedField) {
0917: colFHolder.returnState = ((JdbcCollectionField) js2.fgField.fmd.storeField)
0918: .fetchWithFilter(sm, container, js2.fgField,
0919: colFHolder.rs, forUpdate, getLastOID(),
0920: colFHolder.lastOIDs, cmd, colFHolder);
0921: }
0922:
0923: if ((colFHolder.returnState & JdbcCollectionField.STATUS_VALID_ROWS) != JdbcCollectionField.STATUS_VALID_ROWS) {
0924: // System.out.println("--- CONTINUE ON STATUS_VALID_ROWS");
0925: continue;
0926: }
0927:
0928: if ((colFHolder.returnState & JdbcCollectionField.STATUS_DATA_ADDED) != JdbcCollectionField.STATUS_DATA_ADDED) {
0929: // System.out.println("--- CONTINUE ON STATUS_DATA_ADDED");
0930: continue;
0931: }
0932:
0933: //for a normal collection
0934: if (js2.fgField.nextFetchGroup != null) {
0935: // System.out.println("--- DOING VALUE FIELD");
0936: processParallelFetch(colFHolder.valueJs, level + 1,
0937: true, colFHolder, container);
0938: }
0939:
0940: //for a map with pc keys
0941: if (js2.fgField.nextKeyFetchGroup != null) {
0942: // System.out.println("--- DOING KEY FIELD");
0943: processParallelFetch(colFHolder.keyJs, level + 1,
0944: false, colFHolder, container);
0945: }
0946: }
0947: }
0948:
0949: /**
0950: * If the cache can be checked for results represented by this query.
0951: */
0952: public boolean isCachedResultsOk() {
0953: return isCacheble && status == STATUS_NOT_STARTED;
0954: }
0955:
0956: public void setQResult(QueryResultContainer qContainer) {
0957: if (Debug.DEBUG) {
0958: if (status != STATUS_NOT_STARTED) {
0959: throw BindingSupportImpl.getInstance().internal(
0960: "query already started");
0961: }
0962: }
0963: status = STATUS_BUSY;
0964: if (cq.isRandomAccess()) {
0965: if (!absolute(0)) {
0966: status = STATUS_FINISHED;
0967: qContainer.qFinished = true;
0968: }
0969: } else {
0970: if (!next(0)) {
0971: status = STATUS_FINISHED;
0972: qContainer.qFinished = true;
0973: }
0974: }
0975: }
0976:
0977: public boolean isNotStarted() {
0978: return status == STATUS_NOT_STARTED;
0979: }
0980:
0981: public void getAbsolute(ApplicationContext context,
0982: QueryResultContainer qContainer, int index, int fetchAmount) {
0983: if (isNotStarted()) {
0984: executeQueryImp();
0985: setQResult(qContainer);
0986: }
0987:
0988: resetRelativeResultCount();
0989: if (Debug.DEBUG) {
0990: if (!isRandomAccess()) {
0991: throw BindingSupportImpl.getInstance().internal(
0992: "getAbsolute may only "
0993: + "be called on randomAccess queries");
0994: }
0995: }
0996: if (absolute(index)) {
0997: if (addNextResult(context, qContainer, fetchAmount)) {
0998: status = STATUS_FINISHED;
0999: }
1000: doParallelFetch(forUpdate, qContainer.container);
1001: }
1002:
1003: if (status == STATUS_FINISHED) {
1004: qContainer.qFinished = true;
1005: }
1006: }
1007:
1008: /**
1009: * Get the next batch of results. Returns true if there are no more
1010: * (i.e. this query should be closed).
1011: */
1012: public boolean nextBatch(ApplicationContext context,
1013: int skipAmount, QueryResultContainer qContainer) {
1014: prepare(qContainer);
1015: updateCacheble();
1016:
1017: //get the rs at the correct spot
1018: if (skipAmount != 0) {
1019: setNonCacheble();
1020: if (!next(skipAmount - 1)) {
1021: status = STATUS_FINISHED;
1022: }
1023: }
1024:
1025: if (status != STATUS_FINISHED) {
1026: if (addNextResult(context, qContainer, cq
1027: .getQueryResultBatchSize())) {
1028: status = STATUS_FINISHED;
1029: }
1030:
1031: if (qRCache != null) {
1032: qContainer.addResultsTo(qRCache, cq
1033: .isCopyResultsForCache());
1034: }
1035: }
1036: doParallelFetch(forUpdate, qContainer.container);
1037: if (status == STATUS_FINISHED) {
1038: qContainer.qFinished = true;
1039: }
1040: return status == STATUS_FINISHED;
1041: }
1042:
1043: private void prepare(QueryResultContainer qContainer) {
1044: if (isNotStarted()) {
1045: updateCacheble();
1046: executeQueryImp();
1047: setQResult(qContainer);
1048: if (isCacheble()) {
1049: qRCache = new CachedQueryResult();
1050: }
1051: } else {
1052: resetRelativeResultCount();
1053: }
1054: }
1055:
1056: /**
1057: * This is called to recheck if this results is still cacheble.
1058: */
1059: public void updateCacheble() {
1060: if (isCacheble) {
1061: if (sm.isActive() && !sm.isOptimistic() || sm.isFlushed()
1062: || !sm.getCache().isQueryCacheEnabled()) {
1063: setNonCacheble();
1064: return;
1065: }
1066: }
1067: }
1068:
1069: public boolean isFinished() {
1070: return status == STATUS_FINISHED;
1071: }
1072:
1073: public boolean isCacheble() {
1074: return isCacheble;
1075: }
1076:
1077: public QueryDetails getQueryDetails() {
1078: return cq.getQueryDetails();
1079: }
1080:
1081: private void executeQueryImp() {
1082: executePrepare(sm.isForUpdate(), sm.getSqlDriver());
1083: executeActual();
1084: }
1085:
1086: private void executePrepare(boolean forUpdate, SqlDriver sqlDriver) {
1087: JdbcCompiledQuery cq = getJdbcCompiledQuery();
1088: ClassMetaData cmd = cq.getCmd();
1089: boolean randomAccess = cq.isRandomAccess();
1090: if (randomAccess && !sqlDriver.isScrollableResultSetSupported()) {
1091: throw BindingSupportImpl.getInstance().datastore(
1092: "Random access not supported for "
1093: + sqlDriver.getName()
1094: + " using JDBC driver "
1095: + sm.getJdbcConnectionSource()
1096: .getDriverName());
1097: }
1098: init(cmd, randomAccess);
1099: Object[] params = getParams();
1100: boolean ok = false;
1101: try {
1102: con = sm.con();
1103: if (isEJBQLHack()) {
1104:
1105: ((JdbcQueryResultEJBQL) this )
1106: .executePrepareHack(params);
1107:
1108: } else if (!cq.isSqlQuery()) {
1109: // this sync block can be removed when compiledQuery is no longer shared
1110: SqlStruct sqlStr = cq.getSqlStruct();
1111: synchronized (sqlStr) {
1112: sqlStr.updateSql(sqlDriver, params, forUpdate,
1113: false);
1114: String sql = sql = cq.getSql();
1115: if (randomAccess) {
1116: ps = con.prepareStatement(sql,
1117: ResultSet.TYPE_SCROLL_INSENSITIVE,
1118: ResultSet.CONCUR_READ_ONLY);
1119: } else {
1120: ps = con.prepareStatement(sql);
1121: }
1122: sqlStr.setParamsOnPS(jmd, sqlDriver, ps, params,
1123: sql);
1124: }
1125:
1126: } else if (cq.isStoredProc()) {
1127: CallableStatement cs = con.prepareCall(cq
1128: .getQueryDetails().getFilter());
1129: ps = cs;
1130: int[] sqlTypes = cq.getSqlTypes();
1131: int[] paramDir = cq.getParamDirection();
1132:
1133: final int count = sqlTypes.length;
1134: for (int i = 0; i < count; i++) {
1135: if (paramDir[i] == JdbcCompiledQuery.PARAM_IN) {
1136: cs.setObject(i + 1, params[i], sqlTypes[i]);
1137: } else if (paramDir[i] == JdbcCompiledQuery.PARAM_OUT) {
1138: cs.registerOutParameter(i + 1, sqlTypes[i]);
1139: } else if (paramDir[i] == JdbcCompiledQuery.PARAM_OUT_CURSOR) {
1140: cs.registerOutParameter(i + 1, -10);
1141: }
1142: }
1143:
1144: } else if (cq.isDirectSql()) {
1145: ps = con.prepareStatement(cq.getQueryDetails()
1146: .getFilter());
1147: int[] sqlTypes = cq.getSqlTypes();
1148: final int count = sqlTypes.length;
1149: for (int i = 0; i < count; i++) {
1150: ps.setObject(i + 1, params[i], sqlTypes[i]);
1151: }
1152: } else {
1153: throw BindingSupportImpl.getInstance().internal(
1154: "Undefined query type");
1155: }
1156:
1157: int maxRows = cq.getMaxRows();
1158: ps.setMaxRows(maxRows < 0 ? 0 : maxRows);
1159: ok = true;
1160: } catch (SQLException x) {
1161: throw sm.handleException(x);
1162: } finally {
1163: if (!ok) {
1164: try {
1165: close();
1166: } catch (Exception x) {
1167: // ignore as we are already in an exception
1168: }
1169: }
1170: }
1171: }
1172:
1173: private void executeActual() {
1174: JdbcCompiledQuery cq = getJdbcCompiledQuery();
1175:
1176: int maxRows = cq.getMaxRows();
1177:
1178: boolean ok = false;
1179: try {
1180: if (sm.getSqlDriver().isFetchSizeSupported()) {
1181: int fetchSize = cq.getQueryResultBatchSize();
1182: if (maxRows > 0 && fetchSize > maxRows)
1183: fetchSize = maxRows;
1184: if (fetchSize > 0)
1185: ps.setFetchSize(fetchSize);
1186: }
1187:
1188: try {
1189: if (isEJBQLHack()) {
1190:
1191: ((JdbcQueryResultEJBQL) this ).executeActualHack();
1192:
1193: } else if (cq.isSqlQuery()) {
1194: if (cq.isStoredProc()
1195: && sm.getSqlDriver().isOracleStoreProcs()) {
1196: ps.executeUpdate();
1197: int[] paramDir = cq.getParamDirection();
1198: boolean cursorSet = false;
1199: QueryDetails queryDetails = cq
1200: .getQueryDetails();
1201: final int count = queryDetails.getParamCount();
1202: for (int i = 0; i < count; i++) {
1203: if (paramDir[i] == JdbcCompiledQuery.PARAM_OUT_CURSOR) {
1204: if (cursorSet) {
1205: throw BindingSupportImpl
1206: .getInstance()
1207: .invalidOperation(
1208: "Query may have only one OUT parameter");
1209: }
1210: rs = (ResultSet) ((CallableStatement) ps)
1211: .getObject(i + 1);
1212: cq.getMappingInfo(rs);
1213: cursorSet = true;
1214: }
1215: }
1216: if (!cursorSet) {
1217: rs = (ResultSet) ((CallableStatement) ps)
1218: .getObject(count + 1);
1219: cq.getMappingInfo(rs);
1220: }
1221: } else {
1222: rs = ps.executeQuery();
1223: cq.getMappingInfo(rs);
1224: }
1225: /**
1226: * Check for a valid pk mapping
1227: */
1228: if (!cq.getMappingInfo(rs).isPkValid()
1229: && cq.getQueryDetails().getCandidateClass() != null) {
1230: throw BindingSupportImpl
1231: .getInstance()
1232: .invalidOperation(
1233: "Candidate class '"
1234: + cq
1235: .getQueryDetails()
1236: .getCandidateClass()
1237: .getName()
1238: + "' was specified, "
1239: + "but the ResultSet does not contain any/all of the pk columns.");
1240: }
1241: } else {
1242: rs = ps.executeQuery();
1243: }
1244: } catch (SQLException e) {
1245: throw sm.handleException("Query failed: "
1246: + JdbcUtils.toString(e) + "\n"
1247: + JdbcUtils.getPreparedStatementInfo(sql, ps),
1248: e);
1249: }
1250:
1251: ok = true;
1252: } catch (SQLException e) {
1253: sm.handleException(e);
1254: } finally {
1255: if (!ok) {
1256: try {
1257: close();
1258: } catch (Exception x) {
1259: // ignore as we are already in an exception
1260: }
1261: }
1262: }
1263: }
1264:
1265: /**
1266: * If at any stage it is detected that the results may not be cached this
1267: * this method is called.
1268: */
1269: public void setNonCacheble() {
1270: isCacheble = false;
1271: qRCache = null;
1272: }
1273:
1274: /**
1275: * Todo get rid of this horrible hack when we refactor all the query stuff
1276: */
1277: public boolean isEJBQLHack() {
1278: return false;
1279: }
1280:
1281: public void getAllResults(ApplicationContext context,
1282: QueryResultContainer container, boolean forUpdate) {
1283: try {
1284: executeQueryImp();
1285: resetRelativeResultCount();
1286:
1287: if (next(0)) {
1288: addNextResult(context, container, -1);
1289: }
1290: doParallelFetch(forUpdate, container.container);
1291: } finally {
1292: close();
1293: }
1294: }
1295:
1296: /**
1297: * This is a key for a {@link ColFieldHolder}.
1298: */
1299: public class ColFHKey {
1300:
1301: private int level;
1302: private boolean valueJoin;
1303: private JoinStructure js;
1304:
1305: public ColFHKey(int level, boolean valueJoin, JoinStructure js) {
1306: this .level = level;
1307: this .valueJoin = valueJoin;
1308: this .js = js;
1309: }
1310:
1311: public boolean equals(Object o) {
1312: if (this == o)
1313: return true;
1314: if (!(o instanceof ColFHKey))
1315: return false;
1316:
1317: final ColFHKey colFHKey = (ColFHKey) o;
1318:
1319: if (level != colFHKey.level)
1320: return false;
1321: if (valueJoin != colFHKey.valueJoin)
1322: return false;
1323: if (!js.equals(colFHKey.js))
1324: return false;
1325:
1326: return true;
1327: }
1328:
1329: public int hashCode() {
1330: int result;
1331: result = level;
1332: result = 29 * result + (valueJoin ? 1 : 0);
1333: result = 29 * result + js.hashCode();
1334: return result;
1335: }
1336:
1337: public void dump() {
1338: System.out.println("\n\n --JdbcQueryResult$ColFHKey.dump");
1339: System.out.println("level = " + level);
1340: System.out.println("valueJoin = " + valueJoin);
1341: System.out.println("js = " + js);
1342: }
1343:
1344: }
1345: }
|