001: /*
002:
003: Derby - Class org.apache.derby.impl.sql.execute.DistinctScalarAggregateResultSet
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.sql.execute;
023:
024: import org.apache.derby.iapi.services.sanity.SanityManager;
025:
026: import org.apache.derby.iapi.sql.execute.ExecRow;
027: import org.apache.derby.iapi.sql.execute.ExecIndexRow;
028: import org.apache.derby.iapi.sql.execute.NoPutResultSet;
029:
030: import org.apache.derby.iapi.sql.Activation;
031:
032: import org.apache.derby.iapi.store.access.ColumnOrdering;
033: import org.apache.derby.iapi.store.access.SortObserver;
034: import org.apache.derby.iapi.store.access.TransactionController;
035: import org.apache.derby.iapi.store.access.SortController;
036: import org.apache.derby.iapi.store.access.ScanController;
037:
038: import org.apache.derby.iapi.services.loader.GeneratedMethod;
039:
040: import org.apache.derby.iapi.error.StandardException;
041:
042: import org.apache.derby.iapi.services.io.FormatableArrayHolder;
043:
044: import java.util.Properties;
045:
046: /**
047: * This ResultSet evaluates scalar aggregates where
048: * 1 (or more, in the future) of the aggregates are distinct.
049: * It will scan the entire source result set and calculate
050: * the scalar aggregates when scanning the source during the
051: * first call to next().
052: *
053: * @author jerry (broken out from SortResultSet)
054: */
055: class DistinctScalarAggregateResultSet extends ScalarAggregateResultSet {
056: private ColumnOrdering[] order;
057: private int maxRowSize;
058: private boolean dropDistinctAggSort;
059: private long sortId;
060:
061: // set in open and not modified thereafter
062: private ScanController scanController;
063:
064: private ExecIndexRow sortResultRow;
065:
066: // remember whether or not any sort was performed
067: private boolean sorted;
068:
069: /**
070: * Constructor
071: *
072: * @param s input result set
073: * @param isInSortedOrder true if the source results are in sorted order
074: * @param aggregateItem indicates the number of the
075: * SavedObject off of the PreparedStatement that holds the
076: * AggregatorInfoList used by this routine.
077: * @param a activation
078: * @param ra generated method to build an empty
079: * output row
080: * @param resultSetNumber The resultSetNumber for this result set
081: *
082: * @exception StandardException Thrown on error
083: */
084: DistinctScalarAggregateResultSet(NoPutResultSet s,
085: boolean isInSortedOrder, int aggregateItem,
086: int orderingItem, Activation a, GeneratedMethod ra,
087: int maxRowSize, int resultSetNumber,
088: boolean singleInputRow, double optimizerEstimatedRowCount,
089: double optimizerEstimatedCost) throws StandardException {
090: super (s, isInSortedOrder, aggregateItem, a, ra,
091: resultSetNumber, singleInputRow,
092: optimizerEstimatedRowCount, optimizerEstimatedCost);
093:
094: order = (ColumnOrdering[]) ((FormatableArrayHolder) (a
095: .getPreparedStatement().getSavedObject(orderingItem)))
096: .getArray(ColumnOrdering.class);
097:
098: this .maxRowSize = maxRowSize;
099:
100: constructorTime += getElapsedMillis(beginTime);
101: }
102:
103: ///////////////////////////////////////////////////////////////////////////////
104: //
105: // ResultSet interface (leftover from NoPutResultSet)
106: //
107: ///////////////////////////////////////////////////////////////////////////////
108:
109: /**
110: * Open the scan. Load the sorter and prepare to get
111: * rows from it.
112: *
113: * @exception StandardException thrown if cursor finished.
114: */
115: public void openCore() throws StandardException {
116: beginTime = getCurrentTimeMillis();
117: // REVISIT: through the direct DB API, this needs to be an
118: // error, not an ASSERT; users can open twice. Only through JDBC
119: // is access to open controlled and ensured valid.
120: if (SanityManager.DEBUG)
121: SanityManager.ASSERT(!isOpen,
122: "DistinctScalarResultSet already open");
123:
124: sortResultRow = getExecutionFactory().getIndexableRow(
125: sortTemplateRow.getClone());
126: sourceExecIndexRow = getExecutionFactory().getIndexableRow(
127: sortTemplateRow.getClone());
128:
129: source.openCore();
130:
131: /*
132: ** Load up the sorter because we have something to sort.
133: */
134: scanController = loadSorter();
135: sorted = true;
136:
137: isOpen = true;
138: numOpens++;
139:
140: openTime += getElapsedMillis(beginTime);
141: }
142:
143: /* RESOLVE - THIS NEXT METHOD IS ONLY INCLUDED BECAUSE OF A JIT ERROR. THERE IS NO OTHER
144: * REASON TO OVERRIDE IT IN DistinctScalarAggregateResultSet. THE BUG WAS FOUND IN
145: * 1.1.6 WITH THE JIT.
146: */
147: /**
148: * Return the next row. If it is a scalar aggregate scan
149: *
150: * @exception StandardException thrown on failure.
151: * @exception StandardException ResultSetNotOpen thrown if not yet open.
152: *
153: * @return the next row in the result
154: */
155: public ExecRow getNextRowCore() throws StandardException {
156: ExecIndexRow sortResult = null;
157: ExecRow result = null;
158: ExecIndexRow execIndexRow = null;
159: ExecIndexRow aggResult = null;
160: boolean cloneArg = true;
161:
162: beginTime = getCurrentTimeMillis();
163: if (isOpen) {
164: /*
165: ** We are dealing with a scalar aggregate.
166: ** Zip through each row and accumulate.
167: ** Accumulate into the first row. Only
168: ** the first row is cloned.
169: */
170: while ((execIndexRow = getRowFromResultSet(cloneArg)) != null) {
171: /*
172: ** Use a clone of the first row as our result.
173: ** We need to get a clone since we will be reusing
174: ** the original as the wrapper of the source row.
175: ** Turn cloning off since we wont be keeping any
176: ** other rows.
177: */
178: if (aggResult == null) {
179: cloneArg = false;
180: aggResult = (ExecIndexRow) execIndexRow.getClone();
181: } else {
182: /*
183: ** Accumulate all aggregates. For the distinct
184: ** aggregates, we'll be accumulating, for the nondistinct
185: ** we'll be merging.
186: */
187: accumulateScalarAggregation(execIndexRow,
188: aggResult, true);
189: }
190: }
191:
192: /*
193: ** If we have aggregates, we need to generate a
194: ** value for them now. Only finish the aggregation
195: ** if we haven't yet (i.e. if countOfRows == 0).
196: ** If there weren't any input rows, we'll allocate
197: ** one here.
198: */
199: if (countOfRows == 0) {
200: aggResult = finishAggregation(aggResult);
201: currentRow = aggResult;
202: setCurrentRow(aggResult);
203: countOfRows++;
204: }
205: }
206:
207: nextTime += getElapsedMillis(beginTime);
208: return aggResult;
209: }
210:
211: /**
212: * reopen a scan on the table. scan parameters are evaluated
213: * at each open, so there is probably some way of altering
214: * their values...
215: *
216: * @exception StandardException thrown if cursor finished.
217: */
218: public void reopenCore() throws StandardException {
219: beginTime = getCurrentTimeMillis();
220: if (SanityManager.DEBUG)
221: SanityManager.ASSERT(isOpen,
222: "NormalizeResultSet already open");
223:
224: if (scanController != null) {
225: scanController.close();
226: scanController = null;
227: }
228:
229: source.reopenCore();
230:
231: /*
232: ** Load up the sorter because we have something to sort.
233: */
234: scanController = loadSorter();
235: sorted = true;
236: numOpens++;
237: countOfRows = 0;
238:
239: openTime += getElapsedMillis(beginTime);
240: }
241:
242: /**
243: * If the result set has been opened,
244: * close the open scan.
245: */
246: public void close() throws StandardException {
247: super .close();
248: closeSource();
249: }
250:
251: ///////////////////////////////////////////////////////////////////////////////
252: //
253: // SCAN ABSTRACTION UTILITIES
254: //
255: ///////////////////////////////////////////////////////////////////////////////
256:
257: /**
258: * Get a row from the sorter. Side effects:
259: * sets currentRow.
260: *
261: * @exception StandardException Thrown on error
262: */
263: public ExecIndexRow getRowFromResultSet(boolean doClone)
264: throws StandardException {
265: ExecIndexRow inputRow = null;
266:
267: if (scanController.next()) {
268: // REMIND: HACKALERT we are assuming that result will
269: // point to what sortResult is manipulating when
270: // we complete the fetch.
271: currentRow = doClone ? sortResultRow.getClone()
272: : sortResultRow;
273:
274: inputRow = getExecutionFactory()
275: .getIndexableRow(currentRow);
276:
277: scanController.fetch(inputRow.getRowArray());
278: }
279: return inputRow;
280: }
281:
282: /**
283: * Close the source of whatever we have been scanning.
284: *
285: * @exception StandardException thrown on error
286: */
287: protected void closeSource() throws StandardException {
288: if (scanController != null) {
289: if (dropDistinctAggSort) {
290: try {
291: getTransactionController().dropSort(sortId);
292: } catch (StandardException se) {
293: // Eat all errors at close() time
294: }
295: dropDistinctAggSort = false;
296: }
297: scanController.close();
298: scanController = null;
299: }
300: source.close();
301: }
302:
303: ///////////////////////////////////////////////////////////////////////////////
304: //
305: // MISC UTILITIES
306: //
307: ///////////////////////////////////////////////////////////////////////////////
308:
309: /**
310: * Load up the sorter. Feed it every row from the
311: * source scan. If we have a vector aggregate, initialize
312: * the aggregator for each source row. When done, close
313: * the source scan and open the sort. Return the sort
314: * scan controller.
315: *
316: * @exception StandardException thrown on failure.
317: *
318: * @return the sort controller
319: */
320: private ScanController loadSorter() throws StandardException {
321: SortController sorter;
322: ExecRow sourceRow;
323: int inputRowCountEstimate = (int) optimizerEstimatedRowCount;
324:
325: TransactionController tc = getTransactionController();
326:
327: /*
328: ** We have a distinct aggregate so, we'll need
329: ** to do a sort. We use all of the sorting columns and
330: ** drop the aggregation on the distinct column. Then
331: ** we'll feed this into the sorter again w/o the distinct
332: ** column in the ordering list.
333: */
334: GenericAggregator[] aggsNoDistinct = getSortAggregators(
335: aggInfoList, true, activation
336: .getLanguageConnectionContext(), source);
337: SortObserver sortObserver = new AggregateSortObserver(true,
338: aggsNoDistinct, aggregates, sortTemplateRow);
339:
340: sortId = tc.createSort((Properties) null, sortTemplateRow
341: .getRowArray(), order, sortObserver, false, // not in order
342: inputRowCountEstimate, // est rows, -1 means no idea
343: maxRowSize // est rowsize
344: );
345: sorter = tc.openSort(sortId);
346: dropDistinctAggSort = true;
347:
348: while ((sourceRow = source.getNextRowCore()) != null) {
349: sorter.insert(sourceRow.getRowArray());
350: rowsInput++;
351: }
352:
353: /*
354: ** End the sort and open up the result set
355: */
356: sorter.close();
357:
358: scanController = tc.openSortScan(sortId, activation
359: .getResultSetHoldability());
360:
361: /*
362: ** Aggs are initialized and input rows
363: ** are in order.
364: */
365: inputRowCountEstimate = rowsInput;
366:
367: return scanController;
368: }
369:
370: }
|