001: /*
002:
003: Derby - Class org.apache.derby.impl.store.access.sort.MergeScan
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.store.access.sort;
023:
024: import java.util.Enumeration;
025: import java.util.Vector;
026:
027: import org.apache.derby.iapi.services.sanity.SanityManager;
028: import org.apache.derby.iapi.services.io.Storable;
029: import org.apache.derby.iapi.error.StandardException;
030: import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;
031: import org.apache.derby.iapi.store.access.conglomerate.ScanManager;
032: import org.apache.derby.iapi.store.access.ScanController;
033: import org.apache.derby.iapi.store.access.SortObserver;
034: import org.apache.derby.iapi.store.access.TransactionController;
035: import org.apache.derby.iapi.store.raw.StreamContainerHandle;
036: import org.apache.derby.iapi.store.raw.Transaction;
037:
038: import org.apache.derby.iapi.types.DataValueDescriptor;
039:
040: /**
041: A sort scan that is capable of merging as many merge runs
042: as will fit in the passed-in sort buffer.
043: **/
044:
045: public class MergeScan extends SortScan {
046: /**
047: The sort buffer we will use.
048: **/
049: protected SortBuffer sortBuffer;
050:
051: /**
052: The merge runs.
053: **/
054: protected Vector mergeRuns;
055:
056: /**
057: Array of scan controllers for the merge runs.
058: Entries in the array become null as the last
059: row is pulled out and the scan is closed.
060: **/
061: protected StreamContainerHandle openScans[];
062:
063: private SortObserver sortObserver;
064:
065: /*
066: * Constructors.
067: */
068:
069: MergeScan(MergeSort sort, TransactionManager tran,
070: SortBuffer sortBuffer, Vector mergeRuns,
071: SortObserver sortObserver, boolean hold) {
072: super (sort, tran, hold);
073: this .sortBuffer = sortBuffer;
074: this .mergeRuns = mergeRuns;
075: this .tran = tran;
076: this .sortObserver = sortObserver;
077: }
078:
079: /*
080: * Methods of MergeSortScan
081: */
082:
083: /**
084: Move to the next position in the scan.
085: @see ScanController#next
086: **/
087: public boolean next() throws StandardException {
088: current = sortBuffer.removeFirst();
089: if (current != null)
090: mergeARow(sortBuffer.getLastAux());
091: return (current != null);
092: }
093:
094: /**
095: Close the scan.
096: @see ScanController#close
097: **/
098: public void close() {
099: if (openScans != null) {
100: for (int i = 0; i < openScans.length; i++) {
101: if (openScans[i] != null) {
102: openScans[i].close();
103: }
104: openScans[i] = null;
105: }
106: openScans = null;
107: }
108:
109: // Hand sort buffer and remaining merge runs to sort.
110: if (super .sort != null) {
111: sort.doneScanning(this , sortBuffer, mergeRuns);
112: sortBuffer = null;
113: mergeRuns = null;
114: }
115:
116: // Sets sort to null
117: super .close();
118: }
119:
120: /**
121: Close the scan.
122: @see ScanManager#closeForEndTransaction
123: **/
124: public boolean closeForEndTransaction(boolean closeHeldScan) {
125: if (!hold || closeHeldScan) {
126: close();
127: return (true);
128: } else {
129: return (false);
130: }
131: }
132:
133: /*
134: * Methods of MergeScan
135: */
136:
137: /**
138: Initialize the scan, returning false if there
139: was some error.
140: **/
141: public boolean init(TransactionManager tran)
142: throws StandardException {
143: if (SanityManager.DEBUG) {
144: // We really expect to have at least one
145: // merge run.
146: SanityManager.ASSERT(mergeRuns != null);
147: SanityManager.ASSERT(mergeRuns.size() > 0);
148:
149: // This sort scan also expects that the
150: // caller has ensured that the sort buffer
151: // capacity will hold a row from all the
152: // merge runs.
153: SanityManager.ASSERT(sortBuffer.capacity() >= mergeRuns
154: .size());
155: }
156:
157: // Clear the sort buffer.
158: sortBuffer.reset();
159:
160: // Create an array to hold a scan controller
161: // for each merge run.
162: openScans = new StreamContainerHandle[mergeRuns.size()];
163: if (openScans == null)
164: return false;
165:
166: // Open a scan on each merge run.
167: int scanindex = 0;
168: Enumeration e = mergeRuns.elements();
169: while (e.hasMoreElements()) {
170: // get the container id
171: long id = ((Long) e.nextElement()).longValue();
172:
173: Transaction rawTran = tran.getRawStoreXact(); // get raw transaction
174: int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
175: openScans[scanindex++] = rawTran.openStreamContainer(
176: segmentId, id, hold);
177: }
178:
179: // Load the initial rows.
180: for (scanindex = 0; scanindex < openScans.length; scanindex++)
181: mergeARow(scanindex);
182:
183: // Success!
184: return true;
185: }
186:
187: /**
188: Insert rows while we keep getting duplicates
189: from the merge run whose scan is in the
190: open scan array entry indexed by scanindex.
191: **/
192: void mergeARow(int scanindex) throws StandardException {
193: if (SanityManager.DEBUG) {
194: // Unless there's a bug, the scan index will refer
195: // to an open scan. That's because we never put
196: // a scan index for a closed scan into the sort
197: // buffer (via setNextAux).
198: SanityManager.ASSERT(openScans[scanindex] != null);
199: }
200:
201: DataValueDescriptor[] row;
202:
203: // Read rows from the merge run and stuff them into the
204: // sort buffer for as long as we encounter duplicates.
205: do {
206: row = sortObserver.getArrayClone();
207:
208: // Fetch the row from the merge run.
209: if (!openScans[scanindex].fetchNext(row)) {
210: // If we're out of rows in the merge run, close the scan.
211:
212: openScans[scanindex].close();
213: openScans[scanindex] = null;
214: return;
215: }
216:
217: // Save the index of this merge run with
218: // the row we're putting in the sort buffer.
219: sortBuffer.setNextAux(scanindex);
220: } while (sortBuffer.insert(row) == SortBuffer.INSERT_DUPLICATE);
221: }
222: }
|