/*

   Derby - Class org.apache.derby.impl.sql.execute.ScalarAggregateResultSet

   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to you under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

 */

package org.apache.derby.impl.sql.execute;

import org.apache.derby.iapi.services.monitor.Monitor;

import org.apache.derby.iapi.services.sanity.SanityManager;

import org.apache.derby.iapi.services.stream.HeaderPrintWriter;
import org.apache.derby.iapi.services.stream.InfoStreams;

import org.apache.derby.iapi.services.io.Formatable;

import org.apache.derby.iapi.sql.execute.CursorResultSet;
import org.apache.derby.iapi.sql.ResultSet;
import org.apache.derby.iapi.sql.execute.ExecRow;
import org.apache.derby.iapi.sql.execute.ExecIndexRow;
import org.apache.derby.iapi.sql.execute.NoPutResultSet;

import org.apache.derby.iapi.sql.Activation;

import org.apache.derby.iapi.store.access.ColumnOrdering;
import org.apache.derby.iapi.store.access.TransactionController;

import org.apache.derby.iapi.services.loader.GeneratedMethod;

import org.apache.derby.iapi.sql.conn.LanguageConnectionContext;

import org.apache.derby.iapi.error.StandardException;

import org.apache.derby.iapi.types.RowLocation;

import org.apache.derby.iapi.services.io.FormatableArrayHolder;

import java.util.Properties;
import java.util.Vector;
import java.util.Enumeration;

/**
 * This ResultSet evaluates scalar, non-distinct aggregates.
 * It scans the entire source result set and computes the
 * scalar aggregates during the first call to next().
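 * <p>
 * For example, a query such as SELECT MAX(c1) FROM t (an aggregate with no
 * GROUP BY clause) produces a single result row and is evaluated by a
 * result set of this kind.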
 *
 * @author jerry (broken out from SortResultSet)
 */
class ScalarAggregateResultSet extends GenericAggregateResultSet
        implements CursorResultSet {

    /* Run time statistics variables */
    public int rowsInput;

    // set in constructor and not altered during
    // life of object.
    public boolean singleInputRow;
    protected ExecIndexRow sortTemplateRow;
    protected boolean isInSortedOrder;  // true if source results in sorted order

    // Cache ExecIndexRow for scalar aggregates
    protected ExecIndexRow sourceExecIndexRow;

    // Remember whether or not a next() has been satisfied
    private boolean nextSatisfied;

    /**
     * Constructor
     *
     * @param s                 input result set
     * @param isInSortedOrder   true if the source results are in sorted order
     * @param aggregateItem     the index of the saved object in the
     *      PreparedStatement that holds the AggregatorInfoList used by this
     *      result set
     * @param a                 activation
     * @param ra                generated method to build an empty output row
     * @param resultSetNumber   the resultSetNumber for this result set
     * @param singleInputRow    true if the source is known to supply at most
     *      one interesting row (the min/max optimization)
     * @param optimizerEstimatedRowCount    the optimizer's estimated row count
     * @param optimizerEstimatedCost        the optimizer's estimated cost
     *
     * @exception StandardException Thrown on error
     */
    ScalarAggregateResultSet(NoPutResultSet s, boolean isInSortedOrder,
            int aggregateItem, Activation a, GeneratedMethod ra,
            int resultSetNumber, boolean singleInputRow,
            double optimizerEstimatedRowCount,
            double optimizerEstimatedCost) throws StandardException {
        super(s, aggregateItem, a, ra, resultSetNumber,
                optimizerEstimatedRowCount, optimizerEstimatedCost);
        this.isInSortedOrder = isInSortedOrder;
        // source expected to be non-null, mystery stress test bug
        // - sometimes get NullPointerException in openCore().
        if (SanityManager.DEBUG) {
            SanityManager.ASSERT(source != null,
                    "SARS(), source expected to be non-null");
        }
        sortTemplateRow = getExecutionFactory().getIndexableRow(
                (ExecRow) rowAllocator.invoke(activation));
        this.singleInputRow = singleInputRow;

        if (SanityManager.DEBUG) {
            SanityManager.DEBUG("AggregateTrace", "execution time: "
                    + a.getPreparedStatement().getSavedObject(aggregateItem));
        }
        constructorTime += getElapsedMillis(beginTime);
    }

    ///////////////////////////////////////////////////////////////////////////////
    //
    // ResultSet interface (leftover from NoPutResultSet)
    //
    ///////////////////////////////////////////////////////////////////////////////

    /**
     * Open the scan.  Open the source result set and prepare to
     * compute the aggregates on the first call to getNextRowCore().
     *
     * @exception StandardException thrown if cursor finished.
     */
    public void openCore() throws StandardException {
        beginTime = getCurrentTimeMillis();

        // source expected to be non-null, mystery stress test bug
        // - sometimes get NullPointerException in openCore().
        if (SanityManager.DEBUG) {
            SanityManager.ASSERT(source != null,
                    "SARS.openCore(), source expected to be non-null");
            SanityManager.ASSERT(!isOpen,
                    "ScalarAggregateResultSet already open");
        }

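        // Get a reusable indexable row based on the template built in the
        // constructor; it wraps each source row during the aggregation scan.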
        sourceExecIndexRow = getExecutionFactory().getIndexableRow(
                sortTemplateRow);

        source.openCore();

        isOpen = true;
        numOpens++;

        openTime += getElapsedMillis(beginTime);
    }

    protected int countOfRows;

    /* RESOLVE - THIS NEXT METHOD IS OVERRIDDEN IN DistinctScalarAggregateResultSet
     * BECAUSE OF A JIT ERROR.  THERE IS NO OTHER
     * REASON TO OVERRIDE IT IN DistinctScalarAggregateResultSet.  THE BUG WAS FOUND IN
     * 1.1.6 WITH THE JIT.
     */
    /**
     * Return the next row.  For a scalar aggregate, the entire source is
     * drained and accumulated on the first call, which returns the single
     * aggregate result row; subsequent calls return null.
     *
     * @exception StandardException thrown on failure.
     * @exception StandardException ResultSetNotOpen thrown if not yet open.
     *
     * @return the next row in the result
     */
    public ExecRow getNextRowCore() throws StandardException {
        if (nextSatisfied) {
            clearCurrentRow();
            return null;
        }

        ExecIndexRow sortResult = null;
        ExecRow result = null;
        ExecIndexRow execIndexRow = null;
        ExecIndexRow aggResult = null;
        // We only care whether this is a MIN aggregate when singleInputRow
        // is set; in that case we know there is exactly one aggregate to
        // look at.
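        // (For example, a query like SELECT MIN(c1) FROM t evaluated via an
        // index on c1 only needs the first qualifying row from the scan.)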
        boolean minAgg = (singleInputRow &&
                aggregates[0].aggInfo.aggregateName.equals("MIN"));
        beginTime = getCurrentTimeMillis();
        if (isOpen) {
            /*
            ** We are dealing with a scalar aggregate.
            ** Zip through each row and accumulate.
            ** Accumulate into the first row.  Only
            ** the first row is cloned.
            */
            while ((execIndexRow = getRowFromResultSet(false)) != null) {
                /*
                ** Use a clone of the first row as our result.
                ** We need to get a clone since we will be reusing
                ** the original as the wrapper of the source row.
                ** Turn cloning off since we won't be keeping any
                ** other rows.
                */
                if (aggResult == null) {
                    /* No need to clone the row when doing the min/max
                     * optimization for MIN, since we will not do another
                     * next on the underlying result set.
                     */
                    aggResult = (singleInputRow && minAgg) ?
                            execIndexRow :
                            (ExecIndexRow) execIndexRow.getClone();

                    initializeScalarAggregation(aggResult);
                } else {
                    accumulateScalarAggregation(execIndexRow, aggResult, false);
                }

                /* Only the first row needs to be looked at when the min/max
                 * optimization is on and the operation is MIN, or, for MAX,
                 * the first non-null row, since nulls sort as highest in the
                 * btree.
                 * Note that only one aggregate is allowed with the
                 * singleInputRow optimization, so we only need to look at
                 * the first aggregate.
                 */
                if (singleInputRow &&
                        (minAgg ||
                         !aggResult.getColumn(aggregates[0].aggregatorColumnId).isNull())) {
                    break;
                }
            }

            /*
            ** If we have aggregates, we need to generate a
            ** value for them now.  Only finish the aggregation
            ** if we haven't yet (i.e. if countOfRows == 0).
            ** If there weren't any input rows, we'll allocate
            ** one here.
            */
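            // For example, for an empty source, standard SQL semantics still
            // require one result row: 0 for COUNT, NULL for MIN/MAX/SUM/AVG.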
            if (countOfRows == 0) {
                aggResult = finishAggregation(aggResult);
                currentRow = aggResult;
                setCurrentRow(aggResult);
                countOfRows++;
            }
        }

        nextSatisfied = true;
        nextTime += getElapsedMillis(beginTime);
        return aggResult;
    }

    /**
     * If the result set has been opened,
     * close the open scan.
     *
     * @exception StandardException thrown on error
     */
    public void close() throws StandardException {
        beginTime = getCurrentTimeMillis();
        if (isOpen) {
            // we don't want to keep around a pointer to the
            // row ... so it can be thrown away.
            // REVISIT: does this need to be in a finally
            // block, to ensure that it is executed?
            clearCurrentRow();

            countOfRows = 0;
            sourceExecIndexRow = null;
            source.close();

            super.close();
        } else if (SanityManager.DEBUG) {
            SanityManager.DEBUG("CloseRepeatInfo",
                    "Close of ScalarAggregateResultSet repeated");
        }

        closeTime += getElapsedMillis(beginTime);

        nextSatisfied = false;
        isOpen = false;
    }

    /**
     * Return the total amount of time spent in this ResultSet
     *
     * @param type CURRENT_RESULTSET_ONLY - time spent only in this ResultSet
     *             ENTIRE_RESULTSET_TREE  - time spent in this ResultSet and below.
     *
     * @return long     The total amount of time spent (in milliseconds).
     */
    public long getTimeSpent(int type) {
        long totTime = constructorTime + openTime + nextTime + closeTime;

        if (type == NoPutResultSet.CURRENT_RESULTSET_ONLY) {
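            // Report only this node's own time by subtracting the time
            // attributed to the entire source subtree.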
            return totTime - originalSource.getTimeSpent(ENTIRE_RESULTSET_TREE);
        } else {
            return totTime;
        }
    }

    ///////////////////////////////////////////////////////////////////////////////
    //
    // CursorResultSet interface
    //
    ///////////////////////////////////////////////////////////////////////////////

    /**
     * A scalar aggregate result set has no underlying row location,
     * so this always returns null.
     *
     * @see CursorResultSet
     *
     * @return the row location of the current cursor row (always null here).
     * @exception StandardException thrown on failure to get row location
     */
    public RowLocation getRowLocation() throws StandardException {
        return null;
    }

    /**
     * This result set has its row from the last fetch done.
     * If the cursor is closed, a null is returned.
     *
     * @see CursorResultSet
     *
     * @return the last row returned.
     * @exception StandardException thrown on failure.
     */
    /* RESOLVE - this should return activation.getCurrentRow(resultSetNumber),
     * once there is such a method.  (currentRow is redundant)
     */
    public ExecRow getCurrentRow() throws StandardException {
        if (SanityManager.DEBUG)
            SanityManager.ASSERT(isOpen,
                    "ScalarAggregateResultSet expected to be open");

        return currentRow;
    }

    ///////////////////////////////////////////////////////////////////////////////
    //
    // SCAN ABSTRACTION UTILITIES
    //
    ///////////////////////////////////////////////////////////////////////////////

    /**
     * Get a row from the input result set.
     *
     * @param doClone - true if the row should be cloned
     *
     * @return the next row from the source, wrapped as an ExecIndexRow,
     *         or null if the source is exhausted
     *
     * @exception StandardException Thrown on error
     */
    public ExecIndexRow getRowFromResultSet(boolean doClone)
            throws StandardException {
        ExecRow sourceRow;
        ExecIndexRow inputRow = null;

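        // Wrap (or clone and wrap) the next source row in the cached
        // sourceExecIndexRow so the aggregation code sees a uniform row type.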
        if ((sourceRow = source.getNextRowCore()) != null) {
            rowsInput++;
            sourceExecIndexRow.execRowToExecIndexRow(
                    doClone ? sourceRow.getClone() : sourceRow);
            inputRow = sourceExecIndexRow;
        }

        return inputRow;
    }

    /**
     * Reopen the scan.  Scan parameters are re-evaluated at each open,
     * so their values may differ from the previous open.
     *
     * @exception StandardException thrown if cursor finished.
     */
    public void reopenCore() throws StandardException {
        beginTime = getCurrentTimeMillis();
        if (SanityManager.DEBUG)
            SanityManager.ASSERT(isOpen,
                    "ScalarAggregateResultSet expected to be open on reopen");

        source.reopenCore();
        numOpens++;
        countOfRows = 0;
        nextSatisfied = false;

        openTime += getElapsedMillis(beginTime);
    }

    ///////////////////////////////////////////////////////////////////////////////
    //
    // AGGREGATION UTILITIES
    //
    ///////////////////////////////////////////////////////////////////////////////

    /**
     * Run accumulation on every aggregate in this
     * row.  This method is useful when draining the source
     * or sorter, depending on whether or not there were any
     * distinct aggregates.  Remember, if there are distinct
     * aggregates, then the non-distinct aggregates were
     * calculated on the way into the sorter and only the
     * distinct aggregates will be accumulated here.
     * Otherwise, all aggregates will be accumulated here.
     *
     * @param inputRow the input row
     * @param accumulateRow the row with the accumulator (may be the same as the input row)
     * @param hasDistinctAggregates does this scan have distinct
     *          aggregates.  Used to figure out whether to merge
     *          or accumulate non-distinct aggregates.
     *
     * @exception StandardException Thrown on error
     */
    protected void accumulateScalarAggregation(ExecRow inputRow,
            ExecRow accumulateRow, boolean hasDistinctAggregates)
            throws StandardException {
        int size = aggregates.length;

        if (SanityManager.DEBUG) {
            SanityManager.ASSERT((inputRow != null) && (accumulateRow != null),
                    "Null row passed to accumulateScalarAggregation");
        }
        for (int i = 0; i < size; i++) {
            GenericAggregator currAggregate = aggregates[i];
            if (hasDistinctAggregates &&
                    !currAggregate.getAggregatorInfo().isDistinct()) {
                currAggregate.merge(inputRow, accumulateRow);
            } else {
                currAggregate.accumulate(inputRow, accumulateRow);
            }
        }
    }

    ///////////////////////////////////////////////////////////////////////////////
    //
    // CLASS SPECIFIC
    //
    ///////////////////////////////////////////////////////////////////////////////

    /*
    ** Run the aggregator initialization method for
    ** each aggregator in the row.
    **
    ** @param row the row to initialize
    **
    ** @exception StandardException thrown on error
    */
    private void initializeScalarAggregation(ExecRow row)
            throws StandardException {
        int size = aggregates.length;

        if (SanityManager.DEBUG) {
            SanityManager.ASSERT(row != null,
                    "Null row passed to initializeScalarAggregation");
        }

        for (int i = 0; i < size; i++) {
            GenericAggregator currAggregate = aggregates[i];
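            // Set up the aggregator's working state in this row, then fold
            // the row's own value in as the first input; the row serves as
            // both input and accumulator here.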
            currAggregate.initialize(row);
            currAggregate.accumulate(row, row);
        }
    }
}