001: /*-
002: * See the file LICENSE for redistribution information.
003: *
004: * Copyright (c) 2002,2008 Oracle. All rights reserved.
005: *
006: * $Id: JoinCursor.java,v 1.15.2.3 2008/01/07 15:14:08 cwl Exp $
007: */
008:
009: package com.sleepycat.je;
010:
011: import java.util.Arrays;
012: import java.util.Comparator;
013: import java.util.logging.Level;
014:
015: import com.sleepycat.je.dbi.GetMode;
016: import com.sleepycat.je.dbi.CursorImpl.SearchMode;
017: import com.sleepycat.je.txn.Locker;
018: import com.sleepycat.je.utilint.DatabaseUtil;
019:
020: /**
021: * Javadoc for this public class is generated
022: * via the doc templates in the doc_src directory.
023: */
024: public class JoinCursor {
025:
026: private JoinConfig config;
027: private Database priDb;
028: private Cursor priCursor;
029: private Cursor[] secCursors;
030: private DatabaseEntry[] cursorScratchEntries;
031: private DatabaseEntry scratchEntry;
032:
033: /**
034: * Creates a join cursor without parameter checking.
035: */
036: JoinCursor(Locker locker, Database primaryDb,
037: final Cursor[] cursors, JoinConfig configParam)
038: throws DatabaseException {
039:
040: priDb = primaryDb;
041: config = (configParam != null) ? configParam.cloneConfig()
042: : JoinConfig.DEFAULT;
043: scratchEntry = new DatabaseEntry();
044: cursorScratchEntries = new DatabaseEntry[cursors.length];
045: Cursor[] sortedCursors = new Cursor[cursors.length];
046: System.arraycopy(cursors, 0, sortedCursors, 0, cursors.length);
047:
048: if (!config.getNoSort()) {
049:
050: /*
051: * Sort ascending by duplicate count. Collect counts before
052: * sorting so that countInternal() is called only once per cursor.
053: * Use READ_UNCOMMITTED to avoid blocking writers.
054: */
055: final int[] counts = new int[cursors.length];
056: for (int i = 0; i < cursors.length; i += 1) {
057: counts[i] = cursors[i]
058: .countInternal(LockMode.READ_UNCOMMITTED);
059: assert counts[i] >= 0;
060: }
061: Arrays.sort(sortedCursors, new Comparator() {
062: public int compare(Object o1, Object o2) {
063: int count1 = -1;
064: int count2 = -1;
065:
066: /*
067: * Scan for objects in cursors not sortedCursors since
068: * sortedCursors is being sorted in place.
069: */
070: for (int i = 0; i < cursors.length
071: && (count1 < 0 || count2 < 0); i += 1) {
072: if (cursors[i] == o1) {
073: count1 = counts[i];
074: } else if (cursors[i] == o2) {
075: count2 = counts[i];
076: }
077: }
078: assert count1 >= 0 && count2 >= 0;
079: return (count1 - count2);
080: }
081: });
082: }
083:
084: /*
085: * Open and dup cursors last. If an error occurs before the
086: * constructor is complete, close them and ignore exceptions during
087: * close.
088: */
089: try {
090: priCursor = new Cursor(priDb, locker, null);
091: secCursors = new Cursor[cursors.length];
092: for (int i = 0; i < cursors.length; i += 1) {
093: secCursors[i] = new Cursor(sortedCursors[i], true);
094: }
095: } catch (DatabaseException e) {
096: close(e); /* will throw e */
097: }
098: }
099:
100: /**
101: * Javadoc for this public method is generated via
102: * the doc templates in the doc_src directory.
103: */
104: public void close() throws DatabaseException {
105:
106: if (priCursor == null) {
107: throw new DatabaseException("Already closed");
108: }
109: close(null);
110: }
111:
112: /**
113: * Close all cursors we own, throwing only the first exception that occurs.
114: *
115: * @param firstException an exception that has already occured, or null.
116: */
117: private void close(DatabaseException firstException)
118: throws DatabaseException {
119:
120: if (priCursor != null) {
121: try {
122: priCursor.close();
123: } catch (DatabaseException e) {
124: if (firstException == null) {
125: firstException = e;
126: }
127: }
128: priCursor = null;
129: }
130: for (int i = 0; i < secCursors.length; i += 1) {
131: if (secCursors[i] != null) {
132: try {
133: secCursors[i].close();
134: } catch (DatabaseException e) {
135: if (firstException == null) {
136: firstException = e;
137: }
138: }
139: secCursors[i] = null;
140: }
141: }
142: if (firstException != null) {
143: throw firstException;
144: }
145: }
146:
147: /**
148: * For unit testing.
149: */
150: Cursor[] getSortedCursors() {
151: return secCursors;
152: }
153:
154: /**
155: * Javadoc for this public method is generated via
156: * the doc templates in the doc_src directory.
157: */
158: public Database getDatabase() {
159:
160: return priDb;
161: }
162:
163: /**
164: * Javadoc for this public method is generated via
165: * the doc templates in the doc_src directory.
166: */
167: public JoinConfig getConfig() {
168:
169: return config.cloneConfig();
170: }
171:
172: /**
173: * Javadoc for this public method is generated via
174: * the doc templates in the doc_src directory.
175: */
176: public OperationStatus getNext(DatabaseEntry key, LockMode lockMode)
177: throws DatabaseException {
178:
179: priCursor.checkEnv();
180: DatabaseUtil.checkForNullDbt(key, "key", false);
181: priCursor.trace(Level.FINEST, "JoinCursor.getNext(key): ",
182: lockMode);
183:
184: return retrieveNext(key, null, lockMode);
185: }
186:
187: /**
188: * Javadoc for this public method is generated via
189: * the doc templates in the doc_src directory.
190: */
191: public OperationStatus getNext(DatabaseEntry key,
192: DatabaseEntry data, LockMode lockMode)
193: throws DatabaseException {
194:
195: priCursor.checkEnv();
196: DatabaseUtil.checkForNullDbt(key, "key", false);
197: DatabaseUtil.checkForNullDbt(data, "data", false);
198: priCursor.trace(Level.FINEST, "JoinCursor.getNext(key,data): ",
199: lockMode);
200:
201: return retrieveNext(key, data, lockMode);
202: }
203:
204: /**
205: * Internal version of getNext(), with an optional data param.
206: * <p>
207: * Since duplicates are always sorted and duplicate-duplicates are not
208: * allowed, a natural join can be implemented by simply traversing through
209: * the duplicates of the first cursor to find candidate keys, and then
210: * looking for each candidate key in the duplicate set of the other
211: * cursors, without ever reseting a cursor to the beginning of the
212: * duplicate set.
213: * <p>
214: * This only works when the same duplicate comparison method is used for
215: * all cursors. We don't check for that, we just assume the user won't
216: * violate that rule.
217: * <p>
218: * A future optimization would be to add a SearchMode.BOTH_DUPS operation
219: * and use it instead of using SearchMode.BOTH. This would be the
220: * equivalent of the undocumented DB_GET_BOTHC operation used by DB core's
221: * join() implementation.
222: */
223: private OperationStatus retrieveNext(DatabaseEntry keyParam,
224: DatabaseEntry dataParam, LockMode lockMode)
225: throws DatabaseException {
226:
227: outerLoop: while (true) {
228:
229: /* Process the first cursor to get a candidate key. */
230: Cursor secCursor = secCursors[0];
231: DatabaseEntry candidateKey = cursorScratchEntries[0];
232: OperationStatus status;
233: if (candidateKey == null) {
234: /* Get first duplicate at initial cursor position. */
235: candidateKey = new DatabaseEntry();
236: cursorScratchEntries[0] = candidateKey;
237: status = secCursor.getCurrentInternal(scratchEntry,
238: candidateKey, lockMode);
239: } else {
240: /* Already initialized, move to the next candidate key. */
241: status = secCursor.retrieveNext(scratchEntry,
242: candidateKey, lockMode, GetMode.NEXT_DUP);
243: }
244: if (status != OperationStatus.SUCCESS) {
245: /* No more candidate keys. */
246: return status;
247: }
248:
249: /* Process the second and following cursors. */
250: for (int i = 1; i < secCursors.length; i += 1) {
251: secCursor = secCursors[i];
252: DatabaseEntry secKey = cursorScratchEntries[i];
253: if (secKey == null) {
254: secKey = new DatabaseEntry();
255: cursorScratchEntries[i] = secKey;
256: status = secCursor.getCurrentInternal(secKey,
257: scratchEntry, lockMode);
258: assert status == OperationStatus.SUCCESS;
259: }
260: scratchEntry.setData(secKey.getData(), secKey
261: .getOffset(), secKey.getSize());
262: status = secCursor.search(scratchEntry, candidateKey,
263: lockMode, SearchMode.BOTH);
264: if (status != OperationStatus.SUCCESS) {
265: /* No match, get another candidate key. */
266: continue outerLoop;
267: }
268: }
269:
270: /* The candidate key was found for all cursors. */
271: if (dataParam != null) {
272: status = priCursor.search(candidateKey, dataParam,
273: lockMode, SearchMode.SET);
274: if (status != OperationStatus.SUCCESS) {
275: throw new DatabaseException("Secondary corrupt");
276: }
277: }
278: keyParam.setData(candidateKey.getData(), candidateKey
279: .getOffset(), candidateKey.getSize());
280: return OperationStatus.SUCCESS;
281: }
282: }
283: }
|