001: package net.bagaluten.jca.lucene.connector.impl;
002:
003: import java.io.File;
004: import java.io.IOException;
005: import java.io.PrintWriter;
006: import java.util.ArrayList;
007: import java.util.HashMap;
008: import java.util.HashSet;
009: import java.util.Iterator;
010: import java.util.List;
011: import java.util.Map;
012: import java.util.Set;
013: import java.util.logging.Level;
014: import java.util.logging.Logger;
015:
016: import javax.resource.NotSupportedException;
017: import javax.resource.ResourceException;
018: import javax.resource.spi.ConnectionEvent;
019: import javax.resource.spi.ConnectionEventListener;
020: import javax.resource.spi.ConnectionRequestInfo;
021: import javax.resource.spi.LocalTransaction;
022: import javax.resource.spi.ManagedConnection;
023: import javax.resource.spi.ManagedConnectionMetaData;
024: import javax.security.auth.Subject;
025: import javax.transaction.xa.XAResource;
026:
027: import net.bagaluten.jca.lucene.connector.Entries;
028: import net.bagaluten.jca.lucene.connector.Entry;
029: import net.bagaluten.jca.lucene.connector.Field;
030: import net.bagaluten.jca.lucene.connector.QueryField;
031:
032: import org.apache.lucene.analysis.standard.StandardAnalyzer;
033: import org.apache.lucene.document.DateTools;
034: import org.apache.lucene.document.Document;
035: import org.apache.lucene.index.IndexWriter;
036: import org.apache.lucene.queryParser.ParseException;
037: import org.apache.lucene.queryParser.QueryParser;
038: import org.apache.lucene.search.IndexSearcher;
039: import org.apache.lucene.search.Query;
040: import org.apache.lucene.search.TopDocs;
041: import org.apache.lucene.store.FSDirectory;
042:
043: public class LuceneManagedConnection implements ManagedConnection {
044:
045: private static Map<String, Integer> connectionCounter = new HashMap<String, Integer>();
046:
047: private static final Logger log = Logger
048: .getLogger(LuceneManagedConnection.class.getName());
049:
050: private synchronized static void count(boolean up, String key) {
051: int i;
052: if (connectionCounter.containsKey(key)) {
053: i = connectionCounter.get(key).intValue();
054: i = up ? i + 1 : i - 1;
055: } else {
056: i = 1;
057: }
058: connectionCounter.put(key, Integer.valueOf(i));
059: }
060:
061: private static void inc(String key) {
062: count(true, key);
063: }
064:
065: private static void dec(String key) {
066: count(false, key);
067: }
068:
069: private static void debug() {
070: int total = 0;
071: for (java.util.Map.Entry<String, Integer> p : connectionCounter
072: .entrySet()) {
073: log.fine(p.getKey() + ":" + p.getValue());
074: total += p.getValue();
075: }
076: log.fine("total: " + total);
077: }
078:
079: // This is in fact a horrifying construct, but i currently have no better
080: // idea
081: // until lucene provides a clean client interface
082: private static final Map<Field.DateFlag, DateTools.Resolution> DATEFLAGS = new HashMap<Field.DateFlag, DateTools.Resolution>();
083:
084: private static final String FIELD = "";
085:
086: private static final Map<Field.Index, org.apache.lucene.document.Field.Index> INDEXFLAGS = new HashMap<Field.Index, org.apache.lucene.document.Field.Index>();
087:
088: private static final Map<Field.Store, org.apache.lucene.document.Field.Store> STOREFLAGS = new HashMap<Field.Store, org.apache.lucene.document.Field.Store>();
089:
090: static {
091: DATEFLAGS.put(Field.DateFlag.YEAR, DateTools.Resolution.YEAR);
092: DATEFLAGS.put(Field.DateFlag.MONTH, DateTools.Resolution.MONTH);
093: DATEFLAGS.put(Field.DateFlag.DAY, DateTools.Resolution.DAY);
094: DATEFLAGS.put(Field.DateFlag.HOUR, DateTools.Resolution.HOUR);
095: DATEFLAGS.put(Field.DateFlag.MINUTE,
096: DateTools.Resolution.MINUTE);
097: DATEFLAGS.put(Field.DateFlag.SECOND,
098: DateTools.Resolution.SECOND);
099: DATEFLAGS.put(Field.DateFlag.MILLISECOND,
100: DateTools.Resolution.MILLISECOND);
101: // default
102: DATEFLAGS
103: .put(Field.DateFlag.UNSET, DateTools.Resolution.MINUTE);
104:
105: INDEXFLAGS.put(Field.Index.NO,
106: org.apache.lucene.document.Field.Index.NO);
107: INDEXFLAGS.put(Field.Index.NO_NORMS,
108: org.apache.lucene.document.Field.Index.NO_NORMS);
109: INDEXFLAGS.put(Field.Index.UN_TOKENIZED,
110: org.apache.lucene.document.Field.Index.UN_TOKENIZED);
111: INDEXFLAGS.put(Field.Index.TOKENIZED,
112: org.apache.lucene.document.Field.Index.TOKENIZED);
113: // default
114: INDEXFLAGS.put(Field.Index.UNSET,
115: org.apache.lucene.document.Field.Index.TOKENIZED);
116:
117: STOREFLAGS.put(Field.Store.YES,
118: org.apache.lucene.document.Field.Store.YES);
119: STOREFLAGS.put(Field.Store.NO,
120: org.apache.lucene.document.Field.Store.NO);
121: STOREFLAGS.put(Field.Store.COMPRESS,
122: org.apache.lucene.document.Field.Store.COMPRESS);
123: // default
124: STOREFLAGS.put(Field.Store.UNSET,
125: org.apache.lucene.document.Field.Store.NO);
126: }
127:
128: /** Set of application-level handlers */
129: private Set<LuceneConnectionImpl> connectionSet;
130:
131: private String index;
132:
133: private IndexSearcher indexSearcher;
134:
135: /** List of listeners */
136: private List<ConnectionEventListener> listeners;
137:
138: /** Log writer */
139: private PrintWriter logWriter;
140:
141: /** Path to the repository containing files to retireve */
142: private String path;
143:
144: /**
145: * The constructor
146: *
147: * @param conReqInfo
148: * {@link ConnectionRequestInfo}
149: * @param writer
150: * log writer of the factory that calls this constructor
151: * @param luceneIdx
152: * path to the lucene Index
153: * @throws ResourceException
154: * to be done
155: */
156: public LuceneManagedConnection(ConnectionRequestInfo conReqInfo,
157: PrintWriter writer, String luceneIdx)
158: throws ResourceException {
159: log.fine("creating new managed connection");
160: this .logWriter = writer;
161: this .listeners = new ArrayList<ConnectionEventListener>();
162: this .connectionSet = new HashSet<LuceneConnectionImpl>();
163: this .path = luceneIdx;
164: setIndex(conReqInfo);
165: if (log.isLoggable(Level.FINE)) {
166: inc(index);
167: debug();
168: }
169: }
170:
171: public boolean add(Entry entry) {
172: log.info("adding new entry to index '" + index + "'.");
173: IndexWriter writer = null;
174:
175: try {
176:
177: writer = getWriter();
178: writer.addDocument(createDocument(entry));
179: } catch (IOException e) {
180: log.log(Level.SEVERE, "can not write to index: ", e);
181: return false;
182: } finally {
183: if (writer != null) {
184: try {
185: writer.close();
186: } catch (IOException e) {
187: // there is nothing we can do right now
188: }
189: }
190: }
191:
192: return true;
193: }
194:
195: /**
196: * Adds application-level handler to handlers set
197: *
198: * @param con
199: * handler to add
200: */
201: void addConnection(LuceneConnectionImpl con) {
202: log.fine("adding connection to managed connection on index '"
203: + index + "'.");
204: synchronized (this .connectionSet) {
205: connectionSet.add(con);
206: }
207: }
208:
209: /*
210: * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
211: */
212: public void addConnectionEventListener(
213: ConnectionEventListener listener) {
214: synchronized (this .listeners) {
215: listeners.add(listener);
216: }
217: }
218:
219: /*
220: * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
221: */
222: public void associateConnection(Object conn)
223: throws ResourceException {
224: log.fine("associating connection on index '" + index + "'.");
225: if (!(conn instanceof LuceneConnectionImpl)) {
226: log.severe("internal error - incorrect connection type: "
227: + conn.getClass().getName());
228: throw new ResourceException(
229: "Connection has an incorrect type");
230: }
231: ((LuceneConnectionImpl) conn).associateConnection(this );
232: }
233:
234: public int bulkAdd(Entries entries) throws ResourceException {
235: log.info("bulk adding entries to index '" + index + "'.");
236: IndexWriter writer = null;
237: int count = 0;
238: try {
239:
240: writer = getWriter();
241: for (Entry p : entries) {
242: writer.addDocument(createDocument(p));
243: count++;
244: }
245: } catch (IOException e) {
246: log.log(Level.SEVERE, "can not write to index: ", e);
247: throw new ResourceException("IO Error " + e.getMessage(), e);
248:
249: } finally {
250: if (writer != null) {
251: try {
252: writer.close();
253: } catch (IOException e) {
254: // there is nothing w can do right now
255: }
256: }
257: }
258: return count;
259: }
260:
261: /*
262: * @see javax.resource.spi.ManagedConnection#cleanup()
263: */
264: public void cleanup() throws ResourceException {
265: log.fine("cleaning up index '" + index + "'.");
266: invalidateAllConnections();
267: }
268:
269: /**
270: * Closes connection
271: *
272: * @param con
273: * connection to close
274: */
275: public void close(LuceneConnectionImpl con) {
276: log.fine("Closing connection to index '" + index + "'.");
277: ConnectionEvent event = new ConnectionEvent(this ,
278: ConnectionEvent.CONNECTION_CLOSED);
279: synchronized (this .listeners) {
280: Iterator itr = listeners.iterator();
281: while (itr.hasNext()) {
282: try {
283: ((ConnectionEventListener) itr.next())
284: .connectionClosed(event);
285: } catch (Throwable e) {
286: }
287: }
288: }
289: con.invalidate();
290: removeConnection(con);
291: }
292:
293: private synchronized void closeSearcher() {
294: log.info("pysically closing index connection '" + index + "'.");
295: if (indexSearcher != null) {
296: try {
297: indexSearcher.close();
298: } catch (IOException e) {
299: // nothing we can do about it
300: }
301: indexSearcher = null;
302: }
303: }
304:
305: private Document createDocument(Entry entry) {
306: Document doc = new Document();
307: for (Map.Entry<String, Field> item : entry.entrySet()) {
308: Field f = item.getValue();
309: String s;
310: if (f.isDate()) {
311: s = DateTools.dateToString(f.getDate(), DATEFLAGS.get(f
312: .getDateFlag()));
313: } else {
314: s = f.getString();
315: }
316:
317: doc.add(new org.apache.lucene.document.Field(item.getKey(),
318: s, STOREFLAGS.get(f.getStore()), INDEXFLAGS.get(f
319: .getIndex())));
320: }
321: return doc;
322: }
323:
324: /*
325: * @see javax.resource.spi.ManagedConnection#destroy()
326: */
327: public void destroy() throws ResourceException {
328: log.fine("destroying connection to index '" + index + "'.");
329: invalidateAllConnections();
330: synchronized (this .listeners) {
331: listeners = null;
332: }
333: path = null;
334: closeSearcher();
335: if (log.isLoggable(Level.FINE)) {
336: dec(index);
337: debug();
338: }
339: }
340:
341: /*
342: * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
343: * javax.resource.spi.ConnectionRequestInfo)
344: */
345: public Object getConnection(Subject subj,
346: ConnectionRequestInfo conReqInfo) throws ResourceException {
347: log.fine("getting new connection for index '" + index + "'.");
348: setIndex(conReqInfo);
349: LuceneConnectionImpl conn = new LuceneConnectionImpl(this );
350: addConnection(conn);
351: return conn;
352: }
353:
354: protected String getIndex() {
355: return index;
356: }
357:
358: private String getIndexPath() {
359: if (index == null) {
360: return this .path;
361: } else {
362: return this .path + File.separator + index;
363: }
364: }
365:
366: /*
367: * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
368: */
369: public LocalTransaction getLocalTransaction()
370: throws ResourceException {
371: throw new NotSupportedException(
372: "Transactions are not supported");
373: }
374:
375: /*
376: * @see javax.resource.spi.ManagedConnection#getLogWriter()
377: */
378: public PrintWriter getLogWriter() throws ResourceException {
379: return logWriter;
380: }
381:
382: /*
383: * @see javax.resource.spi.ManagedConnection#getMetaData()
384: */
385: public ManagedConnectionMetaData getMetaData()
386: throws ResourceException {
387: return new LuceneManagedConnectionMetaData();
388: }
389:
390: private synchronized IndexSearcher getSearcher() throws IOException {
391: if (indexSearcher == null) {
392: indexSearcher = new IndexSearcher(getIndexPath());
393: }
394: return indexSearcher;
395: }
396:
397: private IndexWriter getWriter() throws IOException {
398: boolean create = false;
399: File f = new File(getIndexPath());
400: if (!f.exists()) {
401: f.mkdirs();
402: create = true;
403: }
404: return new IndexWriter(getIndexPath(), new StandardAnalyzer(),
405: create);
406: }
407:
408: /*
409: * @see javax.resource.spi.ManagedConnection#getXAResource()
410: */
411: public XAResource getXAResource() throws ResourceException {
412: throw new NotSupportedException(
413: "XA transactions are not supported");
414: }
415:
416: /**
417: * Invalidate all application-level handlers and clears handlers set
418: */
419: void invalidateAllConnections() {
420: synchronized (this .connectionSet) {
421: Iterator itr = connectionSet.iterator();
422: while (itr.hasNext()) {
423: LuceneConnectionImpl con = (LuceneConnectionImpl) itr
424: .next();
425: con.invalidate();
426: }
427: connectionSet.clear();
428: }
429: }
430:
431: protected boolean matches(ConnectionRequestInfo reqInfo) {
432: if (reqInfo != null
433: && reqInfo instanceof LuceneConnectionRequestInfo) {
434: LuceneConnectionRequestInfo lc = (LuceneConnectionRequestInfo) reqInfo;
435: log.finer("matching: " + lc.getIndexName() + " == " + index
436: + " ?");
437: return lc.getIndexName().equals(index);
438: }
439: return false;
440: }
441:
442: public void optimize() throws ResourceException {
443: log.info("optimizing index '" + index + "'.");
444: IndexWriter writer = null;
445: try {
446: writer = getWriter();
447: writer.optimize();
448: } catch (IOException e) {
449: log.log(Level.SEVERE, "can not write to index: ", e);
450: throw new ResourceException("IO Error " + e.getMessage(), e);
451: } finally {
452: if (writer != null) {
453: try {
454: writer.close();
455: } catch (IOException e) {
456: // there is nothing w can do right now
457: }
458: }
459: }
460:
461: }
462:
463: /**
464: * Removes application-level handler from handlers set
465: *
466: * @param con
467: * handler to remove
468: */
469: void removeConnection(LuceneConnectionImpl con) {
470: log.fine("removing a connection from index '" + index + "'.");
471: synchronized (this .connectionSet) {
472: connectionSet.remove(con);
473: }
474: }
475:
476: /*
477: * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
478: */
479: public void removeConnectionEventListener(
480: ConnectionEventListener listener) {
481: synchronized (this .listeners) {
482: listeners.remove(listener);
483: }
484: }
485:
486: public Set<String> suggest(String query, QueryField field,
487: int suggestNumber) throws ResourceException {
488: CompositeQuerySuggester suggester;
489: try {
490: suggester = new CompositeQuerySuggester(field.toString(),
491: new StandardAnalyzer(new String[] {}), FSDirectory
492: .getDirectory(getIndexPath(), false),
493: suggestNumber);
494: } catch (IOException e) {
495: log.log(Level.SEVERE, "can not access index '" + index
496: + "':", e);
497: throw new ResourceException("IO Error " + e.getMessage(), e);
498: }
499: Set<String> result;
500: try {
501: result = suggester.suggest(query);
502: } catch (ParseException e) {
503: log.log(Level.WARNING, "parse exception on query '" + query
504: + "'.", e);
505: throw new ResourceException(
506: "Parse Error " + e.getMessage(), e);
507: }
508: return result;
509: }
510:
511: public Entries search(String query, Set<QueryField> fields,
512: int offset, int count) throws ResourceException {
513:
514: try {
515: IndexSearcher searcher = getSearcher();
516: QueryParser q = new QueryParser(FIELD,
517: new StandardAnalyzer(new String[] {}));
518: Query lq = q.parse(query);
519: TopDocs topDocs = searcher.search(lq, null, offset + count);
520: Entries entries = new Entries();
521: entries.setHits(topDocs.totalHits);
522: log
523: .info("searching index '" + index
524: + "' with query '" + query + "' found "
525: + topDocs.totalHits + " hits.");
526: if (topDocs.scoreDocs != null) {
527: for (int i = offset; i < topDocs.scoreDocs.length; i++) {
528: Document doc = searcher
529: .doc(topDocs.scoreDocs[i].doc);
530: Entry entry = entries.createEntry();
531: for (QueryField qf : fields) {
532: Field f;
533: if (qf.isDate()) {
534: try {
535: f = new Field(DateTools
536: .stringToDate(doc.get(qf
537: .getName())));
538: } catch (java.text.ParseException e) {
539: f = new Field(doc.get(qf.getName()));
540: }
541: } else {
542: f = new Field(doc.get(qf.getName()));
543: }
544: entry.put(qf.getName(), f);
545: entry.setRanking(topDocs.scoreDocs[i].score);
546: }
547: }
548: }
549: return entries;
550: } catch (IOException e) {
551: log.log(Level.SEVERE, "can not access index '" + index
552: + "':", e);
553: throw new ResourceException("IO Error " + e.getMessage(), e);
554: } catch (ParseException e) {
555: log.log(Level.WARNING, "parse exception on query '" + query
556: + "'.", e);
557: throw new ResourceException("Parse Error " + e.getMessage());
558: }
559: }
560:
561: protected void setIndex(ConnectionRequestInfo conReqInfo)
562: throws ResourceException {
563: if (this .path == null) {
564: throw new ResourceException("Path to a repository is null");
565: }
566: if (conReqInfo != null
567: && conReqInfo instanceof LuceneConnectionRequestInfo) {
568: index = ((LuceneConnectionRequestInfo) conReqInfo)
569: .getIndexName();
570: }
571: File repository = new File(getIndexPath());
572: if (!repository.exists() || !repository.isDirectory()) {
573: log.severe("Path does not point to an directory:"
574: + repository.getAbsolutePath());
575: throw new ResourceException("Path ["
576: + repository.getAbsolutePath()
577: + "] does not lead to a Lucene index");
578: }
579: }
580:
581: /*
582: * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
583: */
584: public void setLogWriter(PrintWriter out) throws ResourceException {
585: this.logWriter = out;
586: }
587:
588: }
|