001: /*
002: * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
003: * (http://h2database.com/html/license.html).
004: * Initial Developer: H2 Group
005: */
006: package org.h2.result;
007:
008: import java.io.ByteArrayOutputStream;
009: import java.sql.SQLException;
010:
011: import org.h2.constant.SysProperties;
012: import org.h2.engine.Constants;
013: import org.h2.engine.Database;
014: import org.h2.engine.Session;
015: import org.h2.message.Message;
016: import org.h2.store.DataPage;
017: import org.h2.store.FileStore;
018: import org.h2.util.ObjectArray;
019: import org.h2.value.Value;
020:
021: /**
022: * This class implements the disk buffer for the LocalResult class.
023: */
024: class ResultDiskBuffer implements ResultExternal {
025:
026: private static final int READ_AHEAD = 128;
027:
028: private DataPage rowBuff;
029: private FileStore file;
030: private ObjectArray tapes;
031: private ResultDiskTape mainTape;
032: private SortOrder sort;
033: private int columnCount;
034:
035: /**
036: * Represents a virtual disk tape for the merge sort algorithm.
037: * Each virtual disk tape is a region of the temp file.
038: */
039: private static class ResultDiskTape {
040: long start, end, pos;
041: ObjectArray buffer = new ObjectArray();
042: }
043:
044: public ResultDiskBuffer(Session session, SortOrder sort,
045: int columnCount) throws SQLException {
046: this .sort = sort;
047: this .columnCount = columnCount;
048: Database db = session.getDatabase();
049: rowBuff = DataPage.create(db, Constants.DEFAULT_DATA_PAGE_SIZE);
050: String fileName = session.getDatabase().createTempFile();
051: file = session.getDatabase().openFile(fileName, "rw", false);
052: file.setCheckedWriting(false);
053: file.autoDelete();
054: file.seek(FileStore.HEADER_LENGTH);
055: if (sort != null) {
056: tapes = new ObjectArray();
057: } else {
058: mainTape = new ResultDiskTape();
059: mainTape.pos = FileStore.HEADER_LENGTH;
060: }
061: }
062:
063: public void addRows(ObjectArray rows) throws SQLException {
064: if (sort != null) {
065: sort.sort(rows);
066: }
067: DataPage buff = rowBuff;
068: long start = file.getFilePointer();
069: ByteArrayOutputStream buffer = new ByteArrayOutputStream();
070: int bufferLen = 0;
071: int maxBufferSize = SysProperties.LARGE_RESULT_BUFFER_SIZE;
072: for (int i = 0; i < rows.size(); i++) {
073: buff.reset();
074: buff.writeInt(0);
075: Value[] row = (Value[]) rows.get(i);
076: for (int j = 0; j < columnCount; j++) {
077: buff.writeValue(row[j]);
078: }
079: buff.fillAligned();
080: int len = buff.length();
081: buff.setInt(0, len);
082: buff.updateChecksum();
083: if (maxBufferSize > 0) {
084: buffer.write(buff.getBytes(), 0, len);
085: bufferLen += len;
086: if (bufferLen > maxBufferSize) {
087: byte[] data = buffer.toByteArray();
088: buffer.reset();
089: file.write(data, 0, data.length);
090: bufferLen = 0;
091: }
092: } else {
093: file.write(buff.getBytes(), 0, len);
094: }
095: }
096: if (bufferLen > 0) {
097: byte[] data = buffer.toByteArray();
098: file.write(data, 0, data.length);
099: }
100: if (sort != null) {
101: ResultDiskTape tape = new ResultDiskTape();
102: tape.start = start;
103: tape.end = file.getFilePointer();
104: tapes.add(tape);
105: } else {
106: mainTape.end = file.getFilePointer();
107: }
108: }
109:
110: public void done() throws SQLException {
111: file.seek(FileStore.HEADER_LENGTH);
112: }
113:
114: public void reset() {
115: if (sort != null) {
116: for (int i = 0; i < tapes.size(); i++) {
117: ResultDiskTape tape = getTape(i);
118: tape.pos = tape.start;
119: tape.buffer = new ObjectArray();
120: }
121: } else {
122: mainTape.pos = FileStore.HEADER_LENGTH;
123: }
124: }
125:
126: private void readRow(ResultDiskTape tape) throws SQLException {
127: int min = Constants.FILE_BLOCK_SIZE;
128: DataPage buff = rowBuff;
129: buff.reset();
130: file.readFully(buff.getBytes(), 0, min);
131: int len = buff.readInt();
132: buff.checkCapacity(len);
133: if (len - min > 0) {
134: file.readFully(buff.getBytes(), min, len - min);
135: }
136: buff.check(len);
137: tape.pos += len;
138: Value[] row = new Value[columnCount];
139: for (int k = 0; k < columnCount; k++) {
140: row[k] = buff.readValue();
141: }
142: tape.buffer.add(row);
143: }
144:
145: public Value[] next() throws SQLException {
146: return sort != null ? nextSorted() : nextUnsorted();
147: }
148:
149: private Value[] nextUnsorted() throws SQLException {
150: file.seek(mainTape.pos);
151: if (mainTape.buffer.size() == 0) {
152: for (int j = 0; mainTape.pos < mainTape.end
153: && j < READ_AHEAD; j++) {
154: readRow(mainTape);
155: }
156: }
157: Value[] row = (Value[]) mainTape.buffer.get(0);
158: mainTape.buffer.remove(0);
159: return row;
160: }
161:
162: private Value[] nextSorted() throws SQLException {
163: int next = -1;
164: for (int i = 0; i < tapes.size(); i++) {
165: ResultDiskTape tape = getTape(i);
166: if (tape.buffer.size() == 0 && tape.pos < tape.end) {
167: file.seek(tape.pos);
168: for (int j = 0; tape.pos < tape.end && j < READ_AHEAD; j++) {
169: readRow(tape);
170: }
171: }
172: if (tape.buffer.size() > 0) {
173: if (next == -1) {
174: next = i;
175: } else if (compareTapes(tape, getTape(next)) < 0) {
176: next = i;
177: }
178: }
179: }
180: ResultDiskTape t = getTape(next);
181: Value[] row = (Value[]) t.buffer.get(0);
182: t.buffer.remove(0);
183: return row;
184: }
185:
186: private ResultDiskTape getTape(int i) {
187: return (ResultDiskTape) tapes.get(i);
188: }
189:
190: private int compareTapes(ResultDiskTape a, ResultDiskTape b)
191: throws SQLException {
192: Value[] va = (Value[]) a.buffer.get(0);
193: Value[] vb = (Value[]) b.buffer.get(0);
194: return sort.compare(va, vb);
195: }
196:
197: protected void finalize() {
198: if (!SysProperties.runFinalize) {
199: return;
200: }
201: close();
202: }
203:
204: public void close() {
205: if (file != null) {
206: file.closeAndDeleteSilently();
207: file = null;
208: }
209: }
210:
211: public int removeRow(Value[] values) {
212: throw Message.getInternalError();
213: }
214:
215: public boolean contains(Value[] values) throws SQLException {
216: throw Message.getInternalError();
217: }
218:
219: public int addRow(Value[] values) {
220: throw Message.getInternalError();
221: }
222:
223: }
|