001: /*
002:
003: Derby - Class org.apache.derby.impl.sql.execute.AggregateSortObserver
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.sql.execute;
023:
024: import org.apache.derby.iapi.store.access.SortObserver;
025: import org.apache.derby.iapi.services.io.Storable;
026:
027: import org.apache.derby.iapi.types.UserDataValue;
028:
029: import org.apache.derby.iapi.types.CloneableObject;
030:
031: import org.apache.derby.iapi.services.io.Storable;
032: import org.apache.derby.iapi.services.sanity.SanityManager;
033:
034: import org.apache.derby.iapi.error.StandardException;
035:
036: import org.apache.derby.iapi.sql.execute.ExecRow;
037:
038: import org.apache.derby.iapi.types.DataValueDescriptor;
039:
040: import java.util.Vector;
041:
042: /**
043: * This sort observer performs aggregation.
044: *
045: * @author jamie
046: */
047: public class AggregateSortObserver extends BasicSortObserver {
048:
049: protected GenericAggregator[] aggsToProcess;
050: protected GenericAggregator[] aggsToInitialize;
051:
052: private int firstAggregatorColumn;
053:
054: /**
055: * Simple constructor
056: *
057: * @param doClone If true, then rows that are retained
058: * by the sorter will be cloned. This is needed
059: * if language is reusing row wrappers.
060: *
061: * @param aggsToProcess the array of aggregates that
062: * need to be accumulated/merged in the sorter.
063: *
064: * @param aggsToInitialize the array of aggregates that
065: * need to be iniitialized as they are inserted
066: * into the sorter. This may be different than
067: * aggsToProcess in the case where some distinct
068: * aggregates are dropped in the initial pass of
069: * a two phase aggregation for scalar or vector
070: * distinct aggregation. The initialization process
071: * consists of replacing an empty UserValue with a new,
072: * initialized aggregate of the appropriate type.
073: * Note that for each row, only the first aggregate
074: * in this list is checked to see whether initialization
075: * is needed. If so, ALL aggregates are initialized;
076: * otherwise, NO aggregates are initialized.
077: *
078: * @param execRow ExecRow to use as source of clone for store.
079: */
080: public AggregateSortObserver(boolean doClone,
081: GenericAggregator[] aggsToProcess,
082: GenericAggregator[] aggsToInitialize, ExecRow execRow) {
083: super (doClone, false, execRow, true);
084: this .aggsToProcess = aggsToProcess;
085: this .aggsToInitialize = aggsToInitialize;
086:
087: /*
088: ** We expect aggsToInitialize and aggsToProcess to
089: ** be non null. However, if it is deemed ok for them
090: ** to be null, it shouldn't be too hard to add the
091: ** extra null checks herein.
092: */
093: if (SanityManager.DEBUG) {
094: SanityManager
095: .ASSERT(aggsToInitialize != null,
096: "aggsToInitialize argument to AggregateSortObserver is null");
097: SanityManager
098: .ASSERT(aggsToProcess != null,
099: "aggsToProcess argument to AggregateSortObserver is null");
100: }
101:
102: if (aggsToInitialize.length > 0) {
103: firstAggregatorColumn = aggsToInitialize[0].aggregatorColumnId;
104: }
105: }
106:
107: /**
108: * Called prior to inserting a distinct sort
109: * key.
110: *
111: * @param insertRow the current row that the sorter
112: * is on the verge of retaining
113: *
114: * @return the row to be inserted by the sorter. If null,
115: * then nothing is inserted by the sorter. Distinct
116: * sorts will want to return null.
117: *
118: * @exception StandardException never thrown
119: */
120: public DataValueDescriptor[] insertNonDuplicateKey(
121: DataValueDescriptor[] insertRow) throws StandardException {
122: DataValueDescriptor[] returnRow = super
123: .insertNonDuplicateKey(insertRow);
124:
125: /*
126: ** If we have an aggregator column that hasn't been
127: ** initialized, then initialize the entire row now.
128: */
129: if (aggsToInitialize.length > 0
130: && ((Storable) returnRow[firstAggregatorColumn])
131: .isNull()) {
132: for (int i = 0; i < aggsToInitialize.length; i++) {
133: GenericAggregator aggregator = aggsToInitialize[i];
134: UserDataValue wrapper = ((UserDataValue) returnRow[aggregator.aggregatorColumnId]);
135: if (SanityManager.DEBUG) {
136: if (!wrapper.isNull()) {
137: SanityManager
138: .THROWASSERT("during aggregate "
139: + "initialization, all wrappers expected to be empty; "
140: + "however, the wrapper for the following aggregate "
141: + "was not empty:" + aggregator
142: + ". The value stored is "
143: + wrapper.getObject());
144: }
145: }
146: wrapper.setValue(aggregator.getAggregatorInstance());
147: aggregator.accumulate(returnRow, returnRow);
148: }
149: }
150:
151: return returnRow;
152:
153: }
154:
155: /**
156: * Called prior to inserting a duplicate sort
157: * key. We do aggregation here.
158: *
159: * @param insertRow the current row that the sorter
160: * is on the verge of retaining. It is a duplicate
161: * of existingRow.
162: *
163: * @param existingRow the row that is already in the
164: * the sorter which is a duplicate of insertRow
165: *
166: * @exception StandardException never thrown
167: */
168: public DataValueDescriptor[] insertDuplicateKey(
169: DataValueDescriptor[] insertRow,
170: DataValueDescriptor[] existingRow) throws StandardException {
171: if (aggsToProcess.length == 0) {
172: return null;
173: }
174:
175: /*
176: ** If the other row already has an aggregator, then
177: ** we need to merge with it. Otherwise, accumulate
178: ** it.
179: */
180: for (int i = 0; i < aggsToProcess.length; i++) {
181: GenericAggregator aggregator = aggsToProcess[i];
182: if (((Storable) insertRow[aggregator.getColumnId()])
183: .isNull()) {
184: aggregator.accumulate(insertRow, existingRow);
185: } else {
186: aggregator.merge(insertRow, existingRow);
187: }
188: }
189: return null;
190: }
191: }
|