001: /*
002:
003: Derby - Class org.apache.derby.impl.sql.execute.GroupedAggregateResultSet
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.sql.execute;
023:
024: import org.apache.derby.iapi.services.monitor.Monitor;
025:
026: import org.apache.derby.iapi.services.sanity.SanityManager;
027:
028: import org.apache.derby.iapi.services.stream.HeaderPrintWriter;
029: import org.apache.derby.iapi.services.stream.InfoStreams;
030:
031: import org.apache.derby.iapi.services.io.Formatable;
032:
033: import org.apache.derby.iapi.sql.execute.CursorResultSet;
034: import org.apache.derby.iapi.sql.Activation;
035: import org.apache.derby.iapi.sql.ResultSet;
036: import org.apache.derby.iapi.sql.execute.ExecRow;
037: import org.apache.derby.iapi.sql.execute.ExecIndexRow;
038: import org.apache.derby.iapi.sql.execute.NoPutResultSet;
039:
040: import org.apache.derby.iapi.store.access.ColumnOrdering;
041: import org.apache.derby.iapi.types.DataValueDescriptor;
042: import org.apache.derby.iapi.store.access.SortObserver;
043: import org.apache.derby.iapi.store.access.TransactionController;
044: import org.apache.derby.iapi.store.access.SortController;
045: import org.apache.derby.iapi.store.access.ScanController;
046:
047: import org.apache.derby.iapi.services.loader.GeneratedMethod;
048:
049: import org.apache.derby.iapi.sql.execute.ExecutionFactory;
050: import org.apache.derby.iapi.sql.execute.ExecutionContext;
051: import org.apache.derby.iapi.sql.conn.LanguageConnectionContext;
052:
053: import org.apache.derby.iapi.error.StandardException;
054:
055: import org.apache.derby.iapi.types.RowLocation;
056:
057: import org.apache.derby.iapi.services.io.FormatableArrayHolder;
058:
059: import java.util.Properties;
060: import java.util.Vector;
061: import java.util.Enumeration;
062:
063: /**
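 * This ResultSet evaluates grouped aggregates (a distinct aggregate is
 * handled with an additional sort). Unless the source is already in
 * sorted order, the entire source result set is scanned and fed to the
 * sorter when the result set is opened; for a source that is already in
 * sorted order the groups are aggregated incrementally on each call
 * to next().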
068: *
069: * @author jerry (broken out from SortResultSet)
070: */
071: class GroupedAggregateResultSet extends GenericAggregateResultSet
072: implements CursorResultSet {
073:
074: /* Run time statistics variables */
075: public int rowsInput;
076: public int rowsReturned;
077:
078: // set in constructor and not altered during
079: // life of object.
080: private ColumnOrdering[] order;
081: private ExecIndexRow sortTemplateRow;
082: public boolean hasDistinctAggregate; // true if distinct aggregate
083: public boolean isInSortedOrder; // true if source results in sorted order
084: private int maxRowSize;
085:
086: // set in open and not modified thereafter
087: private ScanController scanController;
088:
089: // Cache ExecIndexRow
090: private ExecIndexRow sourceExecIndexRow;
091:
092: private ExecIndexRow sortResultRow;
093:
094: // In order group bys
095: private ExecIndexRow currSortedRow;
096: private boolean nextCalled;
097:
098: // used to track and close sorts
099: private long distinctAggSortId;
100: private boolean dropDistinctAggSort;
101: private long genericSortId;
102: private boolean dropGenericSort;
103: private TransactionController tc;
104:
105: // RTS
106: public Properties sortProperties = new Properties();
107:
108: /**
109: * Constructor
110: *
111: * @param s input result set
112: * @param isInSortedOrder true if the source results are in sorted order
113: * @param aggregateItem indicates the number of the
114: * SavedObject off of the PreparedStatement that holds the
115: * AggregatorInfoList used by this routine.
116: * @param orderingItem indicates the number of the
117: * SavedObject off of the PreparedStatement that holds the
118: * ColumOrdering array used by this routine
119: * @param a activation
120: * @param ra generated method to build an empty
121: * output row
122: * @param maxRowSize approx row size, passed to sorter
123: * @param resultSetNumber The resultSetNumber for this result set
124: *
125: * @exception StandardException Thrown on error
126: */
127: GroupedAggregateResultSet(NoPutResultSet s,
128: boolean isInSortedOrder, int aggregateItem,
129: int orderingItem, Activation a, GeneratedMethod ra,
130: int maxRowSize, int resultSetNumber,
131: double optimizerEstimatedRowCount,
132: double optimizerEstimatedCost) throws StandardException {
133: super (s, aggregateItem, a, ra, resultSetNumber,
134: optimizerEstimatedRowCount, optimizerEstimatedCost);
135: this .isInSortedOrder = isInSortedOrder;
136: sortTemplateRow = getExecutionFactory().getIndexableRow(
137: (ExecRow) rowAllocator.invoke(activation));
138: order = (ColumnOrdering[]) ((FormatableArrayHolder) (a
139: .getPreparedStatement().getSavedObject(orderingItem)))
140: .getArray(ColumnOrdering.class);
141:
142: if (SanityManager.DEBUG) {
143: SanityManager.DEBUG("AggregateTrace", "execution time: "
144: + a.getPreparedStatement().getSavedObject(
145: aggregateItem));
146: }
147:
148: constructorTime += getElapsedMillis(beginTime);
149: }
150:
151: ///////////////////////////////////////////////////////////////////////////////
152: //
153: // ResultSet interface (leftover from NoPutResultSet)
154: //
155: ///////////////////////////////////////////////////////////////////////////////
156:
157: /**
158: * Open the scan. Load the sorter and prepare to get
159: * rows from it.
160: *
161: * @exception StandardException thrown if cursor finished.
162: */
163: public void openCore() throws StandardException {
164: beginTime = getCurrentTimeMillis();
165: // REVISIT: through the direct DB API, this needs to be an
166: // error, not an ASSERT; users can open twice. Only through JDBC
167: // is access to open controlled and ensured valid.
168: if (SanityManager.DEBUG)
169: SanityManager.ASSERT(!isOpen,
170: "GroupedAggregateResultSet already open");
171:
172: sortResultRow = getExecutionFactory().getIndexableRow(
173: sortTemplateRow.getClone());
174: sourceExecIndexRow = getExecutionFactory().getIndexableRow(
175: sortTemplateRow.getClone());
176:
177: source.openCore();
178:
179: /* If this is an in-order group by then we do not need the sorter.
180: * (We can do the aggregation ourselves.)
181: * We save a clone of the first row so that subsequent next()s
182: * do not overwrite the saved row.
183: */
184: if (isInSortedOrder) {
185: currSortedRow = getNextRowFromRS();
186: if (currSortedRow != null) {
187: currSortedRow = (ExecIndexRow) currSortedRow.getClone();
188: initializeVectorAggregation(currSortedRow);
189: }
190: } else {
191: /*
192: ** Load up the sorter
193: */
194: scanController = loadSorter();
195: }
196:
197: isOpen = true;
198: numOpens++;
199:
200: openTime += getElapsedMillis(beginTime);
201: }
202:
203: /**
204: * Load up the sorter. Feed it every row from the
205: * source scan. If we have a vector aggregate, initialize
206: * the aggregator for each source row. When done, close
207: * the source scan and open the sort. Return the sort
208: * scan controller.
209: *
210: * @exception StandardException thrown on failure.
211: *
212: * @return the sort controller
213: */
214: private ScanController loadSorter() throws StandardException {
215: SortController sorter;
216: long sortId;
217: ExecRow sourceRow;
218: ExecRow inputRow;
219: int inputRowCountEstimate = (int) optimizerEstimatedRowCount;
220: boolean inOrder = isInSortedOrder;
221:
222: tc = getTransactionController();
223:
224: ColumnOrdering[] currentOrdering = order;
225:
226: /*
227: ** Do we have any distinct aggregates? If so, we'll need
228: ** a separate sort. We use all of the sorting columns and
229: ** drop the aggregation on the distinct column. Then
230: ** we'll feed this into the sorter again w/o the distinct
231: ** column in the ordering list.
232: */
233: if (aggInfoList.hasDistinct()) {
234: hasDistinctAggregate = true;
235:
236: GenericAggregator[] aggsNoDistinct = getSortAggregators(
237: aggInfoList, true, activation
238: .getLanguageConnectionContext(), source);
239: SortObserver sortObserver = new AggregateSortObserver(true,
240: aggsNoDistinct, aggregates, sortTemplateRow);
241:
242: sortId = tc.createSort((Properties) null, sortTemplateRow
243: .getRowArray(), order, sortObserver, false, // not in order
244: inputRowCountEstimate, // est rows, -1 means no idea
245: maxRowSize // est rowsize
246: );
247: sorter = tc.openSort(sortId);
248: distinctAggSortId = sortId;
249: dropDistinctAggSort = true;
250:
251: while ((sourceRow = source.getNextRowCore()) != null) {
252: sorter.insert(sourceRow.getRowArray());
253: rowsInput++;
254: }
255:
256: /*
257: ** End the sort and open up the result set
258: */
259: source.close();
260: sortProperties = sorter.getSortInfo().getAllSortInfo(
261: sortProperties);
262: sorter.close();
263:
264: scanController = tc.openSortScan(sortId, activation
265: .getResultSetHoldability());
266:
267: /*
268: ** Aggs are initialized and input rows
269: ** are in order. All we have to do is
270: ** another sort to remove (merge) the
271: ** duplicates in the distinct column
272: */
273: inOrder = true;
274: inputRowCountEstimate = rowsInput;
275:
276: /*
277: ** Drop the last column from the ordering. The
278: ** last column is the distinct column. Don't
279: ** pay any attention to the fact that the ordering
280: ** object's name happens to correspond to a techo
281: ** band from the 80's.
282: **
283: ** If there aren't any ordering columns other
284: ** than the distinct (i.e. for scalar distincts)
285: ** just skip the 2nd sort altogether -- we'll
286: ** do the aggregate merge ourselves rather than
287: ** force a 2nd sort.
288: */
289: if (order.length == 1) {
290: return scanController;
291: }
292:
293: ColumnOrdering[] newOrder = new ColumnOrdering[order.length - 1];
294: System.arraycopy(order, 0, newOrder, 0, order.length - 1);
295: currentOrdering = newOrder;
296: }
297:
298: SortObserver sortObserver = new AggregateSortObserver(true,
299: aggregates, aggregates, sortTemplateRow);
300:
301: sortId = tc.createSort((Properties) null, sortTemplateRow
302: .getRowArray(), currentOrdering, sortObserver, inOrder,
303: inputRowCountEstimate, // est rows
304: maxRowSize // est rowsize
305: );
306: sorter = tc.openSort(sortId);
307: genericSortId = sortId;
308: dropGenericSort = true;
309:
310: /* The sorter is responsible for doing the cloning */
311: while ((inputRow = getNextRowFromRS()) != null) {
312: sorter.insert(inputRow.getRowArray());
313: }
314: source.close();
315: sortProperties = sorter.getSortInfo().getAllSortInfo(
316: sortProperties);
317: sorter.close();
318:
319: return tc.openSortScan(sortId, activation
320: .getResultSetHoldability());
321: }
322:
323: /**
324: * Return the next row.
325: *
326: * @exception StandardException thrown on failure.
327: * @exception StandardException ResultSetNotOpen thrown if not yet open.
328: *
329: * @return the next row in the result
330: */
331: public ExecRow getNextRowCore() throws StandardException {
332: if (!isOpen) {
333: return null;
334: }
335:
336: beginTime = getCurrentTimeMillis();
337:
338: // In order group by
339: if (isInSortedOrder) {
340: // No rows, no work to do
341: if (currSortedRow == null) {
342: nextTime += getElapsedMillis(beginTime);
343: return null;
344: }
345:
346: ExecIndexRow nextRow = getNextRowFromRS();
347:
348: /* Drain and merge rows until we find new distinct values for the grouping columns. */
349: while (nextRow != null) {
350: /* We found a new set of values for the grouping columns.
351: * Update the current row and return this group.
352: */
353: if (!sameGroupingValues(currSortedRow, nextRow)) {
354: ExecIndexRow result = currSortedRow;
355:
356: /* Save a clone of the new row so that it doesn't get overwritten */
357: currSortedRow = (ExecIndexRow) nextRow.getClone();
358: initializeVectorAggregation(currSortedRow);
359:
360: nextTime += getElapsedMillis(beginTime);
361: rowsReturned++;
362: return finishAggregation(result);
363: } else {
364: /* Same group - initialize the new row and then merge the aggregates */
365: initializeVectorAggregation(nextRow);
366: mergeVectorAggregates(nextRow, currSortedRow);
367: }
368:
369: // Get the next row
370: nextRow = getNextRowFromRS();
371: }
372:
373: // We've drained the source, so no more rows to return
374: ExecIndexRow result = currSortedRow;
375: currSortedRow = null;
376: nextTime += getElapsedMillis(beginTime);
377: return finishAggregation(result);
378: } else {
379: ExecIndexRow sortResult = null;
380:
381: if ((sortResult = getNextRowFromRS()) != null) {
382: setCurrentRow(sortResult);
383: }
384:
385: /*
386: ** Only finish the aggregation
387: ** if we have a return row. We don't generate
388: ** a row on a vector aggregate unless there was
389: ** a group.
390: */
391: if (sortResult != null) {
392: sortResult = finishAggregation(sortResult);
393: currentRow = sortResult;
394: }
395:
396: if (sortResult != null) {
397: rowsReturned++;
398: }
399:
400: nextTime += getElapsedMillis(beginTime);
401: return sortResult;
402: }
403: }
404:
405: /**
406: * Return whether or not the new row has the same values for the
407: * grouping columns as the current row. (This allows us to process in-order
408: * group bys without a sorter.)
409: *
410: * @param currRow The current row.
411: * @param newRow The new row.
412: *
413: * @return Whether or not to filter out the new row has the same values for the
414: * grouping columns as the current row.
415: *
416: * @exception StandardException thrown on failure to get row location
417: */
418: private boolean sameGroupingValues(ExecRow currRow, ExecRow newRow)
419: throws StandardException {
420: for (int index = 0; index < order.length; index++) {
421: DataValueDescriptor currOrderable = currRow
422: .getColumn(order[index].getColumnId() + 1);
423: DataValueDescriptor newOrderable = newRow
424: .getColumn(order[index].getColumnId() + 1);
425: if (!(currOrderable.compare(
426: DataValueDescriptor.ORDER_OP_EQUALS, newOrderable,
427: true, true))) {
428: return false;
429: }
430: }
431: return true;
432: }
433:
434: /**
435: * If the result set has been opened,
436: * close the open scan.
437: *
438: * @exception StandardException thrown on error
439: */
440: public void close() throws StandardException {
441: beginTime = getCurrentTimeMillis();
442: if (isOpen) {
443: // we don't want to keep around a pointer to the
444: // row ... so it can be thrown away.
445: // REVISIT: does this need to be in a finally
446: // block, to ensure that it is executed?
447: clearCurrentRow();
448:
449: sortResultRow = null;
450: sourceExecIndexRow = null;
451: closeSource();
452:
453: if (dropDistinctAggSort) {
454: tc.dropSort(distinctAggSortId);
455: dropDistinctAggSort = false;
456: }
457:
458: if (dropGenericSort) {
459: tc.dropSort(genericSortId);
460: dropGenericSort = false;
461: }
462: super .close();
463: } else if (SanityManager.DEBUG)
464: SanityManager.DEBUG("CloseRepeatInfo",
465: "Close of SortResultSet repeated");
466:
467: closeTime += getElapsedMillis(beginTime);
468:
469: isOpen = false;
470: }
471:
472: /**
473: * Return the total amount of time spent in this ResultSet
474: *
475: * @param type CURRENT_RESULTSET_ONLY - time spent only in this ResultSet
476: * ENTIRE_RESULTSET_TREE - time spent in this ResultSet and below.
477: *
478: * @return long The total amount of time spent (in milliseconds).
479: */
480: public long getTimeSpent(int type) {
481: long totTime = constructorTime + openTime + nextTime
482: + closeTime;
483:
484: if (type == NoPutResultSet.CURRENT_RESULTSET_ONLY) {
485: return totTime
486: - originalSource
487: .getTimeSpent(ENTIRE_RESULTSET_TREE);
488: } else {
489: return totTime;
490: }
491: }
492:
493: ///////////////////////////////////////////////////////////////////////////////
494: //
495: // CursorResultSet interface
496: //
497: ///////////////////////////////////////////////////////////////////////////////
498:
499: /**
500: * This result set has its row location from
501: * the last fetch done. If the cursor is closed,
502: * a null is returned.
503: *
504: * @see CursorResultSet
505: *
506: * @return the row location of the current cursor row.
507: * @exception StandardException thrown on failure to get row location
508: */
509: public RowLocation getRowLocation() throws StandardException {
510: if (!isOpen)
511: return null;
512:
513: // REVISIT: could we reuse the same rowlocation object
514: // across several calls?
515: RowLocation rl;
516: rl = scanController.newRowLocationTemplate();
517: scanController.fetchLocation(rl);
518: return rl;
519: }
520:
521: /**
522: * This result set has its row from the last fetch done.
523: * If the cursor is closed, a null is returned.
524: *
525: * @see CursorResultSet
526: *
527: * @return the last row returned;
528: * @exception StandardException thrown on failure.
529: */
530: /* RESOLVE - this should return activation.getCurrentRow(resultSetNumber),
531: * once there is such a method. (currentRow is redundant)
532: */
533: public ExecRow getCurrentRow() throws StandardException {
534: if (SanityManager.DEBUG)
535: SanityManager.ASSERT(isOpen,
536: "SortResultSet expected to be open");
537:
538: return currentRow;
539: }
540:
541: ///////////////////////////////////////////////////////////////////////////////
542: //
543: // SCAN ABSTRACTION UTILITIES
544: //
545: ///////////////////////////////////////////////////////////////////////////////
546: /**
547: * Get the next output row for processing
548: */
549: private ExecIndexRow getNextRowFromRS() throws StandardException {
550: return (scanController == null) ? getRowFromResultSet()
551: : getRowFromSorter();
552: }
553:
554: /**
555: * Get a row from the input result set.
556: */
557: private ExecIndexRow getRowFromResultSet() throws StandardException {
558: ExecRow sourceRow;
559: ExecIndexRow inputRow = null;
560:
561: if ((sourceRow = source.getNextRowCore()) != null) {
562: rowsInput++;
563: sourceExecIndexRow.execRowToExecIndexRow(sourceRow);
564: inputRow = sourceExecIndexRow;
565: }
566:
567: return inputRow;
568: }
569:
570: /**
571: * Get a row from the sorter. Side effects:
572: * sets currentRow.
573: */
574: private ExecIndexRow getRowFromSorter() throws StandardException {
575: ExecIndexRow inputRow = null;
576:
577: if (scanController.next()) {
578: // REMIND: HACKALERT we are assuming that result will
579: // point to what sortResult is manipulating when
580: // we complete the fetch.
581: currentRow = sortResultRow;
582:
583: inputRow = getExecutionFactory()
584: .getIndexableRow(currentRow);
585:
586: scanController.fetch(inputRow.getRowArray());
587: }
588: return inputRow;
589: }
590:
591: /**
592: * Close the source of whatever we have been scanning.
593: *
594: * @exception StandardException thrown on error
595: */
596: public void closeSource() throws StandardException {
597: if (scanController == null) {
598: /*
599: ** NOTE: do not null out source, we
600: ** may be opened again, in which case
601: ** we will open source again.
602: */
603: source.close();
604: } else {
605: scanController.close();
606: scanController = null;
607: }
608: }
609:
610: ///////////////////////////////////////////////////////////////////////////////
611: //
612: // AGGREGATION UTILITIES
613: //
614: ///////////////////////////////////////////////////////////////////////////////
615: /**
616: * Run the aggregator initialization method for
617: * each aggregator in the row. Accumulate the
618: * input column. WARNING: initializiation performs
619: * accumulation -- no need to accumulate a row
620: * that has been passed to initialization.
621: *
622: * @param row the row to initialize
623: *
624: * @exception standard cloudscape exception
625: */
626: private void initializeVectorAggregation(ExecRow row)
627: throws StandardException {
628: int size = aggregates.length;
629:
630: if (SanityManager.DEBUG) {
631: SanityManager.ASSERT(row != null,
632: "Null row passed to initializeVectorAggregation");
633: }
634:
635: for (int i = 0; i < size; i++) {
636: GenericAggregator currAggregate = aggregates[i];
637:
638: // initialize the aggregator
639: currAggregate.initialize(row);
640:
641: // get the first value, accumulate it into itself
642: currAggregate.accumulate(row, row);
643: }
644: }
645:
646: /**
647: * Run the aggregator merge method for
648: * each aggregator in the row.
649: *
650: * @param newRow the row to merge
651: * @param currRow the row to merge into
652: *
653: * @exception standard cloudscape exception
654: */
655: private void mergeVectorAggregates(ExecRow newRow, ExecRow currRow)
656: throws StandardException {
657: for (int i = 0; i < aggregates.length; i++) {
658: GenericAggregator currAggregate = aggregates[i];
659:
660: // merge the aggregator
661: currAggregate.merge(newRow, currRow);
662: }
663: }
664:
665: }
|