001: package it.unimi.dsi.mg4j.tool;
002:
003: /*
004: * MG4J: Managing Gigabytes for Java
005: *
006: * Copyright (C) 2005-2007 Sebastiano Vigna
007: *
008: * This library is free software; you can redistribute it and/or modify it
009: * under the terms of the GNU Lesser General Public License as published by the Free
010: * Software Foundation; either version 2.1 of the License, or (at your option)
011: * any later version.
012: *
013: * This library is distributed in the hope that it will be useful, but
014: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
015: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
016: * for more details.
017: *
018: * You should have received a copy of the GNU Lesser General Public License
019: * along with this program; if not, write to the Free Software
020: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
021: *
022: */
023:
024: import it.unimi.dsi.fastutil.ints.AbstractIntComparator;
025: import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue;
026: import it.unimi.dsi.fastutil.ints.IntIterator;
027: import it.unimi.dsi.mg4j.index.BitStreamIndex;
028: import it.unimi.dsi.mg4j.index.CachingOutputBitStream;
029: import it.unimi.dsi.mg4j.index.Index;
030: import it.unimi.dsi.mg4j.index.IndexIterator;
031: import it.unimi.dsi.mg4j.index.CompressionFlags.Coding;
032: import it.unimi.dsi.mg4j.index.CompressionFlags.Component;
033: import it.unimi.dsi.io.InputBitStream;
034: import it.unimi.dsi.io.OutputBitStream;
035: import it.unimi.dsi.Util;
036:
037: import java.io.Closeable;
038: import java.io.File;
039: import java.io.IOException;
040: import java.lang.reflect.InvocationTargetException;
041: import java.net.URISyntaxException;
042: import java.util.Map;
043:
044: import org.apache.commons.configuration.ConfigurationException;
045: import org.apache.log4j.Logger;
046:
047: import com.martiansoftware.jsap.JSAPException;
048:
049: /** Pastes several indices.
050: *
051: * <p>Pasting is a very slow way of combining indices: we assume
052: * that not only documents, but also document occurrences might be scattered
053: * throughout several indices. When a document appears in several indices,
054: * its occurrences in a given index are combined by renumbering them starting
055: * from the sum of the sizes for the document in the previous indices.
056: *
057: * <p>Conceptually, this operation is equivalent to splitting a collection
058: * <em>vertically</em>: each document is divided into a fixed number <var>n</var>
059: * of consecutive segments (possibly of length 0), and a set of <var>n</var> indices
060: * is created using the <var>k</var>-th segment of all documents. Pasting the
061: * resulting indices will produce an index that is identical to the index generated
062: * by the original collection. The behaviour is analogous to that of the UN*X
063: * <samp>paste</samp> command if documents are single-line lists of words.
064: *
065: * <p>In practice, pasting is usually applied to indices obtained from
066: * a {@linkplain it.unimi.dsi.mg4j.document.DocumentFactory.FieldType#VIRTUAL virtual field}
067: * (e.g., indices containing anchor text fragments).
068: *
069: * <p>Note that in case every document appears at most in one index pasting
070: * is equivalent to {@linkplain it.unimi.dsi.mg4j.tool.Merge merging}. It is, however,
071: * significantly slower, as the presence of the same document in several lists makes
072: * it necessary to scan completely the inverted lists to be pasted to compute the
073: * frequency.
074: *
075: * @author Sebastiano Vigna
076: * @since 1.0
077: */
078:
079: final public class Paste extends Combine {
080: @SuppressWarnings("unused")
081: private static final Logger LOGGER = Util.getLogger(Paste.class);
082:
083: /** The default size of the temporary bit stream buffer used while pasting. Posting lists larger
084: * than this size will be precomputed on disk and then added to the index. */
085: public final static int DEFAULT_MEMORY_BUFFER_SIZE = 16 * 1024 * 1024;
086:
087: /** The reference array of the document queue. */
088: protected int[] doc;
089: /** The queue containing document pointers (for remapped indices). */
090: protected IntHeapPriorityQueue documentQueue;
091: /** The temporary cache file {@link #combine(int)}. */
092: private File tempFile;
093: /** The temporary output bit stream for {@link #combine(int)}. */
094: private CachingOutputBitStream cacheBitStreamOut;
095: /** The temporary output bit stream for {@link #combine(int)}. */
096: private InputBitStream cacheBitStreamIn;
097: /** The input bit stream used to wrap directly {@link #cacheBitStreamOut}'s buffer. */
098: private InputBitStream cacheBitStreamInWrapper;
099: /** The size of the size list for each index. */
100: private final int[] sizesSize;
101:
102: public Paste(final String outputBasename,
103: final String[] inputBasename, final boolean metadataOnly,
104: final int bufferSize, final File tempFileDir,
105: final int tempBufferSize,
106: final Map<Component, Coding> writerFlags,
107: final boolean interleaved, final boolean skips,
108: final int quantum, final int height,
109: final int skipBufferSize, final long logInterval)
110: throws IOException, ConfigurationException,
111: URISyntaxException, ClassNotFoundException,
112: SecurityException, InstantiationException,
113: IllegalAccessException, InvocationTargetException,
114: NoSuchMethodException {
115: super (outputBasename, inputBasename, metadataOnly, bufferSize,
116: writerFlags, interleaved, skips, quantum, height,
117: skipBufferSize, logInterval);
118:
119: tempFile = File.createTempFile("MG4J", ".data", tempFileDir);
120: cacheBitStreamOut = new CachingOutputBitStream(tempFile,
121: tempBufferSize);
122: cacheBitStreamIn = new InputBitStream(tempFile, bufferSize);
123: cacheBitStreamInWrapper = new InputBitStream(cacheBitStreamOut
124: .buffer());
125: /* In this case, we must reallocate position as by merging occurences we might
126: * obtain an occurrence list as large as the concatenation of all largest
127: * lists. We use this estimate to allocate position, and update maxCount in
128: * combine() to get the real maxCount. */
129: int estimateForMaxCount = 0, tempSize = 0;
130: sizesSize = new int[numIndices];
131:
132: for (int i = 0; i < numIndices; i++) {
133: if (index[i].hasPayloads)
134: throw new IllegalArgumentException(
135: "You cannot paste indices with payloads");
136: sizesSize[i] = index[i].sizes.size();
137: estimateForMaxCount += index[i].maxCount;
138: tempSize = Math.max(tempSize, index[i].maxCount);
139: }
140:
141: position = new int[estimateForMaxCount];
142: doc = new int[numIndices];
143: documentQueue = new IntHeapPriorityQueue(numIndices,
144: new DocumentIndexComparator(doc));
145: }
146:
147: /** A comparator making an integer priority queue work much like an indirect
148: * priority queue, with the additional property of using the reference index as secondary key.
149: */
150:
151: private final static class DocumentIndexComparator extends
152: AbstractIntComparator {
153: private final int[] refArray;
154:
155: public DocumentIndexComparator(final int[] refArray) {
156: this .refArray = refArray;
157: }
158:
159: public int compare(final int i, final int j) {
160: final int t = refArray[i] - refArray[j];
161: return t != 0 ? t : i - j;
162: }
163: }
164:
165: /** Returns an index with given basename, loading document sizes.
166: *
167: * @param basename an index basename.
168: * @return an index loaded with document sizes.
169: */
170:
171: protected BitStreamIndex getIndex(final CharSequence basename)
172: throws ConfigurationException, IOException,
173: URISyntaxException, ClassNotFoundException,
174: SecurityException, InstantiationException,
175: IllegalAccessException, InvocationTargetException,
176: NoSuchMethodException {
177: return (BitStreamIndex) Index.getInstance(basename, false,
178: true, false);
179: }
180:
181: protected int combineNumberOfDocuments() {
182: int n = 0;
183: for (int i = 0; i < numIndices; i++)
184: n = Math.max(n, index[i].numberOfDocuments);
185: return n;
186: }
187:
188: protected int combineSizes() throws IOException {
189: int currDoc = 0, maxDocSize = 0;
190: for (int i = 0; i < numIndices; i++) {
191: final IntIterator sizes = sizes(i);
192: int s = 0;
193: int j = index[i].numberOfDocuments;
194: currDoc = 0;
195: while (j-- != 0) {
196: s = (size[currDoc++] += sizes.nextInt());
197: if (s > maxDocSize)
198: maxDocSize = s;
199: }
200: if (sizes instanceof Closeable)
201: ((Closeable) sizes).close();
202: }
203: return maxDocSize;
204: }
205:
206: protected int combine(final int numUsedIndices) throws IOException {
207: /* If we're merging just one list, merging is fine, and moreover
208: * maxCount need not be updated, as it is already initialised to
209: * the maximum over all indices. */
210: int currIndex, prevDoc = -1, currDoc, count;
211: int temp[];
212: OutputBitStream obs;
213: Index i;
214: IndexIterator ii;
215:
216: // Note that the total frequency can be computed only during the merge.
217: for (int k = numUsedIndices; k-- != 0;) {
218: currIndex = usedIndex[k];
219: frequency[currIndex] = indexIterator[currIndex].frequency();
220: doc[currIndex] = indexIterator[currIndex].nextDocument();
221: documentQueue.enqueue(currIndex);
222: }
223:
224: // First phase: we write the inverted list using a quick-and-dirty format in the cache.
225: cacheBitStreamOut.position(0);
226: int totalFrequency = 0, increment, totalCount, prevIndex;
227:
228: while (!documentQueue.isEmpty()) {
229: // We extract the smallest document pointer, and enqueue it in the new index.
230: currDoc = doc[currIndex = documentQueue.firstInt()];
231: cacheBitStreamOut.writeDelta(currDoc - prevDoc - 1);
232: totalFrequency++;
233:
234: totalCount = 0;
235: increment = 0;
236: prevIndex = 0;
237:
238: do {
239: while (prevIndex < currIndex) {
240: /* Note that some virtual documents could not exist at all in some index (in which
241: * case we extend the size list with zeroes). */
242: if (sizesSize[prevIndex] > currDoc)
243: increment += index[prevIndex].sizes
244: .getInt(currDoc);
245: prevIndex++;
246: }
247: i = index[currIndex];
248: ii = indexIterator[currIndex];
249:
250: if (i.hasCounts) {
251: count = ii.count();
252: if (i.hasPositions) {
253: temp = ii.positionArray();
254: for (int k = count; k-- != 0;)
255: position[totalCount + k] = temp[k]
256: + increment;
257: }
258: totalCount += count;
259: }
260:
261: // If we just wrote the last document pointer of this term in index j, we dequeue it.
262: if (--frequency[currIndex] == 0)
263: documentQueue.dequeue();
264: else {
265: doc[currIndex] = ii.nextDocument();
266: documentQueue.changed();
267: }
268: } while (!documentQueue.isEmpty()
269: && doc[currIndex = documentQueue.firstInt()] == currDoc);
270:
271: if (totalCount > maxCount)
272: maxCount = totalCount;
273:
274: if (hasCounts) {
275: cacheBitStreamOut.writeGamma(totalCount);
276: if (hasPositions) {
277: cacheBitStreamOut.writeDelta(position[0]);
278: for (int k = 1; k < totalCount; k++)
279: cacheBitStreamOut.writeDelta(position[k]
280: - position[k - 1] - 1);
281: }
282: }
283:
284: prevDoc = currDoc;
285: }
286:
287: // Finally, we pour the data into the actual index.
288:
289: indexWriter.newInvertedList();
290: indexWriter.writeFrequency(totalFrequency);
291: cacheBitStreamOut.align();
292: final InputBitStream ibs;
293:
294: if (cacheBitStreamOut.buffer() != null)
295: ibs = cacheBitStreamInWrapper;
296: else {
297: cacheBitStreamOut.flush();
298: ibs = cacheBitStreamIn;
299: ibs.flush();
300: }
301:
302: ibs.position(0);
303:
304: currDoc = -1;
305: for (int j = totalFrequency; j-- != 0;) {
306: obs = indexWriter.newDocumentRecord();
307: indexWriter.writeDocumentPointer(obs, currDoc = ibs
308: .readDelta()
309: + currDoc + 1);
310: count = ibs.readGamma();
311: if (hasCounts) {
312: indexWriter.writePositionCount(obs, count);
313: if (hasPositions) {
314: position[0] = ibs.readDelta();
315: for (int k = 1; k < count; k++)
316: position[k] = position[k - 1] + ibs.readDelta()
317: + 1;
318: indexWriter
319: .writeDocumentPositions(obs, position, 0,
320: count, size != null ? size[currDoc]
321: : -1);
322: }
323: }
324: }
325: return totalFrequency;
326: }
327:
328: public void run() throws ConfigurationException, IOException {
329: super .run();
330: cacheBitStreamOut.close();
331: tempFile.delete();
332: }
333:
334: public static void main(String arg[])
335: throws ConfigurationException, SecurityException,
336: JSAPException, IOException, URISyntaxException,
337: ClassNotFoundException, InstantiationException,
338: IllegalAccessException, InvocationTargetException,
339: NoSuchMethodException {
340: Combine.main(arg, Paste.class);
341: }
342: }
|