001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: /**
019: * @author yonik
020: */package org.apache.solr.update;
021:
022: import org.apache.lucene.index.IndexWriter;
023: import org.apache.lucene.index.IndexReader;
024: import org.apache.lucene.index.TermDocs;
025: import org.apache.lucene.index.Term;
026: import org.apache.lucene.document.Document;
027: import org.apache.lucene.search.Query;
028:
029: import java.util.HashSet;
030: import java.util.concurrent.Future;
031: import java.util.concurrent.ExecutionException;
032: import java.util.logging.Level;
033: import java.io.IOException;
034: import java.net.URL;
035:
036: import org.apache.solr.search.SolrIndexSearcher;
037: import org.apache.solr.search.QueryParsing;
038: import org.apache.solr.util.NamedList;
039: import org.apache.solr.util.SimpleOrderedMap;
040: import org.apache.solr.update.UpdateHandler;
041: import org.apache.solr.core.SolrCore;
042: import org.apache.solr.core.SolrException;
043:
044: /**
045: * <code>DirectUpdateHandler</code> implements an UpdateHandler where documents are added
046: * directly to the main lucene index as opposed to adding to a separate smaller index.
047: * For this reason, not all combinations to/from pending and committed are supported.
048: *
049: * @author yonik
050: * @version $Id: DirectUpdateHandler.java 542679 2007-05-29 22:28:21Z ryan $
051: * @since solr 0.9
052: */
053:
054: public class DirectUpdateHandler extends UpdateHandler {
055:
056: // the set of ids in the "pending set" (those docs that have been added, but
057: // that are not yet visible.
058: final HashSet<String> pset;
059: IndexWriter writer;
060: SolrIndexSearcher searcher;
061: int numAdds = 0; // number of docs added to the pending set
062: int numPending = 0; // number of docs currently in this pending set
063: int numDeleted = 0; // number of docs deleted or
064:
065: public DirectUpdateHandler(SolrCore core) throws IOException {
066: super (core);
067: pset = new HashSet<String>(256);
068: }
069:
070: protected void openWriter() throws IOException {
071: if (writer == null) {
072: writer = createMainIndexWriter("DirectUpdateHandler");
073: }
074: }
075:
076: protected void closeWriter() throws IOException {
077: try {
078: if (writer != null)
079: writer.close();
080: } finally {
081: // TODO: if an exception causes the writelock to not be
082: // released, we could delete it here.
083: writer = null;
084: }
085: }
086:
087: protected void openSearcher() throws IOException {
088: if (searcher == null) {
089: searcher = core.newSearcher("DirectUpdateHandler");
090: }
091: }
092:
093: protected void closeSearcher() throws IOException {
094: try {
095: if (searcher != null)
096: searcher.close();
097: } finally {
098: // TODO: if an exception causes the writelock to not be
099: // released, we could delete it here.
100: searcher = null;
101: }
102: }
103:
104: protected void doAdd(Document doc) throws IOException {
105: closeSearcher();
106: openWriter();
107: writer.addDocument(doc);
108: }
109:
110: protected boolean existsInIndex(String indexedId)
111: throws IOException {
112: if (idField == null)
113: throw new SolrException(
114: SolrException.ErrorCode.BAD_REQUEST,
115: "Operation requires schema to have a unique key field");
116:
117: closeWriter();
118: openSearcher();
119: IndexReader ir = searcher.getReader();
120: TermDocs tdocs = null;
121: boolean exists = false;
122: try {
123: tdocs = ir.termDocs(idTerm(indexedId));
124: if (tdocs.next())
125: exists = true;
126: } finally {
127: try {
128: if (tdocs != null)
129: tdocs.close();
130: } catch (Exception e) {
131: }
132: }
133: return exists;
134: }
135:
136: protected int deleteInIndex(String indexedId) throws IOException {
137: if (idField == null)
138: throw new SolrException(
139: SolrException.ErrorCode.BAD_REQUEST,
140: "Operation requires schema to have a unique key field");
141:
142: closeWriter();
143: openSearcher();
144: IndexReader ir = searcher.getReader();
145: TermDocs tdocs = null;
146: int num = 0;
147: try {
148: Term term = new Term(idField.getName(), indexedId);
149: num = ir.deleteDocuments(term);
150: if (SolrCore.log.isLoggable(Level.FINEST)) {
151: SolrCore.log.finest("deleted " + num
152: + " docs matching id "
153: + idFieldType.indexedToReadable(indexedId));
154: }
155: } finally {
156: try {
157: if (tdocs != null)
158: tdocs.close();
159: } catch (Exception e) {
160: }
161: }
162: return num;
163: }
164:
165: protected void overwrite(String indexedId, Document doc)
166: throws IOException {
167: if (indexedId == null)
168: indexedId = getIndexedId(doc);
169: deleteInIndex(indexedId);
170: doAdd(doc);
171: }
172:
173: /************** Direct update handler - pseudo code ***********
174: def add(doc, id, allowDups, overwritePending, overwriteCommitted):
175: if not overwritePending and not overwriteCommitted:
176: #special case... no need to check pending set, and we don't keep
177: #any state around about this addition
178: if allowDups:
179: committed[id]=doc #100
180: return
181: else:
182: #if no dups allowed, we must check the *current* index (pending and committed)
183: if not committed[id]: committed[id]=doc #000
184: return
185: #001 (searchd addConditionally)
186: if not allowDups and not overwritePending and pending[id]: return
187: del committed[id] #delete from pending and committed 111 011
188: committed[id]=doc
189: pending[id]=True
190: ****************************************************************/
191:
192: // could return the number of docs deleted, but is that always possible to know???
193: public void delete(DeleteUpdateCommand cmd) throws IOException {
194: if (!cmd.fromPending && !cmd.fromCommitted)
195: throw new SolrException(
196: SolrException.ErrorCode.BAD_REQUEST,
197: "meaningless command: " + cmd);
198: if (!cmd.fromPending || !cmd.fromCommitted)
199: throw new SolrException(
200: SolrException.ErrorCode.BAD_REQUEST,
201: "operation not supported" + cmd);
202: String indexedId = idFieldType.toInternal(cmd.id);
203: synchronized (this ) {
204: deleteInIndex(indexedId);
205: pset.remove(indexedId);
206: }
207: }
208:
209: // TODO - return number of docs deleted?
210: // Depending on implementation, we may not be able to immediately determine num...
211: public void deleteByQuery(DeleteUpdateCommand cmd)
212: throws IOException {
213: if (!cmd.fromPending && !cmd.fromCommitted)
214: throw new SolrException(
215: SolrException.ErrorCode.BAD_REQUEST,
216: "meaningless command: " + cmd);
217: if (!cmd.fromPending || !cmd.fromCommitted)
218: throw new SolrException(
219: SolrException.ErrorCode.BAD_REQUEST,
220: "operation not supported" + cmd);
221:
222: Query q = QueryParsing.parseQuery(cmd.query, schema);
223:
224: int totDeleted = 0;
225: synchronized (this ) {
226: closeWriter();
227: openSearcher();
228:
229: // if we want to count the number of docs that were deleted, then
230: // we need a new instance of the DeleteHitCollector
231: final DeleteHitCollector deleter = new DeleteHitCollector(
232: searcher);
233: searcher.search(q, null, deleter);
234: totDeleted = deleter.deleted;
235: }
236:
237: if (SolrCore.log.isLoggable(Level.FINE)) {
238: SolrCore.log.fine("docs deleted:" + totDeleted);
239: }
240:
241: }
242:
243: /**************** old hit collector... new one is in base class
244: // final DeleteHitCollector deleter = new DeleteHitCollector();
245: class DeleteHitCollector extends HitCollector {
246: public int deleted=0;
247: public void collect(int doc, float score) {
248: try {
249: searcher.getReader().delete(doc);
250: deleted++;
251: } catch (IOException e) {
252: try { closeSearcher(); } catch (Exception ee) { SolrException.log(SolrCore.log,ee); }
253: SolrException.log(SolrCore.log,e);
254: throw new SolrException( SolrException.StatusCode.SERVER_ERROR,"Error deleting doc# "+doc,e);
255: }
256: }
257: }
258: ***************************/
259:
260: public void commit(CommitUpdateCommand cmd) throws IOException {
261: Future[] waitSearcher = null;
262: if (cmd.waitSearcher) {
263: waitSearcher = new Future[1];
264: }
265:
266: synchronized (this ) {
267: pset.clear();
268: closeSearcher(); // flush any deletes
269: if (cmd.optimize) {
270: openWriter(); // writer needs to be open to optimize
271: writer.optimize();
272: }
273: closeWriter();
274:
275: callPostCommitCallbacks();
276: if (cmd.optimize) {
277: callPostOptimizeCallbacks();
278: }
279:
280: core.getSearcher(true, false, waitSearcher);
281: }
282:
283: if (waitSearcher != null && waitSearcher[0] != null) {
284: try {
285: waitSearcher[0].get();
286: } catch (InterruptedException e) {
287: SolrException.log(log, e);
288: } catch (ExecutionException e) {
289: SolrException.log(log, e);
290: }
291: }
292:
293: return;
294: }
295:
296: ///////////////////////////////////////////////////////////////////
297: /////////////////// helper method for each add type ///////////////
298: ///////////////////////////////////////////////////////////////////
299:
300: protected int addNoOverwriteNoDups(AddUpdateCommand cmd)
301: throws IOException {
302: if (cmd.indexedId == null) {
303: cmd.indexedId = getIndexedId(cmd.doc);
304: }
305: synchronized (this ) {
306: if (existsInIndex(cmd.indexedId))
307: return 0;
308: doAdd(cmd.doc);
309: }
310: return 1;
311: }
312:
313: protected int addConditionally(AddUpdateCommand cmd)
314: throws IOException {
315: if (cmd.indexedId == null) {
316: cmd.indexedId = getIndexedId(cmd.doc);
317: }
318: synchronized (this ) {
319: if (pset.contains(cmd.indexedId))
320: return 0;
321: // since case 001 is currently the only case to use pset, only add
322: // to it in that instance.
323: pset.add(cmd.indexedId);
324: overwrite(cmd.indexedId, cmd.doc);
325: return 1;
326: }
327: }
328:
329: // overwrite both pending and committed
330: protected synchronized int overwriteBoth(AddUpdateCommand cmd)
331: throws IOException {
332: overwrite(cmd.indexedId, cmd.doc);
333: return 1;
334: }
335:
336: // add without checking
337: protected synchronized int allowDups(AddUpdateCommand cmd)
338: throws IOException {
339: doAdd(cmd.doc);
340: return 1;
341: }
342:
343: public int addDoc(AddUpdateCommand cmd) throws IOException {
344:
345: // if there is no ID field, use allowDups
346: if (idField == null) {
347: cmd.allowDups = true;
348: cmd.overwriteCommitted = false;
349: cmd.overwritePending = false;
350: }
351:
352: if (!cmd.allowDups && !cmd.overwritePending
353: && !cmd.overwriteCommitted) {
354: return addNoOverwriteNoDups(cmd);
355: } else if (!cmd.allowDups && !cmd.overwritePending
356: && cmd.overwriteCommitted) {
357: return addConditionally(cmd);
358: } else if (!cmd.allowDups && cmd.overwritePending
359: && !cmd.overwriteCommitted) {
360: // return overwriteBoth(cmd);
361: throw new SolrException(
362: SolrException.ErrorCode.BAD_REQUEST,
363: "unsupported param combo:" + cmd);
364: } else if (!cmd.allowDups && cmd.overwritePending
365: && cmd.overwriteCommitted) {
366: return overwriteBoth(cmd);
367: } else if (cmd.allowDups && !cmd.overwritePending
368: && !cmd.overwriteCommitted) {
369: return allowDups(cmd);
370: } else if (cmd.allowDups && !cmd.overwritePending
371: && cmd.overwriteCommitted) {
372: // return overwriteBoth(cmd);
373: throw new SolrException(
374: SolrException.ErrorCode.BAD_REQUEST,
375: "unsupported param combo:" + cmd);
376: } else if (cmd.allowDups && cmd.overwritePending
377: && !cmd.overwriteCommitted) {
378: // return overwriteBoth(cmd);
379: throw new SolrException(
380: SolrException.ErrorCode.BAD_REQUEST,
381: "unsupported param combo:" + cmd);
382: } else if (cmd.allowDups && cmd.overwritePending
383: && cmd.overwriteCommitted) {
384: return overwriteBoth(cmd);
385: }
386: throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
387: "unsupported param combo:" + cmd);
388: }
389:
390: public void close() throws IOException {
391: synchronized (this ) {
392: closeSearcher();
393: closeWriter();
394: }
395: }
396:
397: /////////////////////////////////////////////////////////////////////
398: // SolrInfoMBean stuff: Statistics and Module Info
399: /////////////////////////////////////////////////////////////////////
400:
401: public String getName() {
402: return DirectUpdateHandler.class.getName();
403: }
404:
405: public String getVersion() {
406: return SolrCore.version;
407: }
408:
409: public String getDescription() {
410: return "Update handler that directly changes the on-disk main lucene index";
411: }
412:
413: public Category getCategory() {
414: return Category.CORE;
415: }
416:
417: public String getSourceId() {
418: return "$Id: DirectUpdateHandler.java 542679 2007-05-29 22:28:21Z ryan $";
419: }
420:
421: public String getSource() {
422: return "$URL: https://svn.apache.org/repos/asf/lucene/solr/branches/branch-1.2/src/java/org/apache/solr/update/DirectUpdateHandler.java $";
423: }
424:
425: public URL[] getDocs() {
426: return null;
427: }
428:
429: public NamedList getStatistics() {
430: NamedList lst = new SimpleOrderedMap();
431: return lst;
432: }
433:
434: }
|