001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.core.lucene.engine.store;
018:
019: import java.io.IOException;
020: import java.sql.Connection;
021: import java.util.Iterator;
022: import java.util.Map;
023: import java.util.concurrent.ConcurrentHashMap;
024: import javax.sql.DataSource;
025:
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028: import org.apache.lucene.store.Directory;
029: import org.apache.lucene.store.jdbc.JdbcDirectory;
030: import org.apache.lucene.store.jdbc.JdbcDirectorySettings;
031: import org.apache.lucene.store.jdbc.JdbcFileEntrySettings;
032: import org.apache.lucene.store.jdbc.JdbcStoreException;
033: import org.apache.lucene.store.jdbc.datasource.DataSourceUtils;
034: import org.apache.lucene.store.jdbc.datasource.TransactionAwareDataSourceProxy;
035: import org.apache.lucene.store.jdbc.dialect.Dialect;
036: import org.apache.lucene.store.jdbc.dialect.DialectResolver;
037: import org.apache.lucene.store.jdbc.index.FetchPerTransactionJdbcIndexInput;
038: import org.apache.lucene.store.jdbc.support.JdbcTable;
039: import org.compass.core.CompassException;
040: import org.compass.core.config.CompassConfigurable;
041: import org.compass.core.config.CompassEnvironment;
042: import org.compass.core.config.CompassSettings;
043: import org.compass.core.config.ConfigurationException;
044: import org.compass.core.engine.SearchEngine;
045: import org.compass.core.engine.SearchEngineException;
046: import org.compass.core.engine.event.SearchEngineEventManager;
047: import org.compass.core.engine.event.SearchEngineLifecycleEventListener;
048: import org.compass.core.lucene.LuceneEnvironment;
049: import org.compass.core.lucene.engine.store.jdbc.DataSourceProvider;
050: import org.compass.core.lucene.engine.store.jdbc.DriverManagerDataSourceProvider;
051: import org.compass.core.util.ClassUtils;
052:
053: /**
054: * @author kimchy
055: */
056: public class JdbcDirectoryStore extends AbstractDirectoryStore
057: implements CompassConfigurable {
058:
059: private static final Log log = LogFactory
060: .getLog(JdbcDirectoryStore.class);
061:
062: public static final String PROTOCOL = "jdbc://";
063:
064: private JdbcDirectorySettings jdbcSettings;
065:
066: private DataSource dataSource;
067:
068: private DataSourceProvider dataSourceProvider;
069:
070: private Dialect dialect;
071:
072: private boolean managed;
073:
074: private boolean disableSchemaOperation;
075:
076: private Map<String, JdbcTable> cachedJdbcTables = new ConcurrentHashMap<String, JdbcTable>();
077:
078: public void configure(CompassSettings settings)
079: throws CompassException {
080: String connection = settings
081: .getSetting(CompassEnvironment.CONNECTION);
082: String url = connection.substring(PROTOCOL.length());
083:
084: String dataSourceProviderClassName = settings.getSetting(
085: LuceneEnvironment.JdbcStore.DataSourceProvider.CLASS,
086: DriverManagerDataSourceProvider.class.getName());
087: try {
088: dataSourceProvider = (DataSourceProvider) ClassUtils
089: .forName(dataSourceProviderClassName,
090: settings.getClassLoader()).newInstance();
091: if (log.isDebugEnabled()) {
092: log
093: .debug("Using data source provider ["
094: + dataSourceProvider.getClass()
095: .getName() + "]");
096: }
097: dataSourceProvider.configure(url, settings);
098: this .dataSource = dataSourceProvider.getDataSource();
099: } catch (Exception e) {
100: throw new CompassException(
101: "Failed to configure data source provider ["
102: + dataSourceProviderClassName + "]", e);
103: }
104:
105: String dialectClassName = settings.getSetting(
106: LuceneEnvironment.JdbcStore.DIALECT, null);
107: if (dialectClassName == null) {
108: try {
109: dialect = new DialectResolver().getDialect(dataSource);
110: } catch (JdbcStoreException e) {
111: throw new ConfigurationException(
112: "Failed to auto detect dialect", e);
113: }
114: } else {
115: try {
116: dialect = (Dialect) ClassUtils.forName(
117: dialectClassName, settings.getClassLoader())
118: .newInstance();
119: } catch (Exception e) {
120: throw new ConfigurationException(
121: "Failed to configure dialect ["
122: + dialectClassName + "]");
123: }
124: }
125: if (log.isDebugEnabled()) {
126: log.debug("Using dialect [" + dialect.getClass().getName()
127: + "]");
128: }
129:
130: managed = settings.getSettingAsBoolean(
131: LuceneEnvironment.JdbcStore.MANAGED, false);
132: if (log.isDebugEnabled()) {
133: log.debug("Using managed [" + managed + "]");
134: }
135: if (!managed) {
136: this .dataSource = new TransactionAwareDataSourceProxy(
137: this .dataSource);
138: }
139:
140: disableSchemaOperation = settings.getSettingAsBoolean(
141: LuceneEnvironment.JdbcStore.DISABLE_SCHEMA_OPERATIONS,
142: false);
143: if (log.isDebugEnabled()) {
144: log.debug("Using disable schema operations ["
145: + disableSchemaOperation + "]");
146: }
147:
148: jdbcSettings = new JdbcDirectorySettings();
149: jdbcSettings.setNameColumnName(settings.getSetting(
150: LuceneEnvironment.JdbcStore.DDL.NAME_NAME, jdbcSettings
151: .getNameColumnName()));
152: jdbcSettings.setValueColumnName(settings.getSetting(
153: LuceneEnvironment.JdbcStore.DDL.VALUE_NAME,
154: jdbcSettings.getValueColumnName()));
155: jdbcSettings.setSizeColumnName(settings.getSetting(
156: LuceneEnvironment.JdbcStore.DDL.SIZE_NAME, jdbcSettings
157: .getSizeColumnName()));
158: jdbcSettings.setLastModifiedColumnName(settings.getSetting(
159: LuceneEnvironment.JdbcStore.DDL.LAST_MODIFIED_NAME,
160: jdbcSettings.getLastModifiedColumnName()));
161: jdbcSettings.setDeletedColumnName(settings.getSetting(
162: LuceneEnvironment.JdbcStore.DDL.DELETED_NAME,
163: jdbcSettings.getDeletedColumnName()));
164:
165: jdbcSettings.setNameColumnLength(settings.getSettingAsInt(
166: LuceneEnvironment.JdbcStore.DDL.NAME_LENGTH,
167: jdbcSettings.getNameColumnLength()));
168: jdbcSettings.setValueColumnLengthInK(settings.getSettingAsInt(
169: LuceneEnvironment.JdbcStore.DDL.VALUE_LENGTH,
170: jdbcSettings.getValueColumnLengthInK()));
171:
172: jdbcSettings
173: .setDeleteMarkDeletedDelta(settings
174: .getSettingAsLong(
175: LuceneEnvironment.JdbcStore.DELETE_MARK_DELETED_DELTA,
176: jdbcSettings
177: .getDeleteMarkDeletedDelta()));
178: if (log.isDebugEnabled()) {
179: log.debug("Using delete mark deleted older than ["
180: + jdbcSettings.getDeleteMarkDeletedDelta() + "ms]");
181: }
182: jdbcSettings.setQueryTimeout(settings.getSettingAsInt(
183: LuceneEnvironment.Transaction.LOCK_TIMEOUT,
184: jdbcSettings.getQueryTimeout()));
185: if (log.isDebugEnabled()) {
186: log
187: .debug("Using query timeout (transaction lock timeout) ["
188: + jdbcSettings.getQueryTimeout() + "ms]");
189: }
190:
191: try {
192: jdbcSettings.setLockClass(settings.getSettingAsClass(
193: LuceneEnvironment.JdbcStore.LOCK_TYPE, jdbcSettings
194: .getLockClass()));
195: } catch (ClassNotFoundException e) {
196: throw new CompassException(
197: "Failed to create jdbc lock class ["
198: + settings
199: .getSetting(LuceneEnvironment.JdbcStore.LOCK_TYPE)
200: + "]");
201: }
202: if (log.isDebugEnabled()) {
203: log.debug("Using lock strategy ["
204: + jdbcSettings.getLockClass().getName() + "]");
205: }
206:
207: if (dialect.supportTransactionalScopedBlobs()
208: && !"true"
209: .equalsIgnoreCase(settings
210: .getSetting(
211: LuceneEnvironment.JdbcStore.Connection.AUTO_COMMIT,
212: "false"))) {
213: // Use FetchPerTransaction is dialect supports it
214: jdbcSettings.getDefaultFileEntrySettings().setClassSetting(
215: JdbcFileEntrySettings.INDEX_INPUT_TYPE_SETTING,
216: FetchPerTransactionJdbcIndexInput.class);
217: if (log.isDebugEnabled()) {
218: log
219: .debug("Using transactional blobs (dialect supports it)");
220: }
221: } else {
222: if (log.isDebugEnabled()) {
223: log
224: .debug("Using non transactional blobs (dialect does not supports it)");
225: }
226: }
227:
228: Map fileEntries = settings
229: .getSettingGroups(LuceneEnvironment.JdbcStore.FileEntry.PREFIX);
230: for (Iterator it = fileEntries.keySet().iterator(); it
231: .hasNext();) {
232: String fileEntryName = (String) it.next();
233: CompassSettings compassFeSettings = (CompassSettings) fileEntries
234: .get(fileEntryName);
235: if (log.isInfoEnabled()) {
236: log
237: .info("Configuring file entry ["
238: + fileEntryName + "] with settings ["
239: + compassFeSettings + "]");
240: }
241: JdbcFileEntrySettings jdbcFileEntrySettings = jdbcSettings
242: .getFileEntrySettingsWithoutDefault(fileEntryName);
243: if (jdbcFileEntrySettings == null) {
244: jdbcFileEntrySettings = new JdbcFileEntrySettings();
245: }
246: // iterate over all the settings and copy them to the jdbc settings
247: for (Iterator feIt = compassFeSettings.keySet().iterator(); feIt
248: .hasNext();) {
249: String feSetting = (String) feIt.next();
250: jdbcFileEntrySettings.setSetting(feSetting,
251: compassFeSettings.getSetting(feSetting));
252: }
253: jdbcSettings.registerFileEntrySettings(fileEntryName,
254: jdbcFileEntrySettings);
255: }
256: }
257:
258: public Directory open(String subContext, String subIndex)
259: throws SearchEngineException {
260: String totalPath = subContext + "_" + subIndex;
261: JdbcTable jdbcTable = cachedJdbcTables.get(totalPath);
262: if (jdbcTable == null) {
263: jdbcTable = new JdbcTable(jdbcSettings, dialect, totalPath);
264: cachedJdbcTables.put(totalPath, jdbcTable);
265: }
266: JdbcDirectory dir = new JdbcDirectory(dataSource, jdbcTable);
267: if (!disableSchemaOperation) {
268: try {
269: dir.create();
270: } catch (IOException e) {
271: throw new SearchEngineException(
272: "Failed to create dir [" + totalPath + "]", e);
273: }
274: }
275: return dir;
276: }
277:
278: public Boolean indexExists(Directory dir)
279: throws SearchEngineException {
280: try {
281: // for databases that fail if there is no table (like postgres)
282: if (dialect.supportsTableExists()) {
283: boolean tableExists = ((JdbcDirectory) dir)
284: .tableExists();
285: if (!tableExists) {
286: return Boolean.FALSE;
287: }
288: }
289: } catch (IOException e) {
290: log.warn("Failed to check if index exists", e);
291: } catch (UnsupportedOperationException e) {
292: // do nothing, let the base class check for it
293: }
294: return null;
295: }
296:
297: public void deleteIndex(Directory dir, String subContext,
298: String subIndex) throws SearchEngineException {
299: try {
300: if (disableSchemaOperation) {
301: ((JdbcDirectory) dir).deleteContent();
302: } else {
303: ((JdbcDirectory) dir).delete();
304: }
305: } catch (IOException e) {
306: throw new SearchEngineException("Failed to delete index ["
307: + subIndex + "]", e);
308: }
309: }
310:
311: public void cleanIndex(Directory dir, String subContext,
312: String subIndex) throws SearchEngineException {
313: JdbcDirectory jdbcDirectory = (JdbcDirectory) dir;
314: try {
315: jdbcDirectory.deleteContent();
316: } catch (IOException e) {
317: throw new SearchEngineException(
318: "Failed to delete content of [" + subIndex + "]", e);
319: }
320: }
321:
322: public void performScheduledTasks(Directory dir, String subContext,
323: String subIndex) throws SearchEngineException {
324: try {
325: ((JdbcDirectory) dir).deleteMarkDeleted();
326: } catch (IOException e) {
327: throw new SearchEngineException(
328: "Failed to delete mark deleted with jdbc for ["
329: + subIndex + "]", e);
330: }
331: }
332:
333: public CopyFromHolder beforeCopyFrom(String subContext,
334: String subIndex, Directory dir)
335: throws SearchEngineException {
336: try {
337: ((JdbcDirectory) dir).deleteContent();
338: } catch (IOException e) {
339: throw new SearchEngineException(
340: "Failed to delete index content");
341: }
342: return new CopyFromHolder();
343: }
344:
345: public void registerEventListeners(SearchEngine searchEngine,
346: SearchEngineEventManager eventManager) {
347: if (managed) {
348: eventManager
349: .registerLifecycleListener(new ManagedEventListeners());
350: } else {
351: eventManager
352: .registerLifecycleListener(new NoneManagedEventListeners());
353: }
354: }
355:
356: public void close() {
357: this .dataSourceProvider.closeDataSource();
358: }
359:
360: private class ManagedEventListeners implements
361: SearchEngineLifecycleEventListener {
362:
363: public void beforeBeginTransaction()
364: throws SearchEngineException {
365:
366: }
367:
368: public void afterBeginTransaction()
369: throws SearchEngineException {
370:
371: }
372:
373: public void afterPrepare() throws SearchEngineException {
374:
375: }
376:
377: public void afterCommit(boolean onePhase)
378: throws SearchEngineException {
379: Connection conn;
380: try {
381: conn = DataSourceUtils.getConnection(dataSource);
382: } catch (JdbcStoreException e) {
383: throw new SearchEngineException(
384: "Failed to get connection", e);
385: }
386: FetchPerTransactionJdbcIndexInput.releaseBlobs(conn);
387: DataSourceUtils.releaseConnection(conn);
388: }
389:
390: public void afterRollback() throws SearchEngineException {
391: Connection conn;
392: try {
393: conn = DataSourceUtils.getConnection(dataSource);
394: } catch (JdbcStoreException e) {
395: throw new SearchEngineException(
396: "Failed to get connection", e);
397: }
398: FetchPerTransactionJdbcIndexInput.releaseBlobs(conn);
399: DataSourceUtils.releaseConnection(conn);
400: }
401:
402: public void close() throws SearchEngineException {
403:
404: }
405: }
406:
407: private class NoneManagedEventListeners implements
408: SearchEngineLifecycleEventListener {
409:
410: private Connection connection;
411:
412: public void beforeBeginTransaction()
413: throws SearchEngineException {
414: try {
415: connection = DataSourceUtils.getConnection(dataSource);
416: } catch (JdbcStoreException e) {
417: throw new SearchEngineException(
418: "Failed to open db connection", e);
419: }
420: }
421:
422: public void afterBeginTransaction()
423: throws SearchEngineException {
424: }
425:
426: public void afterPrepare() throws SearchEngineException {
427: }
428:
429: public void afterCommit(boolean onePhase)
430: throws SearchEngineException {
431: try {
432: DataSourceUtils.commitConnectionIfPossible(connection);
433: } catch (JdbcStoreException e) {
434: throw new SearchEngineException(
435: "Failed to commit database transcation", e);
436: } finally {
437: DataSourceUtils.releaseConnection(connection);
438: this .connection = null;
439: }
440: }
441:
442: public void afterRollback() throws SearchEngineException {
443: try {
444: DataSourceUtils
445: .rollbackConnectionIfPossible(connection);
446: } catch (JdbcStoreException e) {
447: throw new SearchEngineException(
448: "Failed to rollback database transcation", e);
449: } finally {
450: DataSourceUtils.releaseConnection(connection);
451: this .connection = null;
452: }
453: }
454:
455: public void close() throws SearchEngineException {
456: }
457: }
458: }
|