Source Code Cross Referenced for MergeSort.java in  » Database-DBMS » db-derby-10.2 » org » apache » derby » impl » store » access » sort » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database DBMS » db derby 10.2 » org.apache.derby.impl.store.access.sort 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:
003:           Derby - Class org.apache.derby.impl.store.access.sort.MergeSort
004:
005:           Licensed to the Apache Software Foundation (ASF) under one or more
006:           contributor license agreements.  See the NOTICE file distributed with
007:           this work for additional information regarding copyright ownership.
008:           The ASF licenses this file to you under the Apache License, Version 2.0
009:           (the "License"); you may not use this file except in compliance with
010:           the License.  You may obtain a copy of the License at
011:
012:              http://www.apache.org/licenses/LICENSE-2.0
013:
014:           Unless required by applicable law or agreed to in writing, software
015:           distributed under the License is distributed on an "AS IS" BASIS,
016:           WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017:           See the License for the specific language governing permissions and
018:           limitations under the License.
019:
020:         */
021:
022:        package org.apache.derby.impl.store.access.sort;
023:
024:        import org.apache.derby.iapi.reference.SQLState;
025:
026:        import org.apache.derby.iapi.services.io.FormatableBitSet;
027:
028:        import org.apache.derby.iapi.services.io.Storable;
029:        import org.apache.derby.iapi.services.sanity.SanityManager;
030:        import org.apache.derby.iapi.error.StandardException;
031:        import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource;
032:        import org.apache.derby.iapi.store.access.conglomerate.Sort;
033:        import org.apache.derby.iapi.store.access.conglomerate.SortFactory;
034:        import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;
035:        import org.apache.derby.iapi.types.CloneableObject;
036:        import org.apache.derby.iapi.store.access.ColumnOrdering;
037:        import org.apache.derby.iapi.store.access.ConglomerateController;
038:        import org.apache.derby.iapi.store.access.Qualifier;
039:        import org.apache.derby.iapi.store.access.RowUtil;
040:        import org.apache.derby.iapi.store.access.ScanController;
041:        import org.apache.derby.iapi.store.access.SortObserver;
042:        import org.apache.derby.iapi.store.access.SortController;
043:        import org.apache.derby.iapi.store.access.TransactionController;
044:
045:        import org.apache.derby.iapi.store.raw.StreamContainerHandle;
046:        import org.apache.derby.iapi.store.raw.RawStoreFactory;
047:        import org.apache.derby.iapi.store.raw.Transaction;
048:
049:        import org.apache.derby.iapi.types.DataValueDescriptor;
050:
051:        import org.apache.derby.iapi.types.Orderable;
052:        import org.apache.derby.iapi.types.RowLocation;
053:
054:        import java.util.Enumeration;
055:        import java.util.Properties;
056:        import java.util.Vector;
057:
058:        /**
059:
060:         A sort implementation which does the sort in-memory if it can,
061:         but which can do an external merge sort so that it can sort an
062:         arbitrary number of rows.
063:
064:         **/
065:
066:        public final class MergeSort implements  Sort {
067:
068:            /*
069:             * Fields
070:             */
071:
072:            /**
073:             **/
074:            static final int STATE_CLOSED = 0;
075:
076:            /**
077:             **/
078:            static final int STATE_INITIALIZED = 1;
079:
080:            /**
081:             **/
082:            static final int STATE_INSERTING = 2;
083:
084:            /**
085:             **/
086:            static final int STATE_DONE_INSERTING = 3;
087:
088:            /**
089:             **/
090:            static final int STATE_SCANNING = 4;
091:
092:            /**
093:             **/
094:            static final int STATE_DONE_SCANNING = 5;
095:
096:            /**
097:            Maintains the current state of the sort as defined in
098:            the preceding values.  Sorts start off and end up closed.
099:             **/
100:            protected int state = STATE_CLOSED;
101:
102:            /**
103:            The template as passed in on create.  Valid when the state
104:            is INITIALIZED through SCANNING, null otherwise.
105:             **/
106:            protected DataValueDescriptor[] template;
107:
108:            /**
109:            The column ordering as passed in on create.  Valid when
110:            the state is INITIALIZED through SCANNING, null otherwise.
111:            May be null if there is no column ordering - this means
112:            that all rows are considered to be duplicates, and the
113:            sort will only emit a single row.
114:             **/
115:            protected ColumnOrdering columnOrdering[];
116:
117:            /**
118:            A lookup table to speed up lookup of a column associated with the i'th
119:            column to compare.  To find the column id to compare as the i'th column
120:            look in columnOrderingMap[i].
121:             **/
122:            protected int columnOrderingMap[];
123:
124:            /**
125:            A lookup table to speed up lookup of Ascending state of a column, 
126:             **/
127:            protected boolean columnOrderingAscendingMap[];
128:
129:            /**
130:            The sort observer.  May be null.  Used as a callback.
131:             **/
132:            protected SortObserver sortObserver;
133:
134:            /**
135:            Whether the rows are expected to be in order on insert,
136:            as passed in on create.
137:             **/
138:            protected boolean alreadyInOrder;
139:
140:            /**
141:            The inserter that's being used to insert rows into the sort.
142:            This field is only valid when the state is INSERTING.
143:             **/
144:            protected MergeInserter inserter = null;
145:
146:            /**
147:            The scan that's being used to return rows from the sort.
148:            This field is only valid when the state is SCANNING.
149:             **/
150:            protected Scan scan = null;
151:
152:            /**
153:            A vector of merge runs, produced by the MergeInserter.
154:            Might be null if no merge runs were produced.
155:            It is a vector of container ids.
156:             **/
157:            protected Vector mergeRuns = null;
158:
159:            /**
160:            An ordered set of the leftover rows that didn't go
161:            in the last merge run (might be all the rows if there
162:            are no merge runs).
163:             **/
164:            protected SortBuffer sortBuffer = null;
165:
166:            /**
167:            The maximum number of entries a sort buffer can hold.
168:             **/
169:            protected int sortBufferMax;
170:
171:            /**
172:            The minimum number of entries a sort buffer can hold.
173:             **/
174:            protected int sortBufferMin;
175:
176:            /**
177:            Properties for mergeSort
178:             **/
179:            static Properties properties = null;
180:
181:            /**
182:            Static initializer for MergeSort, to initialize once the properties
183:            for the sortBuffer.  
184:             **/
185:            static {
186:                properties = new Properties();
187:                properties.put(
188:                        RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER,
189:                        "16384");
190:            }
191:
192:            /*
193:             * Methods of Sort
194:             */
195:
196:            /**
197:            Open a sort controller.
198:            <p>
199:            This implementation only supports a single sort controller
200:            per sort.
201:            @see Sort#open
202:             **/
203:            public SortController open(TransactionManager tran)
204:                    throws StandardException {
205:                if (SanityManager.DEBUG)
206:                    SanityManager.ASSERT(state == STATE_INITIALIZED);
207:
208:                // Ready to start inserting rows.
209:                state = STATE_INSERTING;
210:
211:                // Create and initialize an inserter.  When the caller
212:                // closes it, it will call back to inserterIsClosed().
213:                this .inserter = new MergeInserter();
214:                if (this .inserter.initialize(this , tran) == false) {
215:                    throw StandardException
216:                            .newException(SQLState.SORT_COULD_NOT_INIT);
217:                }
218:
219:                return this .inserter;
220:            }
221:
222:            /**
223:            Open a scan controller.
224:            @see Sort#openSortScan
225:             **/
226:
227:            public ScanController openSortScan(TransactionManager tran,
228:                    boolean hold) throws StandardException {
229:                if (SanityManager.DEBUG)
230:                    SanityManager.ASSERT(state == STATE_DONE_INSERTING);
231:
232:                if (mergeRuns == null || mergeRuns.size() == 0) {
233:                    // There were no merge runs so we can just return
234:                    // the rows from the sort buffer.
235:                    scan = new SortBufferScan(this , tran, sortBuffer, hold);
236:
237:                    // The scan now owns the sort buffer
238:                    sortBuffer = null;
239:                } else {
240:                    // Dump the rows in the sort buffer to a merge run.
241:                    long containerId = createMergeRun(tran, sortBuffer);
242:                    mergeRuns.addElement(new Long(containerId));
243:
244:                    // If there are more merge runs than we can sort
245:                    // at once with our sort buffer, we have to reduce
246:                    // the number of merge runs
247:                    if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN
248:                            || mergeRuns.size() > sortBuffer.capacity())
249:                        multiStageMerge(tran);
250:
251:                    // There are now few enough merge runs to sort
252:                    // at once, so create a scan for them.
253:                    MergeScan mscan = new MergeScan(this , tran, sortBuffer,
254:                            mergeRuns, sortObserver, hold);
255:
256:                    if (!mscan.init(tran)) {
257:                        throw StandardException
258:                                .newException(SQLState.SORT_COULD_NOT_INIT);
259:                    }
260:                    scan = mscan;
261:
262:                    // The scan now owns the sort buffer and merge runs.
263:                    sortBuffer = null;
264:                    mergeRuns = null;
265:                }
266:
267:                // Ready to start retrieving rows.
268:                this .state = STATE_SCANNING;
269:
270:                return scan;
271:            }
272:
273:            /**
274:            Open a row source to get rows out of the sorter.
275:            @see Sort#openSortRowSource
276:             **/
277:            public ScanControllerRowSource openSortRowSource(
278:                    TransactionManager tran) throws StandardException {
279:                if (SanityManager.DEBUG)
280:                    SanityManager.ASSERT(state == STATE_DONE_INSERTING);
281:
282:                ScanControllerRowSource rowSource = null;
283:
284:                if (mergeRuns == null || mergeRuns.size() == 0) {
285:                    // There were no merge runs so we can just return
286:                    // the rows from the sort buffer.
287:                    scan = new SortBufferRowSource(sortBuffer, tran,
288:                            sortObserver, false, sortBufferMax);
289:                    rowSource = (ScanControllerRowSource) scan;
290:
291:                    // The scan now owns the sort buffer
292:                    sortBuffer = null;
293:                } else {
294:                    // Dump the rows in the sort buffer to a merge run.
295:                    long containerId = createMergeRun(tran, sortBuffer);
296:                    mergeRuns.addElement(new Long(containerId));
297:
298:                    // If there are more merge runs than we can sort
299:                    // at once with our sort buffer, we have to reduce
300:                    // the number of merge runs
301:                    if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN
302:                            || mergeRuns.size() > sortBuffer.capacity())
303:                        multiStageMerge(tran);
304:
305:                    // There are now few enough merge runs to sort
306:                    // at once, so create a rowSource for them.
307:                    MergeScanRowSource msRowSource = new MergeScanRowSource(
308:                            this , tran, sortBuffer, mergeRuns, sortObserver,
309:                            false);
310:                    if (!msRowSource.init(tran)) {
311:                        throw StandardException
312:                                .newException(SQLState.SORT_COULD_NOT_INIT);
313:                    }
314:                    scan = msRowSource;
315:                    rowSource = msRowSource;
316:
317:                    // The scan now owns the sort buffer and merge runs.
318:                    sortBuffer = null;
319:                    mergeRuns = null;
320:                }
321:
322:                // Ready to start retrieving rows.
323:                this .state = STATE_SCANNING;
324:
325:                return rowSource;
326:            }
327:
328:            /**
329:            Drop the sort.
330:            @see Sort#drop
331:             **/
332:            public void drop(TransactionController tran)
333:                    throws StandardException {
334:                // Make sure the inserter is closed.  Note this
335:                // will cause the callback to doneInserting()
336:                // which will give us any in-progress merge
337:                // runs, if there are any.
338:                if (inserter != null)
339:                    inserter.close();
340:                inserter = null;
341:
342:                // Make sure the scan is closed, if there is one.
343:                // This will cause the callback to doneScanning().
344:                if (scan != null) {
345:                    scan.close();
346:                    scan = null;
347:                }
348:
349:                // If we have a row set, get rid of it.
350:                if (sortBuffer != null) {
351:                    sortBuffer.close();
352:                    sortBuffer = null;
353:                }
354:
355:                // Clean out the rest of the objects.
356:                template = null;
357:                columnOrdering = null;
358:                sortObserver = null;
359:
360:                // If there are any merge runs, drop them.
361:                dropMergeRuns((TransactionManager) tran);
362:
363:                // Whew!
364:                state = STATE_CLOSED;
365:            }
366:
367:            /*
368:             * Methods of MergeSort.  Arranged alphabetically.
369:             */
370:
371:            /**
372:            Check the column ordering against the template, making
373:            sure that each column is present in the template,
374:            implements Orderable, and is not mentioned more than
375:            once.  Intended to be called as part of a sanity check.
376:             **/
377:            protected boolean checkColumnOrdering(
378:                    DataValueDescriptor[] template,
379:                    ColumnOrdering columnOrdering[]) {
380:                // Allocate an array to check that each column mentioned only once.
381:                int templateNColumns = template.length;
382:                boolean seen[] = new boolean[templateNColumns];
383:
384:                // Check each column ordering.
385:                for (int i = 0; i < columnOrdering.length; i++) {
386:                    int colid = columnOrdering[i].getColumnId();
387:
388:                    // Check that the column id is valid.
389:                    if (colid < 0 || colid >= templateNColumns)
390:                        return false;
391:
392:                    // Check that the column isn't mentioned more than once.
393:                    if (seen[colid])
394:                        return false;
395:                    seen[colid] = true;
396:
397:                    Object columnVal = RowUtil.getColumn(template,
398:                            (FormatableBitSet) null, colid);
399:
400:                    if (!(columnVal instanceof  Orderable))
401:                        return false;
402:                }
403:
404:                return true;
405:            }
406:
407:            /**
408:            Check that the columns in the row agree with the columns
409:            in the template, both in number and in type.
410:            <p>
411:            XXX (nat) Currently checks that the classes implementing
412:            each column are the same -- is this right?
413:             **/
414:            void checkColumnTypes(DataValueDescriptor[] row)
415:                    throws StandardException {
416:                int nCols = row.length;
417:                if (template.length != nCols) {
418:                    if (SanityManager.DEBUG) {
419:                        SanityManager
420:                                .THROWASSERT("template.length ("
421:                                        + template.length
422:                                        + ") expected to be = to nCols ("
423:                                        + nCols + ")");
424:                    }
425:                    throw StandardException
426:                            .newException(SQLState.SORT_TYPE_MISMATCH);
427:                }
428:
429:                if (SanityManager.DEBUG) {
430:                    for (int colid = 0; colid < nCols; colid++) {
431:                        Object col1 = row[colid];
432:                        Object col2 = template[colid];
433:                        if (col1 == null) {
434:                            SanityManager.THROWASSERT("col[" + colid
435:                                    + "]  is null");
436:                        }
437:
438:                        if (!(col1 instanceof  CloneableObject)) {
439:                            SanityManager.THROWASSERT("col[" + colid + "] ("
440:                                    + col1.getClass().getName()
441:                                    + ") is not a CloneableObject.");
442:                        }
443:
444:                        if (col1.getClass() != col2.getClass()) {
445:                            SanityManager
446:                                    .THROWASSERT("col1.getClass() ("
447:                                            + col1.getClass()
448:                                            + ") expected to be the same as col2.getClass() ("
449:                                            + col2.getClass() + ")");
450:                        }
451:                    }
452:                }
453:            }
454:
455:            int compare(DataValueDescriptor[] r1, DataValueDescriptor[] r2)
456:                    throws StandardException {
457:                // Get the number of columns we have to compare.
458:                int colsToCompare = this .columnOrdering.length;
459:                int r;
460:
461:                // Compare the columns specified in the column
462:                // ordering array.
463:                for (int i = 0; i < colsToCompare; i++) {
464:                    // Get columns to compare.
465:                    int colid = this .columnOrderingMap[i];
466:
467:                    // If the columns don't compare equal, we're done.
468:                    // Return the sense of the comparison.
469:                    if ((r = r1[colid].compare(r2[colid])) != 0) {
470:                        if (this .columnOrderingAscendingMap[i])
471:                            return r;
472:                        else
473:                            return -r;
474:                    }
475:                }
476:
477:                // We made it through all the columns, and they must have
478:                // all compared equal.  So return that the rows compare equal.
479:                return 0;
480:            }
481:
482:            /**
483:            Go from the CLOSED to the INITIALIZED state.
484:             **/
485:            public void initialize(DataValueDescriptor[] template,
486:                    ColumnOrdering columnOrdering[], SortObserver sortObserver,
487:                    boolean alreadyInOrder, long estimatedRows,
488:                    int sortBufferMax) throws StandardException {
489:                if (SanityManager.DEBUG) {
490:                    SanityManager.ASSERT(state == STATE_CLOSED);
491:                }
492:
493:                // Make sure the column ordering makes sense
494:                if (SanityManager.DEBUG) {
495:                    SanityManager.ASSERT(checkColumnOrdering(template,
496:                            columnOrdering), "column ordering error");
497:                }
498:
499:                // Set user-defined parameters.
500:                this .template = template;
501:                this .columnOrdering = columnOrdering;
502:                this .sortObserver = sortObserver;
503:                this .alreadyInOrder = alreadyInOrder;
504:
505:                // Cache results of columnOrdering calls, results are not allowed
506:                // to change throughout a sort.
507:                columnOrderingMap = new int[columnOrdering.length];
508:                columnOrderingAscendingMap = new boolean[columnOrdering.length];
509:                for (int i = 0; i < columnOrdering.length; i++) {
510:                    columnOrderingMap[i] = columnOrdering[i].getColumnId();
511:                    columnOrderingAscendingMap[i] = columnOrdering[i]
512:                            .getIsAscending();
513:                }
514:
515:                // No inserter or scan yet.
516:                this .inserter = null;
517:                this .scan = null;
518:
519:                // We don't have any merge runs.
520:                this .mergeRuns = null;
521:                this .sortBuffer = null;
522:                this .sortBufferMax = sortBufferMax;
523:
524:                if (estimatedRows > sortBufferMax)
525:                    sortBufferMin = sortBufferMax;
526:                else
527:                    sortBufferMin = (int) estimatedRows;
528:                if (SanityManager.DEBUG) {
529:                    if (SanityManager.DEBUG_ON("testSort"))
530:                        sortBufferMin = sortBufferMax;
531:                }
532:
533:                this .state = STATE_INITIALIZED;
534:            }
535:
536:            /**
537:            An inserter is closing.
538:             **/
539:            void doneInserting(MergeInserter inserter, SortBuffer sortBuffer,
540:                    Vector mergeRuns) {
541:                if (SanityManager.DEBUG) {
542:                    SanityManager.ASSERT(state == STATE_INSERTING);
543:                }
544:
545:                this .sortBuffer = sortBuffer;
546:                this .mergeRuns = mergeRuns;
547:                this .inserter = null;
548:
549:                this .state = STATE_DONE_INSERTING;
550:            }
551:
552:            void doneScanning(Scan scan, SortBuffer sortBuffer) {
553:                if (SanityManager.DEBUG) {
554:                    // Make sure the scan we're getting back is the one we gave out
555:
556:                    if (this .scan != scan)
557:                        SanityManager.THROWASSERT("this.scan = " + this .scan
558:                                + " scan = " + scan);
559:                }
560:
561:                this .sortBuffer = sortBuffer;
562:                this .scan = null;
563:
564:                this .state = STATE_DONE_SCANNING;
565:            }
566:
567:            void doneScanning(Scan scan, SortBuffer sortBuffer, Vector mergeRuns) {
568:                this .mergeRuns = mergeRuns;
569:
570:                doneScanning(scan, sortBuffer);
571:            }
572:
573:            /**
574:            Get rid of the merge runs, if there are any.
575:            Must not cause any errors because it's called
576:            during error processing.
577:             **/
578:            void dropMergeRuns(TransactionManager tran) {
579:                if (mergeRuns != null) {
580:                    Enumeration e = mergeRuns.elements();
581:
582:                    try {
583:                        Transaction rawTran = tran.getRawStoreXact();
584:                        long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
585:
586:                        while (e.hasMoreElements()) {
587:                            long containerId = ((Long) e.nextElement())
588:                                    .longValue();
589:                            rawTran.dropStreamContainer(segmentId, containerId);
590:                        }
591:                    } catch (StandardException se) {
592:                        // Ignore problems with dropping, worst case
593:                        // the raw store will clean up at reboot.
594:                    }
595:                    mergeRuns = null;
596:                }
597:            }
598:
599:            /* DEBUG (nat)
600:            void printRunInfo(TransactionController tran)
601:            	throws StandardException
602:            {
603:            	java.util.Enumeration e = mergeRuns.elements();
604:            	while (e.hasMoreElements())
605:            	{
606:            		long conglomid = ((Long) e.nextElement()).longValue();
607:            		ScanController sc = tran.openScan(conglomid, false,
608:            								false, null, null, 0, null,
609:            								null, 0);
610:            		System.out.println("Merge run: conglomid=" + conglomid);
611:            		while (sc.next())
612:            		{
613:            			sc.fetch(template);
614:            			System.out.println(template);
615:            		}
616:            		sc.close();
617:            	}
618:            }
619:             */
620:
621:            private void multiStageMerge(TransactionManager tran)
622:                    throws StandardException {
623:                Enumeration e;
624:                //int iterations = 0; // DEBUG (nat)
625:                int maxMergeRuns = sortBuffer.capacity();
626:
627:                if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN)
628:                    maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN;
629:
630:                Vector subset;
631:                Vector leftovers;
632:
633:                while (mergeRuns.size() > maxMergeRuns) {
634:                    // Move maxMergeRuns elements from the merge runs
635:                    // vector into a subset, leaving the rest.
636:                    subset = new Vector(maxMergeRuns);
637:                    leftovers = new Vector(mergeRuns.size() - maxMergeRuns);
638:                    e = mergeRuns.elements();
639:                    while (e.hasMoreElements()) {
640:                        Long containerId = (Long) e.nextElement();
641:                        if (subset.size() < maxMergeRuns)
642:                            subset.addElement(containerId);
643:                        else
644:                            leftovers.addElement(containerId);
645:                    }
646:
647:                    /* DEBUG (nat)
648:                    iterations++;
649:                    	System.out.println(subset.size() + " elements in subset");
650:                    	System.out.println(leftovers.size() + " elements in leftovers");
651:                    	System.out.println(mergeRuns.size() + " elements in mergeRuns");
652:                    	System.out.println("maxMergeRuns is " + maxMergeRuns);
653:                    	System.out.println("iterations = " + iterations);
654:                    if (subset.size() == 0)
655:                    {
656:                    	System.exit(1);
657:                    }
658:                     */
659:
660:                    mergeRuns = leftovers;
661:
662:                    // Open a merge scan on the subset.
663:                    MergeScanRowSource msRowSource = new MergeScanRowSource(
664:                            this , tran, sortBuffer, subset, sortObserver, false);
665:
666:                    if (!msRowSource.init(tran)) {
667:                        throw StandardException
668:                                .newException(SQLState.SORT_COULD_NOT_INIT);
669:                    }
670:
671:                    // Create and open another temporary stream conglomerate
672:                    // which will become
673:                    // a merge run made up with the merged runs from the subset.
674:                    Transaction rawTran = tran.getRawStoreXact();
675:                    int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
676:                    long id = rawTran.addAndLoadStreamContainer(segmentId,
677:                            properties, msRowSource);
678:
679:                    mergeRuns.addElement(new Long(id));
680:
681:                    // Drop the conglomerates in the merge subset
682:                    e = subset.elements();
683:                    while (e.hasMoreElements()) {
684:                        Long containerId = (Long) e.nextElement();
685:                        rawTran.dropStreamContainer(segmentId, containerId
686:                                .longValue());
687:                    }
688:                }
689:            }
690:
691:            /**
692:            Remove all the rows from the sort buffer and store them
693:            in a temporary conglomerate.  The temporary conglomerate
694:            is a "merge run".  Returns the container id of the
695:            merge run.
696:             **/
697:            long createMergeRun(TransactionManager tran, SortBuffer sortBuffer)
698:                    throws StandardException {
699:                // this sort buffer is not a scan and is not tracked by any
700:                // TransactionManager. 
701:                SortBufferRowSource rowSource = new SortBufferRowSource(
702:                        sortBuffer, (TransactionManager) null, sortObserver,
703:                        true, sortBufferMax);
704:
705:                // Create a temporary stream conglomerate...
706:                Transaction rawTran = tran.getRawStoreXact(); // get raw transaction
707:                int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
708:                long id = rawTran.addAndLoadStreamContainer(segmentId,
709:                        properties, rowSource);
710:
711:                // Don't close the sortBuffer, we just emptied it, the caller may reuse
712:                // that sortBuffer for the next run.
713:                rowSource = null;
714:
715:                return id;
716:            }
717:        }
www.java2s.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.