001: /*
002: * This file is part of PFIXCORE.
003: *
004: * PFIXCORE is free software; you can redistribute it and/or modify
005: * it under the terms of the GNU Lesser General Public License as published by
006: * the Free Software Foundation; either version 2 of the License, or
007: * (at your option) any later version.
008: *
009: * PFIXCORE is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public License
015: * along with PFIXCORE; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: */
019:
020: package de.schlund.pfixcore.lucefix;
021:
022: import java.io.File;
023: import java.io.IOException;
024: import java.util.Collection;
025: import java.util.LinkedList;
026: import java.util.Queue;
027: import java.util.Vector;
028:
029: import org.apache.log4j.Logger;
030: import org.apache.lucene.analysis.Analyzer;
031: import org.apache.lucene.document.DateField;
032: import org.apache.lucene.document.Document;
033: import org.apache.lucene.index.IndexReader;
034: import org.apache.lucene.index.IndexWriter;
035: import org.apache.lucene.index.Term;
036: import org.apache.lucene.search.Hits;
037: import org.apache.lucene.search.IndexSearcher;
038: import org.apache.lucene.search.TermQuery;
039: import org.xml.sax.SAXException;
040:
041: import de.schlund.pfixxml.XMLException;
042: import de.schlund.pfixxml.config.GlobalConfig;
043:
044: /**
045: * @author schuppi
046: * @date Jun 24, 2005
047: */
048: public class PfixQueueManager implements Runnable {
049:
050: private static PfixQueueManager _instance = null;
051: private final static Logger LOG = Logger
052: .getLogger((PfixQueueManager.class));
053: public static final String WAITMS_PROP = "lucefix.queueidle";
054: public static String lucene_data_path;
055: private Queue<Tripel> queue = new LinkedList<Tripel>();
056: private DocumentCache cache = null;
057: private IndexReader reader = null;
058: private IndexSearcher searcher = null;
059: private IndexWriter writer = null;
060: private Collection<Document> documents2write = null;
061: private int waitms = -1;
062: private Analyzer analyzer = PreDoc.ANALYZER;
063: protected Object mutex = new Object();
064:
065: /**
066: * @param p
067: * @throws XMLException
068: */
069: public PfixQueueManager(Integer idletime) {
070:
071: waitms = idletime;
072: lucene_data_path = (new File(GlobalConfig.getDocroot(),
073: ".index")).getAbsolutePath();
074:
075: documents2write = new Vector<Document>();
076: }
077:
078: /*
079: * @see java.lang.Runnable#run()
080: */
081: public void run() {
082: Tripel current;
083: long startLoop, stopLoop;
084: int added, updated, removed, size;
085: cache = new DocumentCache();
086: while (true) {
087: startLoop = System.currentTimeMillis();
088: added = updated = removed = size = 0;
089: queueloop: while ((current = queue.poll()) != null) {
090: try {
091: if (current.getType() == Tripel.Type.INSERT
092: || current.getType() == Tripel.Type.EDITORUPDATE) {
093: try {
094: if (reader == null)
095: reader = IndexReader
096: .open(lucene_data_path);
097: } catch (IOException e) {
098: createDB();
099: reader = IndexReader.open(lucene_data_path);
100: }
101: if (size == 0)
102: size = reader.numDocs();
103: if (searcher == null)
104: searcher = new IndexSearcher(reader);
105:
106: Term term = new Term("path", current.getPath());
107: TermQuery query = new TermQuery(term);
108: Hits hits = searcher.search(query);
109: if (hits.length() == 0) {
110: // current queued is NOT indexed
111: Document newdoc = cache
112: .getDocument(current);
113: if (newdoc == null) {
114: // this just happens too often to log it
115: // LOG.debug("wanted to work on " + current + " but there is no part for it...");
116: continue queueloop;
117: }
118: documents2write.add(newdoc);
119: added++;
120: cache.remove(newdoc);
121: } else if (hits.length() == 1) {
122: File f = new File(
123: GlobalConfig.getDocroot(), current
124: .getFilename());
125:
126: // File f = new File(current.getPath());
127: if (f.lastModified() == DateField
128: .stringToTime(hits.doc(0).get(
129: "lasttouch"))) {
130: cache.remove(hits.doc(0));
131: LOG
132: .debug("TS is ok, discarding action: "
133: + term);
134: } else {
135: // ts differs, remove outdaten from index
136: // and
137: // add the new
138: Document newDoc = cache
139: .getDocument(current);
140: reader.delete(term);
141: if (newDoc == null) {
142: continue queueloop;
143: }
144: documents2write.add(newDoc);
145: cache.remove(newDoc);
146: updated++;
147: }
148: } else {
149: LOG.error("multihit for unique term: "
150: + term);
151: }
152: } else if (current.getType() == Tripel.Type.DELETE) {
153: if (reader == null)
154: reader = IndexReader.open(lucene_data_path);
155: if (size == 0)
156: size = reader.numDocs();
157: reader.delete(new Term("path", current
158: .getPath()));
159: removed++;
160: } else {
161: LOG.error("unsupported tripeltype, discarding");
162: }
163: } catch (IOException e) {
164: LOG.error("error in " + getClass(), e);
165: } catch (SAXException e) {
166: LOG.error("error parsing " + current.getPath(), e);
167: }
168: }
169:
170: try {
171: if (searcher != null)
172: searcher.close();
173: searcher = null;
174: if (reader != null)
175: reader.close();
176: reader = null;
177:
178: if (documents2write.size() > 0) {
179: if (writer == null)
180: writer = new IndexWriter(lucene_data_path,
181: analyzer, false);
182:
183: for (Document doc : documents2write) {
184: writer.addDocument(doc);
185: }
186: writer.optimize();
187: writer.close();
188: writer = null;
189: }
190:
191: } catch (IOException e) {
192: LOG.error("error writing new index", e);
193: try {
194: if (writer != null)
195: writer.close();
196: } catch (IOException e1) {
197: LOG.error("unable to close indexwriter...", e1);
198: e1.printStackTrace();
199: }
200: }
201:
202: documents2write.clear();
203: cache.flush();
204: size += added - removed;
205: size = Math.abs(size);
206: stopLoop = System.currentTimeMillis();
207: long needed = stopLoop - startLoop;
208: if (added != 0 || updated != 0 || removed != 0) {
209: LOG.debug(needed + "ms | " + added + " new docs, "
210: + updated + " updated docs, " + removed
211: + " deleted docs | indexsize: " + (size)
212: + " | cacheratio: " + cache.getFound() + "/"
213: + cache.getMissed());
214: }
215: cache.resetStatistic();
216: // }
217: try {
218: Thread.sleep(waitms);
219: } catch (InterruptedException e) {
220: }
221: }
222: }
223:
224: private void createDB() throws IOException {
225: LOG.debug("created db");
226: writer = new IndexWriter(lucene_data_path, analyzer, true);
227: writer.optimize();
228: writer.close();
229: writer = null;
230: }
231:
232: /**
233: * @param p
234: * @return
235: * @throws XMLException
236: */
237: public static synchronized PfixQueueManager getInstance(
238: Integer idletime) {
239: if (_instance == null)
240: _instance = new PfixQueueManager(idletime);
241: return _instance;
242: }
243:
244: public void queue(Tripel newTripel) {
245: synchronized (queue) {
246: queue.offer(newTripel);
247: }
248: }
249: }
|