001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.core.lucene.engine.transaction.readcommitted;
018:
019: import java.io.IOException;
020: import java.util.ArrayList;
021: import java.util.HashMap;
022: import java.util.Map;
023: import java.util.concurrent.Callable;
024:
025: import org.apache.commons.logging.Log;
026: import org.apache.commons.logging.LogFactory;
027: import org.apache.lucene.analysis.Analyzer;
028: import org.apache.lucene.index.IndexReader;
029: import org.apache.lucene.index.IndexWriter;
030: import org.apache.lucene.index.MultiReader;
031: import org.apache.lucene.index.Term;
032: import org.apache.lucene.index.TermDocs;
033: import org.apache.lucene.search.Filter;
034: import org.apache.lucene.search.Hits;
035: import org.apache.lucene.search.IndexSearcher;
036: import org.apache.lucene.search.MultiSearcher;
037: import org.apache.lucene.search.Query;
038: import org.apache.lucene.search.Searcher;
039: import org.apache.lucene.store.Directory;
040: import org.compass.core.Resource;
041: import org.compass.core.engine.SearchEngineException;
042: import org.compass.core.lucene.engine.DefaultLuceneSearchEngineHits;
043: import org.compass.core.lucene.engine.EmptyLuceneSearchEngineHits;
044: import org.compass.core.lucene.engine.LuceneSearchEngineHits;
045: import org.compass.core.lucene.engine.LuceneSearchEngineInternalSearch;
046: import org.compass.core.lucene.engine.LuceneSearchEngineQuery;
047: import org.compass.core.lucene.engine.manager.LuceneIndexHolder;
048: import org.compass.core.lucene.engine.transaction.AbstractTransaction;
049: import org.compass.core.lucene.util.ChainedFilter;
050: import org.compass.core.lucene.util.LuceneUtils;
051: import org.compass.core.spi.InternalResource;
052: import org.compass.core.spi.ResourceKey;
053: import org.compass.core.transaction.context.TransactionalCallable;
054: import org.compass.core.util.StringUtils;
055:
056: /**
057: * Read Committed transaction support. Operations performed within a transaction are
058: * "viewable" to that ongoing transaction, including when searching.
059: *
060: * @author kimchy
061: */
062: public class ReadCommittedTransaction extends AbstractTransaction {
063:
064: private static final Log log = LogFactory
065: .getLog(ReadCommittedTransaction.class);
066:
067: private TransIndexManager transIndexManager;
068:
069: private Map<String, IndexWriter> indexWriterBySubIndex = new HashMap<String, IndexWriter>();
070:
071: private BitSetByAliasFilter filter;
072:
073: private Map<String, LuceneIndexHolder> indexHoldersBySubIndex = new HashMap<String, LuceneIndexHolder>();
074:
075: protected void doBegin() throws SearchEngineException {
076: this .transIndexManager = new TransIndexManager(searchEngine
077: .getSearchEngineFactory());
078: this .transIndexManager.configure(searchEngine.getSettings());
079: this .filter = new BitSetByAliasFilter();
080: }
081:
082: protected void doRollback() throws SearchEngineException {
083: releaseHolders();
084: SearchEngineException lastException = null;
085: for (Map.Entry<String, IndexWriter> entry : indexWriterBySubIndex
086: .entrySet()) {
087: try {
088: entry.getValue().abort();
089: } catch (IOException e) {
090: Directory dir = indexManager.getStore().openDirectory(
091: entry.getKey());
092: try {
093: if (IndexReader.isLocked(dir)) {
094: IndexReader.unlock(dir);
095: }
096: } catch (Exception e1) {
097: log.warn(
098: "Failed to check for locks or unlock failed commit for sub index ["
099: + entry.getKey() + "]", e);
100: }
101: lastException = new SearchEngineException(
102: "Failed to rollback sub index ["
103: + entry.getKey() + "]", e);
104: }
105: }
106: if (lastException != null) {
107: throw lastException;
108: }
109: }
110:
111: protected void doPrepare() throws SearchEngineException {
112: releaseHolders();
113: ArrayList<Callable<Object>> prepareCallables = new ArrayList<Callable<Object>>();
114: for (String subIndex : indexWriterBySubIndex.keySet()) {
115: if (!transIndexManager.hasTransIndex(subIndex)) {
116: continue;
117: }
118: prepareCallables.add(new TransactionalCallable(indexManager
119: .getTransactionContext(), new PrepareCallable(
120: subIndex)));
121: }
122: indexManager.getExecutorManager()
123: .invokeAllWithLimitBailOnException(prepareCallables, 1);
124: }
125:
126: protected void doCommit(boolean onePhase)
127: throws SearchEngineException {
128: releaseHolders();
129: ArrayList<Callable<Object>> commitCallables = new ArrayList<Callable<Object>>();
130: for (Map.Entry<String, IndexWriter> entry : indexWriterBySubIndex
131: .entrySet()) {
132: commitCallables.add(new TransactionalCallable(indexManager
133: .getTransactionContext(), new CommitCallable(entry
134: .getKey(), entry.getValue(), onePhase)));
135: }
136: indexManager.getExecutorManager()
137: .invokeAllWithLimitBailOnException(commitCallables, 1);
138: }
139:
140: protected LuceneSearchEngineInternalSearch doInternalSearch(
141: String[] subIndexes, String[] aliases)
142: throws SearchEngineException {
143: ArrayList<LuceneIndexHolder> indexHoldersToClose = new ArrayList<LuceneIndexHolder>();
144: try {
145: String[] calcSubIndexes = indexManager.getStore()
146: .calcSubIndexes(subIndexes, aliases);
147: ArrayList<IndexSearcher> searchers = new ArrayList<IndexSearcher>();
148: for (String subIndex : calcSubIndexes) {
149: LuceneIndexHolder indexHolder = indexHoldersBySubIndex
150: .get(subIndex);
151: if (indexHolder == null) {
152: indexHolder = indexManager
153: .openIndexHolderBySubIndex(subIndex);
154: indexHoldersToClose.add(indexHolder);
155: }
156: if (indexHolder.getIndexReader().numDocs() > 0) {
157: searchers.add(indexHolder.getIndexSearcher());
158: }
159: if (transIndexManager.hasTransIndex(subIndex)) {
160: searchers.add(transIndexManager
161: .getSearcher(subIndex));
162: }
163: }
164: if (searchers.size() == 0) {
165: return new LuceneSearchEngineInternalSearch(null, null);
166: }
167: MultiSearcher indexSeracher = new MultiSearcher(searchers
168: .toArray(new Searcher[searchers.size()]));
169: return new LuceneSearchEngineInternalSearch(indexSeracher,
170: indexHoldersToClose);
171: } catch (IOException e) {
172: for (LuceneIndexHolder indexHolder : indexHoldersToClose) {
173: indexHolder.release();
174: }
175: throw new SearchEngineException(
176: "Failed to open Lucene reader/searcher", e);
177: }
178: }
179:
180: protected LuceneSearchEngineHits doFind(
181: LuceneSearchEngineQuery query) throws SearchEngineException {
182: LuceneSearchEngineInternalSearch internalSearch = (LuceneSearchEngineInternalSearch) internalSearch(
183: query.getSubIndexes(), query.getAliases());
184: if (internalSearch.isEmpty()) {
185: return new EmptyLuceneSearchEngineHits();
186: }
187: Filter qFilter = null;
188: if (filter.hasDeletes()) {
189: if (query.getFilter() == null) {
190: qFilter = filter;
191: } else {
192: qFilter = new ChainedFilter(new Filter[] { filter,
193: query.getFilter().getFilter() },
194: ChainedFilter.ChainedFilterType.AND);
195: }
196: } else {
197: if (query.getFilter() != null) {
198: qFilter = query.getFilter().getFilter();
199: }
200: }
201: Hits hits = findByQuery(internalSearch, query, qFilter);
202: return new DefaultLuceneSearchEngineHits(hits, searchEngine,
203: query, internalSearch);
204: }
205:
206: public Resource[] get(ResourceKey resourceKey)
207: throws SearchEngineException {
208: Searcher indexSearcher = null;
209: IndexReader indexReader = null;
210: LuceneIndexHolder indexHolder = null;
211: boolean releaseHolder = false;
212: boolean closeReaderAndSearcher = false;
213: try {
214: String subIndex = resourceKey.getSubIndex();
215: indexHolder = indexHoldersBySubIndex.get(subIndex);
216: if (indexHolder == null) {
217: indexHolder = indexManager
218: .openIndexHolderBySubIndex(subIndex);
219: releaseHolder = true;
220: } else {
221: releaseHolder = false;
222: }
223: if (transIndexManager.hasTransIndex(subIndex)) {
224: closeReaderAndSearcher = true;
225: indexReader = new MultiReader(new IndexReader[] {
226: indexHolder.getIndexReader(),
227: transIndexManager.getReader(subIndex) }, false);
228: // note, we need to create a multi searcher here instead of a searcher ontop of the MultiReader
229: // since our filter relies on specific reader per searcher
230: indexSearcher = new MultiSearcher(
231: new Searcher[] {
232: new IndexSearcher(indexHolder
233: .getIndexReader()),
234: transIndexManager.getSearcher(subIndex) });
235: } else {
236: indexReader = indexHolder.getIndexReader();
237: indexSearcher = indexHolder.getIndexSearcher();
238: }
239: if (filter.hasDeletes()) {
240: // TODO we can do better with HitCollector
241: Query query = LuceneUtils
242: .buildResourceLoadQuery(resourceKey);
243: Hits hits = indexSearcher.search(query, filter);
244: return LuceneUtils.hitsToResourceArray(hits,
245: searchEngine);
246: } else {
247: Term t = new Term(resourceKey.getUIDPath(), resourceKey
248: .buildUID());
249: TermDocs termDocs = null;
250: try {
251: termDocs = indexReader.termDocs(t);
252: if (termDocs != null) {
253: return LuceneUtils.hitsToResourceArray(
254: termDocs, indexReader, searchEngine);
255: } else {
256: return new Resource[0];
257: }
258: } catch (IOException e) {
259: throw new SearchEngineException(
260: "Failed to search for property ["
261: + resourceKey + "]", e);
262: } finally {
263: try {
264: if (termDocs != null) {
265: termDocs.close();
266: }
267: } catch (IOException e) {
268: // swallow it
269: }
270: }
271: }
272: } catch (IOException e) {
273: throw new SearchEngineException(
274: "Failed to find for alias ["
275: + resourceKey.getAlias()
276: + "] and ids ["
277: + StringUtils
278: .arrayToCommaDelimitedString(resourceKey
279: .getIds()) + "]", e);
280: } finally {
281: if (indexHolder != null && releaseHolder) {
282: indexHolder.release();
283: }
284: if (closeReaderAndSearcher) {
285: try {
286: indexSearcher.close();
287: } catch (Exception e) {
288: // ignore
289: }
290: try {
291: indexReader.close();
292: } catch (Exception e) {
293: // ignore
294: }
295: }
296: }
297: }
298:
299: protected void doCreate(InternalResource resource, Analyzer analyzer)
300: throws SearchEngineException {
301: try {
302: openIndexWriterIfNeeded(resource.getSubIndex());
303: transIndexManager.create(resource, analyzer);
304: } catch (IOException e) {
305: throw new SearchEngineException(
306: "Failed to create resource for alias ["
307: + resource.getAlias() + "] and resource "
308: + resource, e);
309: }
310:
311: }
312:
313: protected void doDelete(ResourceKey resourceKey)
314: throws SearchEngineException {
315: try {
316: openIndexWriterIfNeeded(resourceKey.getSubIndex());
317:
318: LuceneIndexHolder indexHolder = indexHoldersBySubIndex
319: .get(resourceKey.getSubIndex());
320: if (indexHolder == null) {
321: indexManager.refreshCache(resourceKey.getSubIndex());
322: indexHolder = indexManager
323: .openIndexHolderBySubIndex(resourceKey
324: .getSubIndex());
325: indexHoldersBySubIndex.put(resourceKey.getSubIndex(),
326: indexHolder);
327: }
328:
329: // mark the deleted term in the filter
330: Term deleteTerm = new Term(resourceKey.getUIDPath(),
331: resourceKey.buildUID());
332: TermDocs termDocs = null;
333: try {
334: termDocs = indexHolder.getIndexReader().termDocs(
335: deleteTerm);
336: if (termDocs != null) {
337: int maxDoc = indexHolder.getIndexReader().maxDoc();
338: try {
339: while (termDocs.next()) {
340: filter.markDelete(indexHolder
341: .getIndexReader(), termDocs.doc(),
342: maxDoc);
343: }
344: } catch (IOException e) {
345: throw new SearchEngineException(
346: "Failed to iterate data in order to delete",
347: e);
348: }
349: }
350: } catch (IOException e) {
351: throw new SearchEngineException(
352: "Failed to search for property [" + resourceKey
353: + "]", e);
354: } finally {
355: try {
356: if (termDocs != null) {
357: termDocs.close();
358: }
359: } catch (IOException e) {
360: // swallow it
361: }
362: }
363:
364: // delete from the original index (autoCommit is false, so won't be committed
365: indexWriterBySubIndex.get(resourceKey.getSubIndex())
366: .deleteDocuments(deleteTerm);
367:
368: // and delete it (if there) from the transactional index
369: transIndexManager.delete(resourceKey);
370: } catch (IOException e) {
371: throw new SearchEngineException("Failed to delete alias ["
372: + resourceKey.getAlias()
373: + "] and ids ["
374: + StringUtils
375: .arrayToCommaDelimitedString(resourceKey
376: .getIds()) + "]", e);
377: }
378: }
379:
380: public void flush() throws SearchEngineException {
381: // TODO maybe flush here the trans index manager?
382: }
383:
384: protected void openIndexWriterIfNeeded(String subIndex)
385: throws IOException {
386: if (indexWriterBySubIndex.containsKey(subIndex)) {
387: return;
388: }
389: IndexWriter indexWriter = indexManager.openIndexWriter(
390: searchEngine.getSettings(), subIndex, false);
391: indexWriterBySubIndex.put(subIndex, indexWriter);
392: }
393:
394: private void releaseHolders() {
395: for (LuceneIndexHolder indexHolder : indexHoldersBySubIndex
396: .values()) {
397: indexHolder.release();
398: }
399: indexHoldersBySubIndex.clear();
400: }
401:
402: private class PrepareCallable implements Callable {
403:
404: private String subIndex;
405:
406: private PrepareCallable(String subIndex) {
407: this .subIndex = subIndex;
408: }
409:
410: public Object call() throws Exception {
411: if (transIndexManager.hasTransIndex(subIndex)) {
412: transIndexManager.commit(subIndex);
413: }
414: return null;
415: }
416: }
417:
418: private class CommitCallable implements Callable {
419:
420: private String subIndex;
421:
422: private IndexWriter indexWriter;
423:
424: private PrepareCallable prepareCallable;
425:
426: public CommitCallable(String subIndex, IndexWriter indexWriter,
427: boolean onePhase) {
428: this .subIndex = subIndex;
429: this .indexWriter = indexWriter;
430: if (onePhase) {
431: prepareCallable = new PrepareCallable(subIndex);
432: }
433: }
434:
435: public Object call() throws Exception {
436: if (prepareCallable != null) {
437: prepareCallable.call();
438: }
439: // onlt add indexes if there is a transactional index
440: try {
441: if (transIndexManager.hasTransIndex(subIndex)) {
442: Directory transDir = transIndexManager
443: .getDirectory(subIndex);
444: indexWriter
445: .addIndexesNoOptimize(new Directory[] { transDir });
446: }
447: indexWriter.close();
448: } catch (IOException e) {
449: Directory dir = indexManager.getStore().openDirectory(
450: subIndex);
451: try {
452: if (IndexReader.isLocked(dir)) {
453: IndexReader.unlock(dir);
454: }
455: } catch (Exception e1) {
456: log.warn(
457: "Failed to check for locks or unlock failed commit for sub index ["
458: + subIndex + "]", e);
459: }
460: throw new SearchEngineException(
461: "Failed add transaction index to sub index ["
462: + subIndex + "]", e);
463: }
464: if (indexManager.getSettings().isClearCacheOnCommit()) {
465: indexManager.refreshCache(subIndex);
466: }
467: try {
468: transIndexManager.close(subIndex);
469: } catch (IOException e) {
470: log.warn(
471: "Failed to close transactional index for sub index ["
472: + subIndex + "], ignoring", e);
473: }
474: return null;
475: }
476: }
477: }
|