001: /*
002:
003: Derby - Class org.apache.derby.impl.sql.execute.TemporaryRowHolderImpl
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.sql.execute;
023:
024: import org.apache.derby.iapi.services.sanity.SanityManager;
025: import org.apache.derby.iapi.error.StandardException;
026: import org.apache.derby.iapi.sql.execute.CursorResultSet;
027: import org.apache.derby.iapi.sql.execute.ExecRow;
028: import org.apache.derby.iapi.sql.execute.ExecutionFactory;
029: import org.apache.derby.iapi.sql.execute.TemporaryRowHolder;
030: import org.apache.derby.iapi.sql.Activation;
031: import org.apache.derby.iapi.sql.ResultDescription;
032: import org.apache.derby.iapi.store.access.ConglomerateController;
033: import org.apache.derby.iapi.store.access.ScanController;
034: import org.apache.derby.iapi.store.access.TransactionController;
035:
036: import org.apache.derby.iapi.types.CloneableObject;
037: import org.apache.derby.iapi.types.RowLocation;
038: import org.apache.derby.iapi.types.DataValueDescriptor;
039: import org.apache.derby.iapi.types.SQLRef;
040: import org.apache.derby.iapi.types.SQLLongint;
041:
042: import org.apache.derby.iapi.services.io.FormatableBitSet;
043: import java.util.Properties;
044:
045: /**
046: * This is a class that is used to temporarily
047: * (non-persistently) hold rows that are used in
048: * language execution. It will store them in an
049: * array, or a temporary conglomerate, depending
050: * on the number of rows.
051: * <p>
052: * It is used for deferred DML processing.
053: *
054: * @author jamie
055: */
056: class TemporaryRowHolderImpl implements TemporaryRowHolder {
057: public static final int DEFAULT_OVERFLOWTHRESHOLD = 5;
058:
059: protected static final int STATE_UNINIT = 0;
060: protected static final int STATE_INSERT = 1;
061: protected static final int STATE_DRAIN = 2;
062:
063: protected ExecRow[] rowArray;
064: protected int lastArraySlot;
065: private int numRowsIn;
066: protected int state = STATE_UNINIT;
067:
068: protected long CID;
069: private boolean conglomCreated;
070: private ConglomerateController cc;
071: private Properties properties;
072: private ScanController scan;
073: private ResultDescription resultDescription;
074: /** Activation object with local state information. */
075: Activation activation;
076:
077: private boolean isUniqueStream;
078:
079: /* beetle 3865 updateable cursor use index. A virtual memory heap is a heap that has in-memory
080: * part to get better performance, less overhead. No position index needed. We read from and write
081: * to the in-memory part as much as possible. And we can insert after we start retrieving results.
082: * Could be used for other things too.
083: */
084: private boolean isVirtualMemHeap;
085: private boolean uniqueIndexCreated;
086: private boolean positionIndexCreated;
087: private long uniqueIndexConglomId;
088: private long positionIndexConglomId;
089: private ConglomerateController uniqueIndex_cc;
090: private ConglomerateController positionIndex_cc;
091: private DataValueDescriptor[] uniqueIndexRow = null;
092: private DataValueDescriptor[] positionIndexRow = null;
093: private RowLocation destRowLocation; //row location in the temporary conglomerate
094: private SQLLongint position_sqllong;
095:
096: /**
097: * Uses the default overflow to
098: * a conglomerate threshold (5).
099: *
100: * @param activation the activation
101: * @param properties the properties of the original table. Used
102: * to help the store use optimal page size, etc.
103: * @param resultDescription the result description. Relevant for the getResultDescription
104: * call on the result set returned by getResultSet. May be null
105: */
106: public TemporaryRowHolderImpl(Activation activation,
107: Properties properties, ResultDescription resultDescription) {
108: this (activation, properties, resultDescription,
109: DEFAULT_OVERFLOWTHRESHOLD, false, false);
110: }
111:
112: /**
113: * Uses the default overflow to
114: * a conglomerate threshold (5).
115: *
116: * @param activation the activation
117: * @param properties the properties of the original table. Used
118: * to help the store use optimal page size, etc.
119: * @param resultDescription the result description. Relevant for the getResultDescription
120: * call on the result set returned by getResultSet. May be null
121: * @param isUniqueStream - true , if it has to be temporary row holder unique stream
122: */
123: public TemporaryRowHolderImpl(Activation activation,
124: Properties properties, ResultDescription resultDescription,
125: boolean isUniqueStream) {
126: this (activation, properties, resultDescription, 1,
127: isUniqueStream, false);
128: }
129:
130: /**
131: * Create a temporary row holder with the defined overflow to conglom
132: *
133: * @param activation the activation
134: * @param properties the properties of the original table. Used
135: * to help the store use optimal page size, etc.
136: * @param resultDescription the result description. Relevant for the getResultDescription
137: * call on the result set returned by getResultSet. May be null
138: * @param overflowToConglomThreshold on an attempt to insert
139: * this number of rows, the rows will be put
140: * into a temporary conglomerate.
141: */
142: public TemporaryRowHolderImpl(Activation activation,
143: Properties properties, ResultDescription resultDescription,
144: int overflowToConglomThreshold, boolean isUniqueStream,
145: boolean isVirtualMemHeap) {
146: if (SanityManager.DEBUG) {
147: if (overflowToConglomThreshold <= 0) {
148: SanityManager
149: .THROWASSERT("It is assumed that "
150: + "the overflow threshold is > 0. "
151: + "If you you need to change this you have to recode some of "
152: + "this class.");
153: }
154: }
155:
156: this .activation = activation;
157: this .properties = properties;
158: this .resultDescription = resultDescription;
159: this .isUniqueStream = isUniqueStream;
160: this .isVirtualMemHeap = isVirtualMemHeap;
161: rowArray = new ExecRow[overflowToConglomThreshold];
162: lastArraySlot = -1;
163: }
164:
165: /* Avoid materializing a stream just because it goes through a temp table. It is OK to
166: * have a stream in the temp table (in memory or spilled to disk). The assumption is
167: * that one stream does not appear in two rows. For "update", one stream can be in two
168: * rows and the materialization is done in UpdateResultSet. Note to future users of this
169: * class who may insert a stream into this temp holder: (1) As mentioned above, one
170: * un-materialized stream can't appear in two rows; you need to objectify it first otherwise.
171: * (2) If you need to retrieve a un-materialized stream more than once from the temp holder,
172: * you need to either materialize the stream the first time, or, if there's a memory constraint,
173: * in the first time create a RememberBytesInputStream with the byte holder being
174: * BackingStoreByteHolder, finish it, and reset it after usage.
175: * beetle 4896.
176: */
177: private ExecRow cloneRow(ExecRow inputRow) {
178: DataValueDescriptor[] cols = inputRow.getRowArray();
179: int ncols = cols.length;
180: ExecRow cloned = ((ValueRow) inputRow).cloneMe();
181: for (int i = 0; i < ncols; i++) {
182: if (cols[i] != null) {
183: /* Rows are 1-based, cols[] is 0-based */
184: cloned
185: .setColumn(
186: i + 1,
187: (DataValueDescriptor) ((CloneableObject) cols[i])
188: .cloneObject());
189: }
190: }
191: if (inputRow instanceof IndexValueRow)
192: return new IndexValueRow(cloned);
193: else
194: return cloned;
195: }
196:
197: /**
198: * Insert a row
199: *
200: * @param inputRow the row to insert
201: *
202: * @exception StandardException on error
203: */
204: public void insert(ExecRow inputRow) throws StandardException {
205:
206: if (SanityManager.DEBUG) {
207: if (!isUniqueStream && !isVirtualMemHeap)
208: SanityManager
209: .ASSERT(state != STATE_DRAIN,
210: "you cannot insert rows after starting to drain");
211: }
212: if (!isVirtualMemHeap)
213: state = STATE_INSERT;
214:
215: if (uniqueIndexCreated) {
216: if (isRowAlreadyExist(inputRow))
217: return;
218: }
219:
220: numRowsIn++;
221:
222: if (lastArraySlot + 1 < rowArray.length) {
223: rowArray[++lastArraySlot] = cloneRow(inputRow);
224:
225: //In case of unique stream we push every thing into the
226: // conglomerates for time being, we keep one row in the array for
227: // the template.
228: if (!isUniqueStream)
229: return;
230: }
231:
232: if (!conglomCreated) {
233: TransactionController tc = activation
234: .getTransactionController();
235:
236: /*
237: ** Create the conglomerate with the template row.
238: */
239: CID = tc.createConglomerate("heap", inputRow.getRowArray(),
240: null, //column sort order - not required for heap
241: properties, TransactionController.IS_TEMPORARY
242: | TransactionController.IS_KEPT);
243: conglomCreated = true;
244:
245: cc = tc.openConglomerate(CID, false,
246: TransactionController.OPENMODE_FORUPDATE,
247: TransactionController.MODE_TABLE,
248: TransactionController.ISOLATION_SERIALIZABLE);
249: if (isUniqueStream)
250: destRowLocation = cc.newRowLocationTemplate();
251:
252: }
253:
254: int status = 0;
255: if (isUniqueStream) {
256: cc.insertAndFetchLocation(inputRow.getRowArray(),
257: destRowLocation);
258: insertToPositionIndex(numRowsIn - 1, destRowLocation);
259: //create the unique index based on input row ROW Location
260: if (!uniqueIndexCreated)
261: isRowAlreadyExist(inputRow);
262:
263: } else {
264: status = cc.insert(inputRow.getRowArray());
265: if (isVirtualMemHeap)
266: state = STATE_INSERT;
267: }
268:
269: if (SanityManager.DEBUG) {
270: if (status != 0) {
271: SanityManager.THROWASSERT("got funky status (" + status
272: + ") back from "
273: + "ConglomerateConstroller.insert()");
274: }
275: }
276: }
277:
278: /**
279: * Maintain an unique index based on the input row's row location in the
280: * base table, this index make sures that we don't insert duplicate rows
281: * into the temporary heap.
282: * @param inputRow the row we are inserting to temporary row holder
283: * @exception StandardException on error
284: */
285:
286: private boolean isRowAlreadyExist(ExecRow inputRow)
287: throws StandardException {
288: DataValueDescriptor rlColumn;
289: RowLocation baseRowLocation;
290: rlColumn = inputRow.getColumn(inputRow.nColumns());
291:
292: if (CID != 0 && rlColumn instanceof SQLRef) {
293: baseRowLocation = (RowLocation) (rlColumn).getObject();
294:
295: if (!uniqueIndexCreated) {
296: TransactionController tc = activation
297: .getTransactionController();
298: int numKeys = 2;
299: uniqueIndexRow = new DataValueDescriptor[numKeys];
300: uniqueIndexRow[0] = baseRowLocation;
301: uniqueIndexRow[1] = baseRowLocation;
302: Properties props = makeIndexProperties(uniqueIndexRow,
303: CID);
304: uniqueIndexConglomId = tc.createConglomerate("BTREE",
305: uniqueIndexRow, null, props,
306: TransactionController.IS_TEMPORARY
307: | TransactionController.IS_KEPT);
308: uniqueIndex_cc = tc.openConglomerate(
309: uniqueIndexConglomId, false,
310: TransactionController.OPENMODE_FORUPDATE,
311: TransactionController.MODE_TABLE,
312: TransactionController.ISOLATION_SERIALIZABLE);
313: uniqueIndexCreated = true;
314: }
315:
316: uniqueIndexRow[0] = baseRowLocation;
317: uniqueIndexRow[1] = baseRowLocation;
318: // Insert the row into the secondary index.
319: int status;
320: if ((status = uniqueIndex_cc.insert(uniqueIndexRow)) != 0) {
321: if (status == ConglomerateController.ROWISDUPLICATE) {
322: return true; // okay; we don't insert duplicates
323: } else {
324: if (SanityManager.DEBUG) {
325: if (status != 0) {
326: SanityManager
327: .THROWASSERT("got funky status ("
328: + status + ") back from "
329: + "Unique Index insert()");
330: }
331: }
332: }
333: }
334: }
335:
336: return false;
337: }
338:
339: /**
340: * Maintain an index that will allow us to read from the
341: * temporary heap in the order we inserted.
342: * @param position - the number of the row we are inserting into heap
343: * @param rl the row to Location in the temporary heap
344: * @exception StandardException on error
345: */
346:
347: private void insertToPositionIndex(int position, RowLocation rl)
348: throws StandardException {
349: if (!positionIndexCreated) {
350: TransactionController tc = activation
351: .getTransactionController();
352: int numKeys = 2;
353: position_sqllong = new SQLLongint();
354: positionIndexRow = new DataValueDescriptor[numKeys];
355: positionIndexRow[0] = position_sqllong;
356: positionIndexRow[1] = rl;
357: Properties props = makeIndexProperties(positionIndexRow,
358: CID);
359: positionIndexConglomId = tc.createConglomerate("BTREE",
360: positionIndexRow, null, props,
361: TransactionController.IS_TEMPORARY
362: | TransactionController.IS_KEPT);
363: positionIndex_cc = tc.openConglomerate(
364: positionIndexConglomId, false,
365: TransactionController.OPENMODE_FORUPDATE,
366: TransactionController.MODE_TABLE,
367: TransactionController.ISOLATION_SERIALIZABLE);
368: positionIndexCreated = true;
369: }
370:
371: position_sqllong.setValue(position);
372: positionIndexRow[0] = position_sqllong;
373: positionIndexRow[1] = rl;
374: //insert the row location to position index
375: positionIndex_cc.insert(positionIndexRow);
376: }
377:
378: /**
379: * Get a result set for scanning what has been inserted
380: * so far.
381: *
382: * @return a result set to use
383: */
384: public CursorResultSet getResultSet() {
385: state = STATE_DRAIN;
386: TransactionController tc = activation
387: .getTransactionController();
388: if (isUniqueStream) {
389: return new TemporaryRowHolderResultSet(tc, rowArray,
390: resultDescription, isVirtualMemHeap, true,
391: positionIndexConglomId, this );
392: } else {
393: return new TemporaryRowHolderResultSet(tc, rowArray,
394: resultDescription, isVirtualMemHeap, this );
395:
396: }
397: }
398:
399: /**
400: * Purge the row holder of all its rows.
401: * Resets the row holder so that it can
402: * accept new inserts. A cheap way to
403: * recycle a row holder.
404: *
405: * @exception StandardException on error
406: */
407: public void truncate() throws StandardException {
408: close();
409:
410: for (int i = 0; i < rowArray.length; i++) {
411: rowArray[i] = null;
412: }
413: lastArraySlot = -1;
414: numRowsIn = 0;
415: state = STATE_UNINIT;
416:
417: /*
418: ** We are not expecting this to be called
419: ** when we have a temporary conglomerate
420: ** but just to be on the safe side, drop
421: ** it. We'd like do something cheaper,
422: ** but there is no truncate on congloms.
423: */
424: if (conglomCreated) {
425: TransactionController tc = activation
426: .getTransactionController();
427: tc.dropConglomerate(CID);
428: conglomCreated = false;
429: }
430: }
431:
432: public long getTemporaryConglomId() {
433: return CID;
434: }
435:
436: public long getPositionIndexConglomId() {
437: return positionIndexConglomId;
438: }
439:
440: private Properties makeIndexProperties(
441: DataValueDescriptor[] indexRowArray, long conglomId)
442: throws StandardException {
443: int nCols = indexRowArray.length;
444: Properties props = new Properties();
445: props.put("allowDuplicates", "false");
446: // all columns form the key, (currently) required
447: props.put("nKeyFields", String.valueOf(nCols));
448: props.put("nUniqueColumns", String.valueOf(nCols - 1));
449: props.put("rowLocationColumn", String.valueOf(nCols - 1));
450: props.put("baseConglomerateId", String.valueOf(conglomId));
451: return props;
452: }
453:
454: public void setRowHolderTypeToUniqueStream() {
455: isUniqueStream = true;
456: }
457:
458: /**
459: * Clean up
460: *
461: * @exception StandardException on error
462: */
463: public void close() throws StandardException {
464: if (scan != null) {
465: scan.close();
466: scan = null;
467: }
468:
469: if (cc != null) {
470: cc.close();
471: cc = null;
472: }
473:
474: if (uniqueIndex_cc != null) {
475: uniqueIndex_cc.close();
476: uniqueIndex_cc = null;
477: }
478:
479: if (positionIndex_cc != null) {
480: positionIndex_cc.close();
481: positionIndex_cc = null;
482: }
483:
484: TransactionController tc = activation
485: .getTransactionController();
486:
487: if (uniqueIndexCreated) {
488: tc.dropConglomerate(uniqueIndexConglomId);
489: uniqueIndexCreated = false;
490: }
491:
492: if (positionIndexCreated) {
493: tc.dropConglomerate(positionIndexConglomId);
494: uniqueIndexCreated = false;
495: }
496:
497: if (conglomCreated) {
498: tc.dropConglomerate(CID);
499: conglomCreated = false;
500: }
501:
502: state = STATE_UNINIT;
503: lastArraySlot = -1;
504: }
505: }
|