package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.lucene.util.LuceneTestCase;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.analysis.WhitespaceTokenizer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RAMDirectory;

public class TestPayloads extends LuceneTestCase {

    // Simple tests for the Payload class
    public void testPayload() throws Exception {
        byte[] testData = "This is a test!".getBytes();
        Payload payload = new Payload(testData);
        assertEquals("Wrong payload length.", testData.length, payload.length());

        // test copyTo()
        byte[] target = new byte[testData.length - 1];
        try {
            payload.copyTo(target, 0);
            fail("Expected exception not thrown");
        } catch (Exception expected) {
            // expected exception
        }

        target = new byte[testData.length + 3];
        payload.copyTo(target, 3);

        for (int i = 0; i < testData.length; i++) {
            assertEquals(testData[i], target[i + 3]);
        }

        // test toByteArray()
        target = payload.toByteArray();
        assertByteArrayEquals(testData, target);

        // test byteAt()
        for (int i = 0; i < testData.length; i++) {
            assertEquals(payload.byteAt(i), testData[i]);
        }

        try {
            payload.byteAt(testData.length + 1);
            fail("Expected exception not thrown");
        } catch (Exception expected) {
            // expected exception
        }

        Payload clone = (Payload) payload.clone();
        assertEquals(payload.length(), clone.length());
        for (int i = 0; i < payload.length(); i++) {
            assertEquals(payload.byteAt(i), clone.byteAt(i));
        }
    }

    // Tests whether the DocumentWriter and SegmentMerger correctly enable the
    // payload bit in the FieldInfo
    public void testPayloadFieldBit() throws Exception {
        Directory ram = new RAMDirectory();
        PayloadAnalyzer analyzer = new PayloadAnalyzer();
        IndexWriter writer = new IndexWriter(ram, analyzer, true);
        Document d = new Document();
        // this field won't have any payloads
        d.add(new Field("f1", "This field has no payloads", Field.Store.NO, Field.Index.TOKENIZED));
        // this field will have payloads in all docs, though not at all term positions,
        // so it is used to check that the DocumentWriter correctly enables the payloads bit
        // even if only some term positions have payloads
        d.add(new Field("f2", "This field has payloads in all docs", Field.Store.NO, Field.Index.TOKENIZED));
        d.add(new Field("f2", "This field has payloads in all docs", Field.Store.NO, Field.Index.TOKENIZED));
        // this field is used to verify that the SegmentMerger enables payloads for a field
        // that has payloads enabled in only some documents
        d.add(new Field("f3", "This field has payloads in some docs", Field.Store.NO, Field.Index.TOKENIZED));
        // only add payload data for field f2
        analyzer.setPayloadData("f2", 1, "somedata".getBytes(), 0, 1);
        writer.addDocument(d);
        // flush
        writer.close();

        // only one segment in the index, so we can cast to SegmentReader
        SegmentReader reader = (SegmentReader) IndexReader.open(ram);
        FieldInfos fi = reader.fieldInfos();
        assertFalse("Payload field bit should not be set.", fi.fieldInfo("f1").storePayloads);
        assertTrue("Payload field bit should be set.", fi.fieldInfo("f2").storePayloads);
        assertFalse("Payload field bit should not be set.", fi.fieldInfo("f3").storePayloads);
        reader.close();

        // now we add another document that has payloads for field f3 and verify that the
        // SegmentMerger enables payloads for that field
        writer = new IndexWriter(ram, analyzer, true);
        d = new Document();
        d.add(new Field("f1", "This field has no payloads", Field.Store.NO, Field.Index.TOKENIZED));
        d.add(new Field("f2", "This field has payloads in all docs", Field.Store.NO, Field.Index.TOKENIZED));
        d.add(new Field("f2", "This field has payloads in all docs", Field.Store.NO, Field.Index.TOKENIZED));
        d.add(new Field("f3", "This field has payloads in some docs", Field.Store.NO, Field.Index.TOKENIZED));
        // add payload data for fields f2 and f3
        analyzer.setPayloadData("f2", "somedata".getBytes(), 0, 1);
        analyzer.setPayloadData("f3", "somedata".getBytes(), 0, 3);
        writer.addDocument(d);
        // force merge
        writer.optimize();
        // flush
        writer.close();

        // only one segment in the index, so we can cast to SegmentReader
        reader = (SegmentReader) IndexReader.open(ram);
        fi = reader.fieldInfos();
        assertFalse("Payload field bit should not be set.", fi.fieldInfo("f1").storePayloads);
        assertTrue("Payload field bit should be set.", fi.fieldInfo("f2").storePayloads);
        assertTrue("Payload field bit should be set.", fi.fieldInfo("f3").storePayloads);
        reader.close();
    }

    // Tests whether payloads are correctly stored and loaded using both RAMDirectory and FSDirectory
    public void testPayloadsEncoding() throws Exception {
        // first perform the test using a RAMDirectory
        Directory dir = new RAMDirectory();
        performTest(dir);

        // now use an FSDirectory and repeat the same test
        String dirName = "test_payloads";
        dir = FSDirectory.getDirectory(dirName);
        performTest(dir);
        rmDir(dirName);
    }

    // builds an index with payloads in the given Directory and performs
    // different tests to verify the payload encoding
    private void performTest(Directory dir) throws Exception {
        PayloadAnalyzer analyzer = new PayloadAnalyzer();
        IndexWriter writer = new IndexWriter(dir, analyzer, true);

        // should be in sync with value in TermInfosWriter
        final int skipInterval = 16;

        final int numTerms = 5;
        final String fieldName = "f1";

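        // presumably one more document than the skip interval, so that each term's
        // posting list spans at least one skip point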
        int numDocs = skipInterval + 1;
        // create content for the test documents with just a few terms
        Term[] terms = generateTerms(fieldName, numTerms);
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < terms.length; i++) {
            sb.append(terms[i].text);
            sb.append(" ");
        }
        String content = sb.toString();

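        // total amount of payload data that will be written:
        // - the first loop below adds 2 * numDocs documents with a 1-byte payload
        //   at each of the numTerms positions      -> numTerms * numDocs * 2 bytes
        // - the second loop adds numDocs documents where document i carries an
        //   i-byte payload at each position        -> numTerms * numDocs * (numDocs - 1) / 2 bytes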
        int payloadDataLength = numTerms * numDocs * 2 + numTerms * numDocs * (numDocs - 1) / 2;
        byte[] payloadData = generateRandomData(payloadDataLength);

        Document d = new Document();
        d.add(new Field(fieldName, content, Field.Store.NO, Field.Index.TOKENIZED));
        // add the same document multiple times to have the same payload lengths for all
        // occurrences within two consecutive skip intervals
        int offset = 0;
        for (int i = 0; i < 2 * numDocs; i++) {
            analyzer.setPayloadData(fieldName, payloadData, offset, 1);
            offset += numTerms;
            writer.addDocument(d);
        }

        // make sure we create more than one segment to test merging
        writer.flush();

        // now we make sure to have different payload lengths at the next skip point
        for (int i = 0; i < numDocs; i++) {
            analyzer.setPayloadData(fieldName, payloadData, offset, i);
            offset += i * numTerms;
            writer.addDocument(d);
        }

        writer.optimize();
        // flush
        writer.close();

        /*
         * Verify the index
         * first we check that all payloads are stored correctly
         */
        IndexReader reader = IndexReader.open(dir);

        byte[] verifyPayloadData = new byte[payloadDataLength];
        offset = 0;
        TermPositions[] tps = new TermPositions[numTerms];
        for (int i = 0; i < numTerms; i++) {
            tps[i] = reader.termPositions(terms[i]);
        }

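        // advance all TermPositions in lockstep; since every document contains each term
        // exactly once, in the order the terms were generated, concatenating the payloads
        // position by position should reproduce payloadData byte for byte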
        while (tps[0].next()) {
            for (int i = 1; i < numTerms; i++) {
                tps[i].next();
            }
            int freq = tps[0].freq();

            for (int i = 0; i < freq; i++) {
                for (int j = 0; j < numTerms; j++) {
                    tps[j].nextPosition();
                    tps[j].getPayload(verifyPayloadData, offset);
                    offset += tps[j].getPayloadLength();
                }
            }
        }

        for (int i = 0; i < numTerms; i++) {
            tps[i].close();
        }

        assertByteArrayEquals(payloadData, verifyPayloadData);

        /*
         * test lazy skipping
         */
        TermPositions tp = reader.termPositions(terms[0]);
        tp.next();
        tp.nextPosition();
        // now we don't read this payload
        tp.nextPosition();
        assertEquals("Wrong payload length.", 1, tp.getPayloadLength());
        byte[] payload = tp.getPayload(null, 0);
        assertEquals(payload[0], payloadData[numTerms]);
        tp.nextPosition();

        // we don't read this payload and skip to a different document
        tp.skipTo(5);
        tp.nextPosition();
        assertEquals("Wrong payload length.", 1, tp.getPayloadLength());
        payload = tp.getPayload(null, 0);
        assertEquals(payload[0], payloadData[5 * numTerms]);

        /*
         * Test different lengths at skip points
         */
        tp.seek(terms[1]);
        tp.next();
        tp.nextPosition();
        assertEquals("Wrong payload length.", 1, tp.getPayloadLength());
        tp.skipTo(skipInterval - 1);
        tp.nextPosition();
        assertEquals("Wrong payload length.", 1, tp.getPayloadLength());
        tp.skipTo(2 * skipInterval - 1);
        tp.nextPosition();
        assertEquals("Wrong payload length.", 1, tp.getPayloadLength());
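        // this skip point lies in the second batch of documents: document 2 * numDocs + i
        // was written with payloads of length i, so the length expected at
        // doc 3 * skipInterval - 1 is (3 * skipInterval - 1) - 2 * numDocs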
        tp.skipTo(3 * skipInterval - 1);
        tp.nextPosition();
        assertEquals("Wrong payload length.", 3 * skipInterval - 2 * numDocs - 1, tp.getPayloadLength());

        /*
         * Test multiple calls of getPayload()
         */
        tp.getPayload(null, 0);
        try {
            // it is forbidden to call getPayload() more than once
            // without calling nextPosition()
            tp.getPayload(null, 0);
            fail("Expected exception not thrown");
        } catch (Exception expected) {
            // expected exception
        }

        reader.close();

        // test long payload
        analyzer = new PayloadAnalyzer();
        writer = new IndexWriter(dir, analyzer, true);
        String singleTerm = "lucene";

        d = new Document();
        d.add(new Field(fieldName, singleTerm, Field.Store.NO, Field.Index.TOKENIZED));
        // add a payload whose length is greater than the buffer size of BufferedIndexOutput
        payloadData = generateRandomData(2000);
        analyzer.setPayloadData(fieldName, payloadData, 100, 1500);
        writer.addDocument(d);

        writer.optimize();
        // flush
        writer.close();

        reader = IndexReader.open(dir);
        tp = reader.termPositions(new Term(fieldName, singleTerm));
        tp.next();
        tp.nextPosition();

        verifyPayloadData = new byte[tp.getPayloadLength()];
        tp.getPayload(verifyPayloadData, 0);
        byte[] portion = new byte[1500];
        System.arraycopy(payloadData, 100, portion, 0, 1500);

        assertByteArrayEquals(portion, verifyPayloadData);
        reader.close();
    }

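    // helpers that fill byte arrays with random payload data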
    private static Random rnd = new Random();

    private static void generateRandomData(byte[] data) {
        rnd.nextBytes(data);
    }

    private static byte[] generateRandomData(int n) {
        byte[] data = new byte[n];
        generateRandomData(data);
        return data;
    }

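    // generates n terms "t0", "t1", ... for the given field, padding the numeric part
    // with zeros where necessary so the terms enumerate in the order they were created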
    private Term[] generateTerms(String fieldName, int n) {
        int maxDigits = (int) (Math.log(n) / Math.log(10));
        Term[] terms = new Term[n];
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < n; i++) {
            sb.setLength(0);
            sb.append("t");
            int zeros = maxDigits - (int) (Math.log(i) / Math.log(10));
            for (int j = 0; j < zeros; j++) {
                sb.append("0");
            }
            sb.append(i);
            terms[i] = new Term(fieldName, sb.toString());
        }
        return terms;
    }

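    // removes the on-disk index directory created by testPayloadsEncoding()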
    private void rmDir(String dir) {
        File fileDir = new File(dir);
        if (fileDir.exists()) {
            File[] files = fileDir.listFiles();
            if (files != null) {
                for (int i = 0; i < files.length; i++) {
                    files[i].delete();
                }
            }
            fileDir.delete();
        }
    }

    void assertByteArrayEquals(byte[] b1, byte[] b2) {
        if (b1.length != b2.length) {
            fail("Byte arrays have different lengths: " + b1.length + ", " + b2.length);
        }

        for (int i = 0; i < b1.length; i++) {
            if (b1[i] != b2[i]) {
                fail("Byte arrays differ at index " + i + ": " + b1[i] + ", " + b2[i]);
            }
        }
    }

    /**
     * This Analyzer uses a WhitespaceTokenizer and a PayloadFilter.
     */
    private static class PayloadAnalyzer extends Analyzer {
        Map fieldToData = new HashMap();

        void setPayloadData(String field, byte[] data, int offset, int length) {
            fieldToData.put(field, new PayloadData(0, data, offset, length));
        }

        void setPayloadData(String field, int numFieldInstancesToSkip, byte[] data, int offset, int length) {
            fieldToData.put(field, new PayloadData(numFieldInstancesToSkip, data, offset, length));
        }

        public TokenStream tokenStream(String fieldName, Reader reader) {
            PayloadData payload = (PayloadData) fieldToData.get(fieldName);
            TokenStream ts = new WhitespaceTokenizer(reader);
            if (payload != null) {
                if (payload.numFieldInstancesToSkip == 0) {
                    ts = new PayloadFilter(ts, payload.data, payload.offset, payload.length);
                } else {
                    payload.numFieldInstancesToSkip--;
                }
            }
            return ts;
        }

        private static class PayloadData {
            byte[] data;
            int offset;
            int length;
            int numFieldInstancesToSkip;

            PayloadData(int skip, byte[] data, int offset, int length) {
                numFieldInstancesToSkip = skip;
                this.data = data;
                this.offset = offset;
                this.length = length;
            }
        }
    }

    /**
     * This Filter adds payloads to the tokens.
     */
    private static class PayloadFilter extends TokenFilter {
        private byte[] data;
        private int length;
        private int offset;

        public PayloadFilter(TokenStream in, byte[] data, int offset, int length) {
            super(in);
            this.data = data;
            this.length = length;
            this.offset = offset;
        }

        public Token next(Token token) throws IOException {
            token = input.next(token);
            if (token != null) {
                if (offset + length <= data.length) {
                    // hand out the next 'length' bytes of the shared data array as this token's payload
                    Payload p = new Payload();
                    p.setData(data, offset, length);
                    token.setPayload(p);
                    offset += length;
                } else {
                    // ran out of payload data
                    token.setPayload(null);
                }
            }

            return token;
        }
    }

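    // Tests that payloads can be written concurrently from several threads: every document
    // contains a single token whose text and payload are produced from the same byte[]
    // taken from a shared pool, so the payload read back at each position must decode to
    // the same string as its term, and all arrays must end up back in the pool.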
    public void testThreadSafety() throws IOException {
        final int numThreads = 5;
        final int numDocs = 50;
        final ByteArrayPool pool = new ByteArrayPool(numThreads, 5);

        Directory dir = new RAMDirectory();
        final IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
        final String field = "test";

        Thread[] ingesters = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            ingesters[i] = new Thread() {
                public void run() {
                    try {
                        for (int j = 0; j < numDocs; j++) {
                            Document d = new Document();
                            d.add(new Field(field, new PoolingPayloadTokenStream(pool)));
                            writer.addDocument(d);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.toString());
                    }
                }
            };
            ingesters[i].start();
        }

        for (int i = 0; i < numThreads; i++) {
            try {
                ingesters[i].join();
            } catch (InterruptedException e) {
            }
        }
        writer.close();
        IndexReader reader = IndexReader.open(dir);
        TermEnum terms = reader.terms();
        while (terms.next()) {
            TermPositions tp = reader.termPositions(terms.term());
            while (tp.next()) {
                int freq = tp.freq();
                for (int i = 0; i < freq; i++) {
                    tp.nextPosition();
                    String s = new String(tp.getPayload(new byte[5], 0));
                    assertEquals(s, terms.term().text);
                }
            }
            tp.close();
        }
        terms.close();
        reader.close();

        assertEquals(pool.size(), numThreads);
    }

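    /**
     * A TokenStream that emits exactly one token whose text and payload are both built
     * from a byte[] borrowed from a ByteArrayPool; the array is returned to the pool
     * when the stream is closed.
     */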
    private static class PoolingPayloadTokenStream extends TokenStream {
        private byte[] payload;
        private boolean first;
        private ByteArrayPool pool;

        PoolingPayloadTokenStream(ByteArrayPool pool) {
            this.pool = pool;
            payload = pool.get();
            generateRandomData(payload);
            first = true;
        }

        public Token next() throws IOException {
            if (!first) return null;
            first = false;
            Token t = new Token(new String(payload), 0, 0);
            t.setPayload(new Payload(payload));
            return t;
        }

        public void close() throws IOException {
            pool.release(payload);
        }
    }

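    /**
     * A fixed-size pool of byte arrays, used to detect whether every
     * PoolingPayloadTokenStream releases its array again.
     */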
    private static class ByteArrayPool {
        private List pool;

        ByteArrayPool(int capacity, int size) {
            pool = new ArrayList();
            for (int i = 0; i < capacity; i++) {
                pool.add(new byte[size]);
            }
        }

        synchronized byte[] get() {
            return (byte[]) pool.remove(0);
        }

        synchronized void release(byte[] b) {
            pool.add(b);
        }

        synchronized int size() {
            return pool.size();
        }
    }
}