001: /*
002:
003: Derby - Class org.apache.derby.impl.store.access.sort.MergeSort
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.store.access.sort;
023:
024: import org.apache.derby.iapi.reference.SQLState;
025:
026: import org.apache.derby.iapi.services.io.FormatableBitSet;
027:
028: import org.apache.derby.iapi.services.io.Storable;
029: import org.apache.derby.iapi.services.sanity.SanityManager;
030: import org.apache.derby.iapi.error.StandardException;
031: import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource;
032: import org.apache.derby.iapi.store.access.conglomerate.Sort;
033: import org.apache.derby.iapi.store.access.conglomerate.SortFactory;
034: import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;
035: import org.apache.derby.iapi.types.CloneableObject;
036: import org.apache.derby.iapi.store.access.ColumnOrdering;
037: import org.apache.derby.iapi.store.access.ConglomerateController;
038: import org.apache.derby.iapi.store.access.Qualifier;
039: import org.apache.derby.iapi.store.access.RowUtil;
040: import org.apache.derby.iapi.store.access.ScanController;
041: import org.apache.derby.iapi.store.access.SortObserver;
042: import org.apache.derby.iapi.store.access.SortController;
043: import org.apache.derby.iapi.store.access.TransactionController;
044:
045: import org.apache.derby.iapi.store.raw.StreamContainerHandle;
046: import org.apache.derby.iapi.store.raw.RawStoreFactory;
047: import org.apache.derby.iapi.store.raw.Transaction;
048:
049: import org.apache.derby.iapi.types.DataValueDescriptor;
050:
051: import org.apache.derby.iapi.types.Orderable;
052: import org.apache.derby.iapi.types.RowLocation;
053:
054: import java.util.Enumeration;
055: import java.util.Properties;
056: import java.util.Vector;
057:
058: /**
059:
060: A sort implementation which does the sort in-memory if it can,
061: but which can do an external merge sort so that it can sort an
062: arbitrary number of rows.
063:
064: **/
065:
066: public final class MergeSort implements Sort {
067:
/*
 * Fields
 */

/**
State: the sort is closed. This is the initial value of {@link #state},
the state initialize() expects to start from, and the state drop()
leaves the sort in.
**/
static final int STATE_CLOSED = 0;

/**
State: initialize() has completed and open() may be called.
**/
static final int STATE_INITIALIZED = 1;

/**
State: open() has handed out an inserter which has not yet reported
back through doneInserting().
**/
static final int STATE_INSERTING = 2;

/**
State: doneInserting() has been called; openSortScan() or
openSortRowSource() may be called.
**/
static final int STATE_DONE_INSERTING = 3;

/**
State: a scan or row source has been handed out and has not yet
reported back through doneScanning().
**/
static final int STATE_SCANNING = 4;

/**
State: doneScanning() has been called.
**/
static final int STATE_DONE_SCANNING = 5;

/**
The current state of the sort, one of the STATE_* values above.
A sort starts out closed and ends up closed.
**/
protected int state = STATE_CLOSED;

/**
The row template handed to initialize(). Valid while the state
is INITIALIZED through SCANNING, null otherwise.
**/
protected DataValueDescriptor[] template;

/**
The column ordering handed to initialize(). Valid while the state
is INITIALIZED through SCANNING, null otherwise.
Historically documented as "may be null if there is no column
ordering, in which case all rows are duplicates and a single row
is emitted".
NOTE(review): initialize() and compare() dereference this array
unconditionally, so a null ordering is not handled by the code in
this class - confirm the intended contract with callers.
**/
protected ColumnOrdering[] columnOrdering;

/**
Cache of column ids: columnOrderingMap[i] is the id of the column
to compare i'th. Filled in by initialize().
**/
protected int[] columnOrderingMap;

/**
Cache of sort directions: columnOrderingAscendingMap[i] is true when
the i'th ordering column sorts ascending. Filled in by initialize().
**/
protected boolean[] columnOrderingAscendingMap;

/**
The sort observer, used as a callback. May be null.
**/
protected SortObserver sortObserver;

/**
Whether the caller of initialize() said rows will be inserted
already in order.
**/
protected boolean alreadyInOrder;

/**
The inserter currently feeding rows into the sort.
Only meaningful while the state is INSERTING.
**/
protected MergeInserter inserter;

/**
The scan currently returning rows from the sort.
Only meaningful while the state is SCANNING.
**/
protected Scan scan;

/**
The merge runs produced so far, as a vector of container ids
(each element is a Long). Null when no merge runs exist.
**/
protected Vector mergeRuns;

/**
The ordered set of rows that did not go into a merge run
(all of the rows when there are no merge runs).
**/
protected SortBuffer sortBuffer;

/**
The maximum number of entries a sort buffer can hold,
as passed to initialize().
**/
protected int sortBufferMax;

/**
The minimum number of entries a sort buffer can hold. Derived in
initialize() from the estimated row count, capped at sortBufferMax.
**/
protected int sortBufferMin;

/**
Properties passed along whenever a merge run's stream container
is created.
**/
static Properties properties;

/**
Static initializer: build, once, the properties used for merge run
stream containers (a stream file buffer size of "16384").
**/
static {
    Properties streamProperties = new Properties();
    streamProperties.put(
            RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER,
            "16384");
    properties = streamProperties;
}
191:
192: /*
193: * Methods of Sort
194: */
195:
196: /**
197: Open a sort controller.
198: <p>
199: This implementation only supports a single sort controller
200: per sort.
201: @see Sort#open
202: **/
203: public SortController open(TransactionManager tran)
204: throws StandardException {
205: if (SanityManager.DEBUG)
206: SanityManager.ASSERT(state == STATE_INITIALIZED);
207:
208: // Ready to start inserting rows.
209: state = STATE_INSERTING;
210:
211: // Create and initialize an inserter. When the caller
212: // closes it, it will call back to inserterIsClosed().
213: this .inserter = new MergeInserter();
214: if (this .inserter.initialize(this , tran) == false) {
215: throw StandardException
216: .newException(SQLState.SORT_COULD_NOT_INIT);
217: }
218:
219: return this .inserter;
220: }
221:
222: /**
223: Open a scan controller.
224: @see Sort#openSortScan
225: **/
226:
227: public ScanController openSortScan(TransactionManager tran,
228: boolean hold) throws StandardException {
229: if (SanityManager.DEBUG)
230: SanityManager.ASSERT(state == STATE_DONE_INSERTING);
231:
232: if (mergeRuns == null || mergeRuns.size() == 0) {
233: // There were no merge runs so we can just return
234: // the rows from the sort buffer.
235: scan = new SortBufferScan(this , tran, sortBuffer, hold);
236:
237: // The scan now owns the sort buffer
238: sortBuffer = null;
239: } else {
240: // Dump the rows in the sort buffer to a merge run.
241: long containerId = createMergeRun(tran, sortBuffer);
242: mergeRuns.addElement(new Long(containerId));
243:
244: // If there are more merge runs than we can sort
245: // at once with our sort buffer, we have to reduce
246: // the number of merge runs
247: if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN
248: || mergeRuns.size() > sortBuffer.capacity())
249: multiStageMerge(tran);
250:
251: // There are now few enough merge runs to sort
252: // at once, so create a scan for them.
253: MergeScan mscan = new MergeScan(this , tran, sortBuffer,
254: mergeRuns, sortObserver, hold);
255:
256: if (!mscan.init(tran)) {
257: throw StandardException
258: .newException(SQLState.SORT_COULD_NOT_INIT);
259: }
260: scan = mscan;
261:
262: // The scan now owns the sort buffer and merge runs.
263: sortBuffer = null;
264: mergeRuns = null;
265: }
266:
267: // Ready to start retrieving rows.
268: this .state = STATE_SCANNING;
269:
270: return scan;
271: }
272:
273: /**
274: Open a row source to get rows out of the sorter.
275: @see Sort#openSortRowSource
276: **/
277: public ScanControllerRowSource openSortRowSource(
278: TransactionManager tran) throws StandardException {
279: if (SanityManager.DEBUG)
280: SanityManager.ASSERT(state == STATE_DONE_INSERTING);
281:
282: ScanControllerRowSource rowSource = null;
283:
284: if (mergeRuns == null || mergeRuns.size() == 0) {
285: // There were no merge runs so we can just return
286: // the rows from the sort buffer.
287: scan = new SortBufferRowSource(sortBuffer, tran,
288: sortObserver, false, sortBufferMax);
289: rowSource = (ScanControllerRowSource) scan;
290:
291: // The scan now owns the sort buffer
292: sortBuffer = null;
293: } else {
294: // Dump the rows in the sort buffer to a merge run.
295: long containerId = createMergeRun(tran, sortBuffer);
296: mergeRuns.addElement(new Long(containerId));
297:
298: // If there are more merge runs than we can sort
299: // at once with our sort buffer, we have to reduce
300: // the number of merge runs
301: if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN
302: || mergeRuns.size() > sortBuffer.capacity())
303: multiStageMerge(tran);
304:
305: // There are now few enough merge runs to sort
306: // at once, so create a rowSource for them.
307: MergeScanRowSource msRowSource = new MergeScanRowSource(
308: this , tran, sortBuffer, mergeRuns, sortObserver,
309: false);
310: if (!msRowSource.init(tran)) {
311: throw StandardException
312: .newException(SQLState.SORT_COULD_NOT_INIT);
313: }
314: scan = msRowSource;
315: rowSource = msRowSource;
316:
317: // The scan now owns the sort buffer and merge runs.
318: sortBuffer = null;
319: mergeRuns = null;
320: }
321:
322: // Ready to start retrieving rows.
323: this .state = STATE_SCANNING;
324:
325: return rowSource;
326: }
327:
328: /**
329: Drop the sort.
330: @see Sort#drop
331: **/
332: public void drop(TransactionController tran)
333: throws StandardException {
334: // Make sure the inserter is closed. Note this
335: // will cause the callback to doneInserting()
336: // which will give us any in-progress merge
337: // runs, if there are any.
338: if (inserter != null)
339: inserter.close();
340: inserter = null;
341:
342: // Make sure the scan is closed, if there is one.
343: // This will cause the callback to doneScanning().
344: if (scan != null) {
345: scan.close();
346: scan = null;
347: }
348:
349: // If we have a row set, get rid of it.
350: if (sortBuffer != null) {
351: sortBuffer.close();
352: sortBuffer = null;
353: }
354:
355: // Clean out the rest of the objects.
356: template = null;
357: columnOrdering = null;
358: sortObserver = null;
359:
360: // If there are any merge runs, drop them.
361: dropMergeRuns((TransactionManager) tran);
362:
363: // Whew!
364: state = STATE_CLOSED;
365: }
366:
367: /*
368: * Methods of MergeSort. Arranged alphabetically.
369: */
370:
371: /**
372: Check the column ordering against the template, making
373: sure that each column is present in the template,
374: implements Orderable, and is not mentioned more than
375: once. Intended to be called as part of a sanity check.
376: **/
377: protected boolean checkColumnOrdering(
378: DataValueDescriptor[] template,
379: ColumnOrdering columnOrdering[]) {
380: // Allocate an array to check that each column mentioned only once.
381: int templateNColumns = template.length;
382: boolean seen[] = new boolean[templateNColumns];
383:
384: // Check each column ordering.
385: for (int i = 0; i < columnOrdering.length; i++) {
386: int colid = columnOrdering[i].getColumnId();
387:
388: // Check that the column id is valid.
389: if (colid < 0 || colid >= templateNColumns)
390: return false;
391:
392: // Check that the column isn't mentioned more than once.
393: if (seen[colid])
394: return false;
395: seen[colid] = true;
396:
397: Object columnVal = RowUtil.getColumn(template,
398: (FormatableBitSet) null, colid);
399:
400: if (!(columnVal instanceof Orderable))
401: return false;
402: }
403:
404: return true;
405: }
406:
407: /**
408: Check that the columns in the row agree with the columns
409: in the template, both in number and in type.
410: <p>
411: XXX (nat) Currently checks that the classes implementing
412: each column are the same -- is this right?
413: **/
414: void checkColumnTypes(DataValueDescriptor[] row)
415: throws StandardException {
416: int nCols = row.length;
417: if (template.length != nCols) {
418: if (SanityManager.DEBUG) {
419: SanityManager
420: .THROWASSERT("template.length ("
421: + template.length
422: + ") expected to be = to nCols ("
423: + nCols + ")");
424: }
425: throw StandardException
426: .newException(SQLState.SORT_TYPE_MISMATCH);
427: }
428:
429: if (SanityManager.DEBUG) {
430: for (int colid = 0; colid < nCols; colid++) {
431: Object col1 = row[colid];
432: Object col2 = template[colid];
433: if (col1 == null) {
434: SanityManager.THROWASSERT("col[" + colid
435: + "] is null");
436: }
437:
438: if (!(col1 instanceof CloneableObject)) {
439: SanityManager.THROWASSERT("col[" + colid + "] ("
440: + col1.getClass().getName()
441: + ") is not a CloneableObject.");
442: }
443:
444: if (col1.getClass() != col2.getClass()) {
445: SanityManager
446: .THROWASSERT("col1.getClass() ("
447: + col1.getClass()
448: + ") expected to be the same as col2.getClass() ("
449: + col2.getClass() + ")");
450: }
451: }
452: }
453: }
454:
455: int compare(DataValueDescriptor[] r1, DataValueDescriptor[] r2)
456: throws StandardException {
457: // Get the number of columns we have to compare.
458: int colsToCompare = this .columnOrdering.length;
459: int r;
460:
461: // Compare the columns specified in the column
462: // ordering array.
463: for (int i = 0; i < colsToCompare; i++) {
464: // Get columns to compare.
465: int colid = this .columnOrderingMap[i];
466:
467: // If the columns don't compare equal, we're done.
468: // Return the sense of the comparison.
469: if ((r = r1[colid].compare(r2[colid])) != 0) {
470: if (this .columnOrderingAscendingMap[i])
471: return r;
472: else
473: return -r;
474: }
475: }
476:
477: // We made it through all the columns, and they must have
478: // all compared equal. So return that the rows compare equal.
479: return 0;
480: }
481:
482: /**
483: Go from the CLOSED to the INITIALIZED state.
484: **/
485: public void initialize(DataValueDescriptor[] template,
486: ColumnOrdering columnOrdering[], SortObserver sortObserver,
487: boolean alreadyInOrder, long estimatedRows,
488: int sortBufferMax) throws StandardException {
489: if (SanityManager.DEBUG) {
490: SanityManager.ASSERT(state == STATE_CLOSED);
491: }
492:
493: // Make sure the column ordering makes sense
494: if (SanityManager.DEBUG) {
495: SanityManager.ASSERT(checkColumnOrdering(template,
496: columnOrdering), "column ordering error");
497: }
498:
499: // Set user-defined parameters.
500: this .template = template;
501: this .columnOrdering = columnOrdering;
502: this .sortObserver = sortObserver;
503: this .alreadyInOrder = alreadyInOrder;
504:
505: // Cache results of columnOrdering calls, results are not allowed
506: // to change throughout a sort.
507: columnOrderingMap = new int[columnOrdering.length];
508: columnOrderingAscendingMap = new boolean[columnOrdering.length];
509: for (int i = 0; i < columnOrdering.length; i++) {
510: columnOrderingMap[i] = columnOrdering[i].getColumnId();
511: columnOrderingAscendingMap[i] = columnOrdering[i]
512: .getIsAscending();
513: }
514:
515: // No inserter or scan yet.
516: this .inserter = null;
517: this .scan = null;
518:
519: // We don't have any merge runs.
520: this .mergeRuns = null;
521: this .sortBuffer = null;
522: this .sortBufferMax = sortBufferMax;
523:
524: if (estimatedRows > sortBufferMax)
525: sortBufferMin = sortBufferMax;
526: else
527: sortBufferMin = (int) estimatedRows;
528: if (SanityManager.DEBUG) {
529: if (SanityManager.DEBUG_ON("testSort"))
530: sortBufferMin = sortBufferMax;
531: }
532:
533: this .state = STATE_INITIALIZED;
534: }
535:
536: /**
537: An inserter is closing.
538: **/
539: void doneInserting(MergeInserter inserter, SortBuffer sortBuffer,
540: Vector mergeRuns) {
541: if (SanityManager.DEBUG) {
542: SanityManager.ASSERT(state == STATE_INSERTING);
543: }
544:
545: this .sortBuffer = sortBuffer;
546: this .mergeRuns = mergeRuns;
547: this .inserter = null;
548:
549: this .state = STATE_DONE_INSERTING;
550: }
551:
552: void doneScanning(Scan scan, SortBuffer sortBuffer) {
553: if (SanityManager.DEBUG) {
554: // Make sure the scan we're getting back is the one we gave out
555:
556: if (this .scan != scan)
557: SanityManager.THROWASSERT("this.scan = " + this .scan
558: + " scan = " + scan);
559: }
560:
561: this .sortBuffer = sortBuffer;
562: this .scan = null;
563:
564: this .state = STATE_DONE_SCANNING;
565: }
566:
567: void doneScanning(Scan scan, SortBuffer sortBuffer, Vector mergeRuns) {
568: this .mergeRuns = mergeRuns;
569:
570: doneScanning(scan, sortBuffer);
571: }
572:
573: /**
574: Get rid of the merge runs, if there are any.
575: Must not cause any errors because it's called
576: during error processing.
577: **/
578: void dropMergeRuns(TransactionManager tran) {
579: if (mergeRuns != null) {
580: Enumeration e = mergeRuns.elements();
581:
582: try {
583: Transaction rawTran = tran.getRawStoreXact();
584: long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
585:
586: while (e.hasMoreElements()) {
587: long containerId = ((Long) e.nextElement())
588: .longValue();
589: rawTran.dropStreamContainer(segmentId, containerId);
590: }
591: } catch (StandardException se) {
592: // Ignore problems with dropping, worst case
593: // the raw store will clean up at reboot.
594: }
595: mergeRuns = null;
596: }
597: }
598:
599: /* DEBUG (nat)
600: void printRunInfo(TransactionController tran)
601: throws StandardException
602: {
603: java.util.Enumeration e = mergeRuns.elements();
604: while (e.hasMoreElements())
605: {
606: long conglomid = ((Long) e.nextElement()).longValue();
607: ScanController sc = tran.openScan(conglomid, false,
608: false, null, null, 0, null,
609: null, 0);
610: System.out.println("Merge run: conglomid=" + conglomid);
611: while (sc.next())
612: {
613: sc.fetch(template);
614: System.out.println(template);
615: }
616: sc.close();
617: }
618: }
619: */
620:
621: private void multiStageMerge(TransactionManager tran)
622: throws StandardException {
623: Enumeration e;
624: //int iterations = 0; // DEBUG (nat)
625: int maxMergeRuns = sortBuffer.capacity();
626:
627: if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN)
628: maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN;
629:
630: Vector subset;
631: Vector leftovers;
632:
633: while (mergeRuns.size() > maxMergeRuns) {
634: // Move maxMergeRuns elements from the merge runs
635: // vector into a subset, leaving the rest.
636: subset = new Vector(maxMergeRuns);
637: leftovers = new Vector(mergeRuns.size() - maxMergeRuns);
638: e = mergeRuns.elements();
639: while (e.hasMoreElements()) {
640: Long containerId = (Long) e.nextElement();
641: if (subset.size() < maxMergeRuns)
642: subset.addElement(containerId);
643: else
644: leftovers.addElement(containerId);
645: }
646:
647: /* DEBUG (nat)
648: iterations++;
649: System.out.println(subset.size() + " elements in subset");
650: System.out.println(leftovers.size() + " elements in leftovers");
651: System.out.println(mergeRuns.size() + " elements in mergeRuns");
652: System.out.println("maxMergeRuns is " + maxMergeRuns);
653: System.out.println("iterations = " + iterations);
654: if (subset.size() == 0)
655: {
656: System.exit(1);
657: }
658: */
659:
660: mergeRuns = leftovers;
661:
662: // Open a merge scan on the subset.
663: MergeScanRowSource msRowSource = new MergeScanRowSource(
664: this , tran, sortBuffer, subset, sortObserver, false);
665:
666: if (!msRowSource.init(tran)) {
667: throw StandardException
668: .newException(SQLState.SORT_COULD_NOT_INIT);
669: }
670:
671: // Create and open another temporary stream conglomerate
672: // which will become
673: // a merge run made up with the merged runs from the subset.
674: Transaction rawTran = tran.getRawStoreXact();
675: int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
676: long id = rawTran.addAndLoadStreamContainer(segmentId,
677: properties, msRowSource);
678:
679: mergeRuns.addElement(new Long(id));
680:
681: // Drop the conglomerates in the merge subset
682: e = subset.elements();
683: while (e.hasMoreElements()) {
684: Long containerId = (Long) e.nextElement();
685: rawTran.dropStreamContainer(segmentId, containerId
686: .longValue());
687: }
688: }
689: }
690:
691: /**
692: Remove all the rows from the sort buffer and store them
693: in a temporary conglomerate. The temporary conglomerate
694: is a "merge run". Returns the container id of the
695: merge run.
696: **/
697: long createMergeRun(TransactionManager tran, SortBuffer sortBuffer)
698: throws StandardException {
699: // this sort buffer is not a scan and is not tracked by any
700: // TransactionManager.
701: SortBufferRowSource rowSource = new SortBufferRowSource(
702: sortBuffer, (TransactionManager) null, sortObserver,
703: true, sortBufferMax);
704:
705: // Create a temporary stream conglomerate...
706: Transaction rawTran = tran.getRawStoreXact(); // get raw transaction
707: int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
708: long id = rawTran.addAndLoadStreamContainer(segmentId,
709: properties, rowSource);
710:
711: // Don't close the sortBuffer, we just emptied it, the caller may reuse
712: // that sortBuffer for the next run.
713: rowSource = null;
714:
715: return id;
716: }
717: }
|