0001: /**********************************************************************************
0002: * $URL: https://source.sakaiproject.org/svn/search/tags/sakai_2-4-1/search-impl/impl/src/java/org/sakaiproject/search/index/impl/JDBCClusterIndexStore.java $
0003: * $Id: JDBCClusterIndexStore.java 29635 2007-04-26 14:44:09Z ajpoland@iupui.edu $
0004: ***********************************************************************************
0005: *
0006: * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
0007: *
0008: * Licensed under the Educational Community License, Version 1.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.opensource.org/licenses/ecl1.php
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019: *
0020: **********************************************************************************/package org.sakaiproject.search.index.impl;
0021:
0022: import java.io.File;
0023: import java.io.FileInputStream;
0024: import java.io.FileOutputStream;
0025: import java.io.IOException;
0026: import java.io.InputStream;
0027: import java.nio.channels.FileChannel;
0028: import java.sql.Connection;
0029: import java.sql.PreparedStatement;
0030: import java.sql.ResultSet;
0031: import java.sql.SQLException;
0032: import java.util.ArrayList;
0033: import java.util.Date;
0034: import java.util.Iterator;
0035: import java.util.List;
0036:
0037: import javax.sql.DataSource;
0038:
0039: import org.apache.commons.logging.Log;
0040: import org.apache.commons.logging.LogFactory;
0041: import org.apache.lucene.index.IndexReader;
0042: import org.sakaiproject.search.api.SearchService;
0043: import org.sakaiproject.search.index.ClusterFilesystem;
0044: import org.sakaiproject.search.index.SegmentInfo;
0045:
0046: /**
0047: * This is a JDBC implementation of the ClusterFilesystem. It syncronizes the
0048: * local search index segments with the database, by sipping each segment and
0049: * pushing it to the database. Each Segment has an extra file that contains an
0050: * MD5 of the segment and a time stamp of the last update. If any segments are
0051: * missing in the local segment store (including no segments at all, as with a
0052: * new cluster node) the missing segments are downloaded from the JDBC store. If
0053: * any of the segments on the local store are found to be dammaged they are
0054: * reloaded from the database.
0055: *
0056: * @author ieb
0057: */
0058: public class JDBCClusterIndexStore implements ClusterFilesystem {
0059:
0060: private static final Log log = LogFactory
0061: .getLog(JDBCClusterIndexStore.class);
0062:
0063: private DataSource dataSource = null;
0064:
0065: private String searchIndexDirectory = null;
0066:
0067: private static final String TEMP_INDEX_NAME = "tempindex";
0068:
0069: private static final String INDEX_PATCHNAME = "indexpatch";
0070:
0071: private boolean autoDdl = false;
0072:
0073: private boolean parallelIndex = false;
0074:
0075: /**
0076: * If validate is true, all segments will be checked on initial startup and
0077: * upload. This can take a long time. If its false, only when an index is
0078: * updated is the MD5 checked. Recomendation is to leave this false.
0079: */
0080: private boolean validate = false;
0081:
0082: /**
0083: * This will be set to after the first update of a JVM run has been
0084: * completed, as its possible that IndexReaders may have open references to
0085: * the Segments that we try and remove. Update: 2007/02/27 Actualy since we
0086: * need to merge segments we not need to remove them so this is no longer
0087: * the case, okToRemove is always true
0088: */
0089: private static boolean okToRemove = true;
0090:
0091: private String sharedSegments = null;
0092:
0093: private boolean debug = false;
0094:
0095: /**
0096: * locatStructuredStorage causes local segments to be placed into structured
0097: * storage on the local disk
0098: */
0099: private boolean localStructuredStorage = false;
0100:
0101: /**
0102: * sharedStructuredStorage causes the shared segments to be placed into
0103: * shared structured storage in the shared location
0104: */
0105: private boolean sharedStructuredStorage = false;
0106:
0107: private ClusterSegmentsStorage clusterStorage = null;
0108:
0109: private boolean localSegmentsOnly = false;
0110:
0111: private SearchService searchService;
0112:
0113: public void init() {
0114: try {
0115: log.info(this + ":init() ");
0116: clusterStorage = new ClusterSegmentsStorage(searchService,
0117: searchIndexDirectory, this , localStructuredStorage,
0118: debug);
0119:
0120: // TODO: We should migrate to the correct storage format, on the
0121: // local
0122: // and shared space, by looking at the DB and then checking what is
0123: // there
0124: // followed by a move.
0125: // Since we are doing a move, it should be ok to have this happend
0126: // on
0127: // the fly.
0128:
0129: try {
0130: migrateLocalSegments();
0131: migrateSharedSegments();
0132: } catch (IOException ex) {
0133: log
0134: .error(
0135: "Failed to migrate search content to new format, the instance should not continue to run ",
0136: ex);
0137: System.exit(-1);
0138: }
0139:
0140: /*
0141: * The storage is created by hibernate now try { if (autoDdl) {
0142: * SqlService.getInstance().ddl(this.getClass().getClassLoader(),
0143: * "search_cluster"); } } catch (Exception ex) { log.error("Failed
0144: * to init JDBCClusterIndexStorage", ex); }
0145: */
0146: log.info(this + ":init() Ok ");
0147: } catch (Exception ex) {
0148: log.error("Failed to start Cluster Index store", ex);
0149: System.exit(-1);
0150: }
0151:
0152: }
0153:
0154: /**
0155: * update the local Segmetns from the DB
0156: */
0157: public List<SegmentInfo> updateSegments() {
0158: Connection connection = null;
0159: List<SegmentInfo> segmentList = new ArrayList<SegmentInfo>();
0160: try {
0161: connection = dataSource.getConnection();
0162: List dbSegments = getDBSegments(connection);
0163: if (log.isDebugEnabled())
0164: log.debug("Update: DB Segments = " + dbSegments.size());
0165: // remove files not in the dbSegmentList
0166: List<SegmentInfo> localSegments = getLocalSegments();
0167:
0168: List<SegmentInfo> badLocalSegments = getBadLocalSegments();
0169: // delete any bad local segments before we load so that they get
0170: // updated
0171: // from the db
0172: deleteAllSegments(badLocalSegments);
0173:
0174: List<SegmentInfo> deletedSegments = getDeletedLocalSegments();
0175: // delete any segments marked as for deletion
0176: deleteAllSegments(deletedSegments);
0177:
0178: if (log.isDebugEnabled())
0179: log.debug("Update: Local Segments = "
0180: + localSegments.size());
0181:
0182: // which of the dbSegments are not present locally
0183:
0184: List<SegmentInfo> updateLocalSegments = new ArrayList<SegmentInfo>();
0185: for (Iterator i = dbSegments.iterator(); i.hasNext();) {
0186: SegmentInfo db_si = (SegmentInfo) i.next();
0187: boolean found = false;
0188: String name = db_si.getName();
0189: for (Iterator j = localSegments.iterator(); j.hasNext();) {
0190: SegmentInfo local_si = (SegmentInfo) j.next();
0191: if (name.equals(local_si.getName())) {
0192: found = true;
0193: break;
0194: }
0195: }
0196: if (!found) {
0197: updateLocalSegments.add(db_si);
0198: if (log.isDebugEnabled())
0199: log.debug("Missing Will update " + db_si);
0200: } else {
0201: if (log.isDebugEnabled())
0202: log.debug("Present Will Not update " + db_si);
0203: }
0204: }
0205:
0206: // which of the dbsegmetnts are newer than local versions
0207: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0208: SegmentInfo current_si = (SegmentInfo) i.next();
0209: boolean found = false;
0210: String name = current_si.getName();
0211: long version = current_si.getVersion();
0212: for (Iterator j = dbSegments.iterator(); j.hasNext();) {
0213: SegmentInfo db_si = (SegmentInfo) j.next();
0214: if (name.equals(db_si.getName())
0215: && db_si.getVersion() > version) {
0216: updateLocalSegments.add(db_si);
0217: if (log.isDebugEnabled())
0218: log.debug("Newer will Update " + db_si);
0219: found = true;
0220: break;
0221: }
0222: }
0223: if (!found) {
0224: if (log.isDebugEnabled())
0225: log.debug("Ok will not update " + current_si);
0226: }
0227: }
0228:
0229: // which if the currentSegments need updating
0230: // process the remove list
0231: // we can only perform a remove, IF there is no other activity.
0232: // ie only on the first time in any 1 JVM run
0233: if (okToRemove) {
0234: okToRemove = true;
0235: // with merge we need to remove local segments
0236: // that are not present. This may cause problems with
0237: // an open index, as it will suddenly see segments dissapear
0238: // Update 2007/02/27 It does cause problems. We need to make it
0239: // possible for
0240: // search to recover, or to delay the removal of segments until
0241: // the reload is complete.
0242:
0243: List<SegmentInfo> removeLocalSegments = new ArrayList<SegmentInfo>();
0244:
0245: // which segments exist locally but not in the DB, these should
0246: // be
0247: // removed
0248: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0249:
0250: SegmentInfo local_si = (SegmentInfo) i.next();
0251: // only check local segments that are not new and not
0252: if (local_si.isCreated()) {
0253: boolean found = false;
0254: String name = local_si.getName();
0255: for (Iterator j = dbSegments.iterator(); j
0256: .hasNext();) {
0257: SegmentInfo db_si = (SegmentInfo) j.next();
0258: if (name.equals(db_si.getName())) {
0259: found = true;
0260: break;
0261: }
0262: }
0263: if (!found) {
0264: removeLocalSegments.add(local_si);
0265: if (log.isDebugEnabled())
0266: log.debug("Will remove " + local_si);
0267: } else {
0268: if (log.isDebugEnabled())
0269: log.debug("Ok Will not remove "
0270: + local_si);
0271: }
0272: }
0273: }
0274:
0275: // if we could mark the local segment for deletion so that
0276: // its is only deleted some time later.
0277: for (Iterator i = removeLocalSegments.iterator(); i
0278: .hasNext();) {
0279: SegmentInfo rmsi = (SegmentInfo) i.next();
0280: removeLocalSegment(rmsi);
0281: }
0282: }
0283:
0284: // process the get list
0285: for (Iterator i = updateLocalSegments.iterator(); i
0286: .hasNext();) {
0287: SegmentInfo addsi = (SegmentInfo) i.next();
0288: try {
0289: updateLocalSegment(connection, addsi);
0290: } catch (Exception ex) {
0291: // ignore failures to unpack a local segment. It may have
0292: // been removed by
0293: // annother node
0294: log.info("Segment was not unpacked "
0295: + ex.getClass().getName() + ":"
0296: + ex.getMessage());
0297: }
0298:
0299: }
0300: // if we made any modifications, we also need to process the patch
0301: if (updateLocalSegments.size() > 0) {
0302: updateLocalPatch(connection);
0303: }
0304:
0305: // build the list putting the current segment at the end
0306: for (Iterator i = dbSegments.iterator(); i.hasNext();) {
0307: SegmentInfo si = (SegmentInfo) i.next();
0308: File f = si.getSegmentLocation();
0309: if (f.exists()) {
0310: // only add those segments that exist after the sync
0311: segmentList.add(si);
0312: }
0313: if (log.isDebugEnabled())
0314: log.debug("Segment Present at " + f.getName());
0315: }
0316:
0317: connection.commit();
0318: } catch (Exception sqle) {
0319: log.error("Failed to update segments ", sqle);
0320: try {
0321: connection.rollback();
0322: } catch (Exception ex) {
0323: }
0324: } finally {
0325: try {
0326: connection.close();
0327: } catch (Exception e) {
0328: }
0329: }
0330: return segmentList;
0331: }
0332:
0333: private void deleteAllSegments(List<SegmentInfo> badLocalSegments) {
0334: for (Iterator<SegmentInfo> i = badLocalSegments.iterator(); i
0335: .hasNext();) {
0336:
0337: SegmentInfo s = i.next();
0338: s.doFinalDelete();
0339: }
0340: }
0341:
0342: /**
0343: * save the local segments to the DB
0344: */
0345: public List<SegmentInfo> saveSegments() {
0346: Connection connection = null;
0347: List<SegmentInfo> segmentList = new ArrayList<SegmentInfo>();
0348: try {
0349: connection = dataSource.getConnection();
0350: List<SegmentInfo> dbSegments = getDBSegments(connection);
0351: // remove files not in the dbSegmentList
0352: List<SegmentInfo> localSegments = getLocalSegments();
0353: List<SegmentInfo> badLocalSegments = getBadLocalSegments();
0354:
0355: // find the dbSegments that are not present locally
0356:
0357: List<SegmentInfo> removeDBSegments = new ArrayList<SegmentInfo>();
0358: List<SegmentInfo> currentDBSegments = new ArrayList<SegmentInfo>();
0359:
0360: // which segments exist inthe db but not locally
0361: for (Iterator i = dbSegments.iterator(); i.hasNext();) {
0362: SegmentInfo db_si = (SegmentInfo) i.next();
0363: boolean found = false;
0364: String name = db_si.getName();
0365: for (Iterator j = localSegments.iterator(); j.hasNext();) {
0366: SegmentInfo local_si = (SegmentInfo) j.next();
0367: if (name.equals(local_si.getName())) {
0368: found = true;
0369: break;
0370: }
0371: }
0372: // dont delete bad segments from the DB
0373: if (!found) {
0374: for (Iterator j = badLocalSegments.iterator(); j
0375: .hasNext();) {
0376: File local_file = (File) j.next();
0377: if (name.equals(local_file.getName())) {
0378: found = true;
0379: break;
0380: }
0381: }
0382: }
0383: if (!found) {
0384: removeDBSegments.add(db_si);
0385: if (log.isDebugEnabled())
0386: log.debug("Will remove from the DB " + db_si);
0387: } else {
0388: currentDBSegments.add(db_si);
0389: if (log.isDebugEnabled())
0390: log.debug("In the DB will not remove " + db_si);
0391: }
0392: }
0393:
0394: List<SegmentInfo> updateDBSegments = new ArrayList<SegmentInfo>();
0395: // which of the localSegments are not in the db
0396:
0397: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0398: SegmentInfo local_si = (SegmentInfo) i.next();
0399: boolean found = false;
0400: String name = local_si.getName();
0401: for (Iterator j = dbSegments.iterator(); j.hasNext();) {
0402: SegmentInfo db_si = (SegmentInfo) j.next();
0403: if (name.equals(db_si.getName())) {
0404: found = true;
0405: break;
0406: }
0407: }
0408: if (!found) {
0409: updateDBSegments.add(local_si);
0410: if (log.isDebugEnabled())
0411: log.debug(" Will update to the DB " + local_si);
0412: } else {
0413: if (log.isDebugEnabled())
0414: log.debug(" Will NOT update to the DB "
0415: + local_si);
0416:
0417: }
0418: }
0419:
0420: // which of the localSegments have been modified
0421: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0422: SegmentInfo local_si = (SegmentInfo) i.next();
0423: boolean found = false;
0424: String name = local_si.getName();
0425: long version = local_si.getVersion();
0426: for (Iterator j = dbSegments.iterator(); j.hasNext();) {
0427: SegmentInfo db_si = (SegmentInfo) j.next();
0428: if (name.equals(db_si.getName())
0429: && version > db_si.getVersion()) {
0430: updateDBSegments.add(db_si);
0431: if (log.isDebugEnabled())
0432: log.debug("Will update modified to the DB "
0433: + db_si);
0434: found = true;
0435: break;
0436: }
0437: }
0438: if (!found) {
0439: if (log.isDebugEnabled())
0440: log.debug("Will not update the DB, matches "
0441: + local_si);
0442:
0443: }
0444: }
0445:
0446: // process the remove list
0447: for (Iterator i = removeDBSegments.iterator(); i.hasNext();) {
0448: SegmentInfo rmsi = (SegmentInfo) i.next();
0449: removeDBSegment(connection, rmsi);
0450: }
0451: // process the get list
0452: for (Iterator i = updateDBSegments.iterator(); i.hasNext();) {
0453: SegmentInfo addsi = (SegmentInfo) i.next();
0454: updateDBSegment(connection, addsi);
0455: }
0456: // build the list putting the current segment at the end
0457: updateDBPatch(connection);
0458:
0459: for (Iterator i = updateDBSegments.iterator(); i.hasNext();) {
0460: SegmentInfo si = (SegmentInfo) i.next();
0461: File f = si.getSegmentLocation();
0462: segmentList.add(si);
0463: if (log.isDebugEnabled())
0464: log.debug("Segments saved " + f.getName());
0465:
0466: }
0467: connection.commit();
0468: deleteAllSegments(badLocalSegments);
0469: } catch (Exception ex) {
0470: log.error(
0471: "Failed to Save Segments back to Central Storage",
0472: ex);
0473: try {
0474: connection.rollback();
0475: } catch (Exception e) {
0476: }
0477: recoverFromFailure();
0478: } finally {
0479: try {
0480: connection.close();
0481: } catch (Exception e) {
0482: }
0483: }
0484: return segmentList;
0485: }
0486:
0487: public List<SegmentInfo> saveAllSegments() {
0488: Connection connection = null;
0489: List<SegmentInfo> segmentList = new ArrayList<SegmentInfo>();
0490: try {
0491: connection = dataSource.getConnection();
0492: List<SegmentInfo> dbSegments = getDBSegments(connection);
0493: // remove files not in the dbSegmentList
0494: List<SegmentInfo> localSegments = getLocalSegments();
0495: List<SegmentInfo> badLocalSegments = getBadLocalSegments();
0496:
0497: // find the dbSegments that are not present locally
0498:
0499: List<SegmentInfo> updateDBSegments = new ArrayList<SegmentInfo>();
0500: // which of the localSegments are not in the db
0501:
0502: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0503: SegmentInfo local_si = (SegmentInfo) i.next();
0504:
0505: boolean found = false;
0506: String name = local_si.getName();
0507: for (Iterator j = dbSegments.iterator(); j.hasNext();) {
0508: SegmentInfo db_si = (SegmentInfo) j.next();
0509: if (name.equals(db_si.getName())) {
0510: found = true;
0511: break;
0512: }
0513: }
0514: // dont delete bad segments from the DB
0515: if (!found) {
0516: for (Iterator j = badLocalSegments.iterator(); j
0517: .hasNext();) {
0518: File local_file = (File) j.next();
0519: if (name.equals(local_file.getName())) {
0520: found = true;
0521: break;
0522: }
0523: }
0524: }
0525:
0526: if (!found) {
0527: updateDBSegments.add(local_si);
0528: }
0529: }
0530:
0531: // the db segments
0532: for (Iterator i = localSegments.iterator(); i.hasNext();) {
0533: SegmentInfo local_si = (SegmentInfo) i.next();
0534: String name = local_si.getName();
0535: long version = local_si.getVersion();
0536: for (Iterator j = dbSegments.iterator(); j.hasNext();) {
0537: SegmentInfo db_si = (SegmentInfo) j.next();
0538: if (name.equals(db_si.getName())) {
0539: updateDBSegments.add(db_si);
0540: break;
0541: }
0542: }
0543: }
0544:
0545: // process the get list
0546: for (Iterator i = updateDBSegments.iterator(); i.hasNext();) {
0547: SegmentInfo addsi = (SegmentInfo) i.next();
0548: updateDBSegment(connection, addsi);
0549: }
0550: // build the list putting the current segment at the end
0551:
0552: for (Iterator i = updateDBSegments.iterator(); i.hasNext();) {
0553: SegmentInfo si = (SegmentInfo) i.next();
0554: segmentList.add(si);
0555: }
0556: connection.commit();
0557:
0558: deleteAllSegments(badLocalSegments);
0559: } catch (Exception ex) {
0560: log.error(
0561: "Failed to Save Segments back to Central Storage",
0562: ex);
0563: try {
0564: connection.rollback();
0565: } catch (Exception e) {
0566: }
0567: recoverFromFailure();
0568: } finally {
0569: try {
0570: connection.close();
0571: } catch (Exception e) {
0572: }
0573: }
0574: return segmentList;
0575: }
0576:
0577: protected void updateLocalSegment(Connection connection,
0578: SegmentInfo addsi) throws SQLException, IOException {
0579: if (searchService.hasDiagnostics()) {
0580: log.info("\tUpdate Local Segment from Database " + addsi);
0581: }
0582: if (localSegmentsOnly) {
0583: log
0584: .warn("Update Local Segment Requested with inactive Shared Storage "
0585: + addsi);
0586: } else {
0587: if (sharedSegments == null || sharedSegments.length() == 0) {
0588: updateLocalSegmentBLOB(connection, addsi);
0589: } else {
0590: updateLocalSegmentFilesystem(connection, addsi);
0591: }
0592: }
0593:
0594: }
0595:
0596: /**
0597: * updte a segment from the database
0598: *
0599: * @param connection
0600: * @param addsi
0601: */
0602: protected void updateLocalSegmentBLOB(Connection connection,
0603: SegmentInfo addsi) throws SQLException, IOException {
0604: if (log.isDebugEnabled())
0605: log.debug("Updating local segment from databse " + addsi);
0606: PreparedStatement segmentSelect = null;
0607: ResultSet resultSet = null;
0608: try {
0609: segmentSelect = connection
0610: .prepareStatement("select version_, packet_ from search_segments where name_ = ?");
0611: segmentSelect.clearParameters();
0612: segmentSelect.setString(1, addsi.getName());
0613: resultSet = segmentSelect.executeQuery();
0614: if (resultSet.next()) {
0615: InputStream packetStream = null;
0616: try {
0617: long version = resultSet.getLong(1);
0618: packetStream = resultSet.getBinaryStream(2);
0619: clusterStorage.unpackSegment(addsi, packetStream,
0620: version);
0621: if (log.isDebugEnabled())
0622: log.debug("Updated Packet from DB to versiob "
0623: + version);
0624: } finally {
0625: try {
0626: packetStream.close();
0627: } catch (Exception ex) {
0628: }
0629: }
0630: } else {
0631: log.error("Didnt find segment in database");
0632: }
0633: } finally {
0634: try {
0635: resultSet.close();
0636: } catch (Exception ex) {
0637: }
0638: try {
0639: segmentSelect.close();
0640: } catch (Exception ex) {
0641: }
0642: }
0643:
0644: }
0645:
0646: /**
0647: * remove a local segment
0648: *
0649: * @param rmsi
0650: */
0651: public void removeLocalSegment(SegmentInfo rmsi) {
0652:
0653: rmsi.setDeleted();
0654: if (searchService.hasDiagnostics()) {
0655: log.info("\tMarked Local Segment for deletion " + rmsi);
0656: }
0657: }
0658:
0659: /**
0660: * get a list of all DB segments ordered by version
0661: *
0662: * @param connection
0663: * @return
0664: */
0665: private List<SegmentInfo> getDBSegments(Connection connection)
0666: throws SQLException {
0667: PreparedStatement segmentAllSelect = null;
0668: ResultSet resultSet = null;
0669: List<SegmentInfo> dbsegments = new ArrayList<SegmentInfo>();
0670: try {
0671: segmentAllSelect = connection
0672: .prepareStatement("select version_, name_ from search_segments where name_ <> ? ");
0673: segmentAllSelect.clearParameters();
0674: segmentAllSelect.setString(1, INDEX_PATCHNAME);
0675: resultSet = segmentAllSelect.executeQuery();
0676: while (resultSet.next()) {
0677: final long version = resultSet.getLong(1);
0678: final String name = resultSet.getString(2);
0679: SegmentInfo si = SegmentInfoImpl.newSharedSegmentInfo(
0680: name, version, localStructuredStorage,
0681: searchIndexDirectory);
0682: dbsegments.add(si);
0683: if (log.isDebugEnabled())
0684: log.debug("DB Segment " + si);
0685: }
0686: } finally {
0687: try {
0688: resultSet.close();
0689: } catch (Exception ex) {
0690: }
0691: try {
0692: segmentAllSelect.close();
0693: } catch (Exception ex) {
0694: }
0695: }
0696: return dbsegments;
0697: }
0698:
0699: protected void updateDBPatch(Connection connection)
0700: throws SQLException, IOException {
0701:
0702: if (localSegmentsOnly) {
0703: if (log.isDebugEnabled())
0704: log
0705: .debug("Update Patch Requested with inactive Shared Storage ");
0706: } else {
0707: if (sharedSegments == null || sharedSegments.length() == 0) {
0708: updateDBPatchBLOB(connection);
0709: } else {
0710: updateDBPatchFilesystem(connection);
0711: }
0712: }
0713: }
0714:
0715: /**
0716: * updat this save this local segment into the db
0717: *
0718: * @param connection
0719: * @param addsi
0720: */
0721: protected void updateDBPatchBLOB(Connection connection)
0722: throws SQLException, IOException {
0723:
0724: PreparedStatement segmentUpdate = null;
0725: PreparedStatement segmentInsert = null;
0726: InputStream packetStream = null;
0727: File packetFile = null;
0728: long newVersion = System.currentTimeMillis();
0729: try {
0730: segmentUpdate = connection
0731: .prepareStatement("update search_segments set packet_ = ?, version_ = ?, size_ = ? where name_ = ?");
0732: segmentInsert = connection
0733: .prepareStatement("insert into search_segments (packet_, name_, version_, size_ ) values ( ?,?,?,?)");
0734: packetFile = clusterStorage.packPatch();
0735: if (packetFile.exists()) {
0736: packetStream = new FileInputStream(packetFile);
0737: segmentUpdate.clearParameters();
0738: segmentUpdate.setBinaryStream(1, packetStream,
0739: (int) packetFile.length());
0740: segmentUpdate.setLong(2, newVersion);
0741: segmentUpdate.setLong(3, packetFile.length());
0742: segmentUpdate.setString(4, INDEX_PATCHNAME);
0743: if (segmentUpdate.executeUpdate() != 1) {
0744: segmentInsert.clearParameters();
0745: segmentInsert.setBinaryStream(1, packetStream,
0746: (int) packetFile.length());
0747: segmentInsert.setString(2, INDEX_PATCHNAME);
0748: segmentInsert.setLong(3, newVersion);
0749: segmentInsert.setLong(4, packetFile.length());
0750: if (segmentInsert.executeUpdate() != 1) {
0751: throw new SQLException(
0752: " Failed to insert patch ");
0753: }
0754: }
0755: if (log.isDebugEnabled())
0756: log.debug("DB Updated Patch ");
0757: } else {
0758: log.warn(" Packed Patch does not exist "
0759: + packetFile.getPath());
0760: }
0761: } finally {
0762: try {
0763: packetStream.close();
0764: } catch (Exception ex) {
0765: }
0766: try {
0767: packetFile.delete();
0768: } catch (Exception ex) {
0769: }
0770: try {
0771: segmentUpdate.close();
0772: } catch (Exception ex) {
0773: }
0774: try {
0775: segmentInsert.close();
0776: } catch (Exception ex) {
0777: }
0778: }
0779:
0780: }
0781:
0782: /**
0783: * updat this save this local segment into the db
0784: *
0785: * @param connection
0786: * @param addsi
0787: */
0788: protected void updateDBPatchFilesystem(Connection connection)
0789: throws SQLException, IOException {
0790:
0791: PreparedStatement segmentUpdate = null;
0792: PreparedStatement segmentInsert = null;
0793: FileChannel packetStream = null;
0794: FileChannel sharedStream = null;
0795: File packetFile = null;
0796: File sharedFinalFile = null;
0797: File sharedTempFile = null;
0798: long newVersion = System.currentTimeMillis();
0799: try {
0800: sharedTempFile = new File(
0801: getSharedTempFileName(INDEX_PATCHNAME));
0802: sharedFinalFile = new File(getSharedFileName(
0803: INDEX_PATCHNAME, sharedStructuredStorage));
0804: packetFile = clusterStorage.packPatch();
0805: if (packetFile.exists()) {
0806: packetStream = new FileInputStream(packetFile)
0807: .getChannel();
0808: sharedTempFile.getParentFile().mkdirs();
0809: sharedStream = new FileOutputStream(sharedTempFile)
0810: .getChannel();
0811:
0812: sharedStream.transferFrom(packetStream, 0, packetStream
0813: .size());
0814:
0815: packetStream.close();
0816: sharedStream.close();
0817:
0818: segmentUpdate = connection
0819: .prepareStatement("update search_segments set version_ = ?, size_ = ? where name_ = ? ");
0820: segmentInsert = connection
0821: .prepareStatement("insert into search_segments ( name_, version_, size_ ) values ( ?,?,?)");
0822:
0823: segmentUpdate.clearParameters();
0824: segmentUpdate.setLong(1, newVersion);
0825: segmentUpdate.setLong(2, packetFile.length());
0826: segmentUpdate.setString(3, INDEX_PATCHNAME);
0827: if (segmentUpdate.executeUpdate() != 1) {
0828: segmentInsert.clearParameters();
0829: segmentInsert.setString(1, INDEX_PATCHNAME);
0830: segmentInsert.setLong(2, newVersion);
0831: segmentInsert.setLong(3, packetFile.length());
0832: if (segmentInsert.executeUpdate() != 1) {
0833: throw new SQLException(
0834: " Failed to add patch packet ");
0835: }
0836: }
0837: sharedTempFile.renameTo(sharedFinalFile);
0838: if (log.isDebugEnabled())
0839: log.debug("DB Patch ");
0840: } else {
0841: log.warn("Packet file does not exist "
0842: + packetFile.getPath());
0843: }
0844:
0845: } finally {
0846: try {
0847: packetStream.close();
0848: } catch (Exception ex) {
0849: }
0850: try {
0851: packetFile.delete();
0852: } catch (Exception ex) {
0853: }
0854: try {
0855: sharedStream.close();
0856: } catch (Exception ex) {
0857: }
0858: try {
0859: sharedTempFile.delete();
0860: } catch (Exception ex) {
0861: }
0862: try {
0863: segmentUpdate.close();
0864: } catch (Exception ex) {
0865: }
0866: try {
0867: segmentInsert.close();
0868: } catch (Exception ex) {
0869: }
0870: }
0871:
0872: }
0873:
0874: protected void updateDBSegment(Connection connection,
0875: SegmentInfo addsi) throws SQLException, IOException {
0876: if (searchService.hasDiagnostics()) {
0877: log.info("\tUpdate Database Segment from Local " + addsi);
0878: }
0879: if (localSegmentsOnly) {
0880: if (log.isDebugEnabled())
0881: log
0882: .debug("Not Saving Segment to DB as no Shared Storage "
0883: + addsi);
0884: } else {
0885: if (sharedSegments == null || sharedSegments.length() == 0) {
0886: updateDBSegmentBLOB(connection, addsi);
0887: } else {
0888: updateDBSegmentFilesystem(connection, addsi);
0889: }
0890: }
0891: }
0892:
0893: /**
0894: * updat this save this local segment into the db
0895: *
0896: * @param connection
0897: * @param addsi
0898: */
0899: protected void updateDBSegmentBLOB(Connection connection,
0900: SegmentInfo addsi) throws SQLException, IOException {
0901:
0902: PreparedStatement segmentUpdate = null;
0903: PreparedStatement segmentInsert = null;
0904: InputStream packetStream = null;
0905: File packetFile = null;
0906: long newVersion = System.currentTimeMillis();
0907: try {
0908: segmentUpdate = connection
0909: .prepareStatement("update search_segments set packet_ = ?, version_ = ?, size_ = ? where name_ = ? and version_ = ?");
0910: segmentInsert = connection
0911: .prepareStatement("insert into search_segments (packet_, name_, version_, size_ ) values ( ?,?,?,?)");
0912: packetFile = clusterStorage.packSegment(addsi, newVersion);
0913: if (packetFile.exists()) {
0914: packetStream = new FileInputStream(packetFile);
0915: if (addsi.isInDb()) {
0916: segmentUpdate.clearParameters();
0917: segmentUpdate.setBinaryStream(1, packetStream,
0918: (int) packetFile.length());
0919: segmentUpdate.setLong(2, newVersion);
0920: segmentUpdate.setLong(3, packetFile.length());
0921: segmentUpdate.setString(4, addsi.getName());
0922: segmentUpdate.setLong(5, addsi.getVersion());
0923: if (segmentUpdate.executeUpdate() != 1) {
0924: throw new SQLException(
0925: " ant Find packet to update " + addsi);
0926: }
0927: } else {
0928: segmentInsert.clearParameters();
0929: segmentInsert.setBinaryStream(1, packetStream,
0930: (int) packetFile.length());
0931: segmentInsert.setString(2, addsi.getName());
0932: segmentInsert.setLong(3, newVersion);
0933: segmentInsert.setLong(4, packetFile.length());
0934: if (segmentInsert.executeUpdate() != 1) {
0935: throw new SQLException(
0936: " Failed to insert packet " + addsi);
0937: }
0938: }
0939: addsi.setVersion(newVersion);
0940: if (log.isDebugEnabled())
0941: log.debug("DB Updated " + addsi);
0942: try {
0943: packetStream.close();
0944: } catch (Exception ex) {
0945: }
0946: try {
0947: packetFile.delete();
0948: } catch (Exception ex) {
0949: }
0950: } else {
0951: log.warn("Packet file does not exist "
0952: + packetFile.getPath());
0953: }
0954:
0955: } finally {
0956: try {
0957: packetStream.close();
0958: } catch (Exception ex) {
0959: }
0960: try {
0961: packetFile.delete();
0962: } catch (Exception ex) {
0963: }
0964: try {
0965: segmentUpdate.close();
0966: } catch (Exception ex) {
0967: }
0968: try {
0969: segmentInsert.close();
0970: } catch (Exception ex) {
0971: }
0972: }
0973:
0974: }
0975:
0976: private void removeDBSegment(Connection connection, SegmentInfo rmsi)
0977: throws SQLException {
0978: PreparedStatement segmentDelete = null;
0979: try {
0980: if (rmsi.isInDb()) {
0981: segmentDelete = connection
0982: .prepareStatement("delete from search_segments where name_ = ? and version_ = ?");
0983: segmentDelete.clearParameters();
0984: segmentDelete.setString(1, rmsi.getName());
0985: segmentDelete.setLong(2, rmsi.getVersion());
0986: segmentDelete.execute();
0987:
0988: String sharedSegment = getSharedFileName(
0989: rmsi.getName(), sharedStructuredStorage);
0990: if (sharedSegment != null) {
0991: File f = new File(sharedSegment);
0992: if (f.exists()) {
0993: f.delete();
0994: }
0995: }
0996:
0997: if (searchService.hasDiagnostics()) {
0998: log.info("\tRemoved Segment From Database [" + rmsi
0999: + "]");
1000: }
1001: }
1002: } finally {
1003: try {
1004: segmentDelete.close();
1005: } catch (Exception ex) {
1006: }
1007: }
1008: }
1009:
1010: /**
1011: * create a new local segment and mark its tiestamp
1012: */
1013: public SegmentInfo newSegment() throws IOException {
1014: File f = null;
1015: for (;;) {
1016: f = SegmentInfoImpl.getSegmentLocation(String
1017: .valueOf(System.currentTimeMillis()),
1018: localStructuredStorage, searchIndexDirectory);
1019: if (!f.exists()) {
1020: break;
1021: }
1022: }
1023: f.mkdirs();
1024:
1025: SegmentInfo si = SegmentInfoImpl.newLocalSegmentInfo(f,
1026: localStructuredStorage, searchIndexDirectory);
1027: si.setNew();
1028: si.setTimeStamp(System.currentTimeMillis());
1029:
1030: return si;
1031: }
1032:
1033: /**
1034: * get a list of local segments
1035: *
1036: * @return
1037: * @throws IOException
1038: */
1039: public List<SegmentInfo> getLocalSegments() throws IOException {
1040: List<SegmentInfo> l = new ArrayList<SegmentInfo>();
1041: File searchDir = new File(searchIndexDirectory);
1042: return getLocalSegments(searchDir, l);
1043: }
1044:
1045: /**
1046: * recurse into a list of segments
1047: *
1048: * @param searchDir
1049: * @param l
1050: * @return
1051: * @throws IOException
1052: */
1053: public List<SegmentInfo> getLocalSegments(File searchDir,
1054: List<SegmentInfo> l) throws IOException {
1055:
1056: File[] files = searchDir.listFiles();
1057: if (files != null) {
1058: for (int i = 0; i < files.length; i++) {
1059: if (files[i].isDirectory()) {
1060:
1061: SegmentInfo sgi = SegmentInfoImpl
1062: .newLocalSegmentInfo(files[i],
1063: localStructuredStorage,
1064: searchIndexDirectory);
1065: if (sgi.isClusterSegment()) {
1066: if (IndexReader.indexExists(files[i])) {
1067:
1068: if (sgi.isCreated()) {
1069: l.add(sgi);
1070: if (log.isDebugEnabled())
1071: log.debug("LO Segment " + sgi);
1072: } else {
1073: if (log.isDebugEnabled())
1074: log.debug("LO Segment not created "
1075: + sgi.toString());
1076: }
1077: } else {
1078: log
1079: .warn("Found Orphaned directory with no segment information present "
1080: + files[i]);
1081: }
1082:
1083: } else {
1084: l = getLocalSegments(files[i], l);
1085: }
1086: }
1087: }
1088: }
1089: return l;
1090: }
1091:
1092: /**
1093: * get a list of bad segmetns with brokenindexes
1094: *
1095: * @return
1096: * @throws IOException
1097: */
1098: private List<SegmentInfo> getBadLocalSegments() throws IOException {
1099: List<SegmentInfo> l = new ArrayList<SegmentInfo>();
1100: File searchDir = new File(searchIndexDirectory);
1101: return getBadLocalSegments(searchDir, l);
1102: }
1103:
1104: /**
1105: * get a list of segments that are ready for deletion
1106: *
1107: * @return
1108: * @throws IOException
1109: */
1110: private List<SegmentInfo> getDeletedLocalSegments()
1111: throws IOException {
1112: List<SegmentInfo> l = new ArrayList<SegmentInfo>();
1113: File searchDir = new File(searchIndexDirectory);
1114: return getDeletedLocalSegments(searchDir, l);
1115: }
1116:
1117: /**
1118: * recurse into a list of bad local segments
1119: *
1120: * @param searchDir
1121: * @param l
1122: * @return
1123: * @throws IOException
1124: */
1125: private List<SegmentInfo> getBadLocalSegments(File searchDir,
1126: List<SegmentInfo> l) throws IOException {
1127: if (searchDir.isDirectory()) {
1128: File[] files = searchDir.listFiles();
1129: if (files != null) {
1130: for (int i = 0; i < files.length; i++) {
1131: SegmentInfo sgi = SegmentInfoImpl
1132: .newLocalSegmentInfo(files[i],
1133: localStructuredStorage,
1134: searchIndexDirectory);
1135: if (sgi.isClusterSegment()) {
1136: if (sgi.isCreated()) {
1137: if (!IndexReader.indexExists(files[i])) {
1138: l.add(sgi);
1139: }
1140:
1141: }
1142: } else {
1143: l = getBadLocalSegments(files[i], l);
1144: }
1145: }
1146: }
1147: }
1148:
1149: return l;
1150: }
1151:
1152: /**
1153: * Get a list of segments to be deleted
1154: *
1155: * @param searchDir
1156: * @param l
1157: * @return
1158: * @throws IOException
1159: */
1160: private List<SegmentInfo> getDeletedLocalSegments(File searchDir,
1161: List<SegmentInfo> l) throws IOException {
1162: if (searchDir.isDirectory()) {
1163: File[] files = searchDir.listFiles();
1164: if (files != null) {
1165: for (int i = 0; i < files.length; i++) {
1166: SegmentInfo sgi = SegmentInfoImpl
1167: .newLocalSegmentInfo(files[i],
1168: localStructuredStorage,
1169: searchIndexDirectory);
1170: if (sgi.isClusterSegment()) {
1171: if (sgi.isDeleted()) {
1172: l.add(sgi);
1173: }
1174: } else {
1175: l = getDeletedLocalSegments(files[i], l);
1176: }
1177: }
1178: }
1179: }
1180: return l;
1181: }
1182:
1183: /**
1184: * recover from a failiure
1185: */
1186: private void recoverFromFailure() {
1187: log
1188: .error("Recover from Failiure is not implementated at the moment,"
1189: + " the local index is corrupt, please delete it and it will "
1190: + "reload from the database");
1191:
1192: }
1193:
1194: /**
1195: * @return Returns the dataSource.
1196: */
1197: public DataSource getDataSource() {
1198: return dataSource;
1199: }
1200:
1201: /**
1202: * @param dataSource
1203: * The dataSource to set.
1204: */
1205: public void setDataSource(DataSource dataSource) {
1206: this .dataSource = dataSource;
1207: }
1208:
1209: public void setLocation(String location) {
1210:
1211: searchIndexDirectory = location;
1212: log.info("Search Index Location is " + location);
1213:
1214: }
1215:
1216: /**
1217: * @param autoDdl
1218: * The autoDdl to set.
1219: */
1220: public void setAutoDdl(boolean autoDdl) {
1221: this .autoDdl = autoDdl;
1222: }
1223:
1224: /**
1225: * create a temporary index for indexing operations
1226: */
1227: public File getTemporarySegment(boolean delete) {
1228: // this index will not have a timestamp, and hence will not be part sync
1229: // with the db
1230: File f = new File(searchIndexDirectory, TEMP_INDEX_NAME);
1231: if (delete && f.exists()) {
1232: SegmentInfoImpl.deleteAll(f);
1233: }
1234: f.mkdirs();
1235: return f;
1236: }
1237:
1238: public void removeTemporarySegment() {
1239: File f = new File(searchIndexDirectory, TEMP_INDEX_NAME);
1240: if (f.exists()) {
1241: SegmentInfoImpl.deleteAll(f);
1242: }
1243: }
1244:
1245: public SegmentInfo saveTemporarySegment() throws IOException {
1246: SegmentInfo segInfo = newSegment();
1247: File s = new File(searchIndexDirectory, TEMP_INDEX_NAME);
1248: File d = segInfo.getSegmentLocation();
1249: copyAll(s, d);
1250: segInfo.setCreated();
1251: segInfo.touchSegment();
1252: return segInfo;
1253: }
1254:
1255: /**
1256: * Copy a file to a directory, if the source is a directory, the copy
1257: * recurses into the directory. If the destination does not exists, it is
1258: * assumed to be a file.
1259: *
1260: * @param s
1261: * the source file
1262: * @param d
1263: * the source directory
1264: * @throws IOException
1265: */
1266: private void copyAll(File s, File d) throws IOException {
1267: if (s.isDirectory()) {
1268: File[] fl = s.listFiles();
1269: for (int i = 0; i < fl.length; i++) {
1270: if (fl[i].isFile()) {
1271: copyFile(fl[i], d);
1272: } else {
1273: File nd = new File(d, fl[i].getName());
1274: nd.mkdirs();
1275: copyAll(fl[i], nd);
1276: }
1277: }
1278: } else {
1279: copyFile(s, d);
1280: }
1281: }
1282:
1283: /**
1284: * Copy a file from s to d, s will be a file, d may be a file or directory
1285: *
1286: * @param s
1287: * @param d
1288: * @throws IOException
1289: */
1290: private void copyFile(File s, File d) throws IOException {
1291: if (log.isDebugEnabled())
1292: log.debug("Copying " + s.getAbsolutePath() + " to "
1293: + d.getAbsolutePath());
1294: if (s.exists() && s.isFile()) {
1295: File t = d; // target
1296: if (d.isDirectory()) {
1297: if (!d.exists()) {
1298: d.mkdirs();
1299: }
1300: t = new File(d, s.getName());
1301: } else {
1302: File p = d.getParentFile();
1303: if (!p.exists()) {
1304: p.mkdirs();
1305: }
1306: }
1307: FileChannel srcChannel = null;
1308: FileChannel dstChannel = null;
1309: try {
1310: // use nio
1311: // Create channel on the source
1312: srcChannel = new FileInputStream(s).getChannel();
1313:
1314: // Create channel on the destination
1315: dstChannel = new FileOutputStream(t).getChannel();
1316:
1317: // Copy file contents from source to destination
1318: dstChannel.transferFrom(srcChannel, 0, srcChannel
1319: .size());
1320:
1321: // Close the channels
1322: } finally {
1323: try {
1324: srcChannel.close();
1325: } catch (Exception ex) {
1326:
1327: }
1328: try {
1329: dstChannel.close();
1330: } catch (Exception ex) {
1331:
1332: }
1333: }
1334:
1335: }
1336: }
1337:
1338: public void recoverSegment(SegmentInfo recoverSegInfo) {
1339:
1340: recoverSegInfo.setDeleted();
1341: recoverSegInfo.doFinalDelete();
1342: recoverSegInfo.setNew();
1343: Connection connection = null;
1344: try {
1345: connection = dataSource.getConnection();
1346: updateLocalSegment(connection, recoverSegInfo);
1347: // we also need to re-apply the patch
1348: updateLocalPatch(connection);
1349: connection.commit();
1350: } catch (Exception ex) {
1351: try {
1352: connection.rollback();
1353: } catch (Exception e) {
1354: }
1355: throw new RuntimeException(
1356: "Failed to recover dammaged segment ", ex);
1357:
1358: } finally {
1359: try {
1360: connection.close();
1361: } catch (Exception e) {
1362:
1363: }
1364: }
1365:
1366: }
1367:
1368: protected void updateLocalPatch(Connection connection)
1369: throws SQLException, IOException {
1370: if (localSegmentsOnly) {
1371: log
1372: .warn("Update Patch Requested with inactive Shared Storage ");
1373: } else {
1374: if (sharedSegments == null || sharedSegments.length() == 0) {
1375: updateLocalPatchBLOB(connection);
1376: } else {
1377: updateLocalPatchFilesystem(connection);
1378: }
1379: }
1380: }
1381:
1382: protected void updateLocalPatchFilesystem(Connection connection)
1383: throws SQLException, IOException {
1384: if (log.isDebugEnabled())
1385: log.debug("Updating local patch ");
1386: PreparedStatement segmentSelect = null;
1387: ResultSet resultSet = null;
1388: try {
1389: segmentSelect = connection
1390: .prepareStatement("select version_ from search_segments where name_ = ?");
1391: segmentSelect.clearParameters();
1392: segmentSelect.setString(1, INDEX_PATCHNAME);
1393: resultSet = segmentSelect.executeQuery();
1394: if (resultSet.next()) {
1395: InputStream packetStream = null;
1396: try {
1397: long version = resultSet.getLong(1);
1398: File f = new File(getSharedFileName(
1399: INDEX_PATCHNAME, sharedStructuredStorage));
1400: if (f.exists()) {
1401: packetStream = new FileInputStream(f);
1402: clusterStorage.unpackPatch(packetStream);
1403: if (log.isDebugEnabled())
1404: log.debug("Updated Patch ");
1405: } else {
1406: log.warn("Shared Segment File does not exist "
1407: + f.getPath());
1408: }
1409: } finally {
1410: try {
1411: packetStream.close();
1412: } catch (Exception ex) {
1413: }
1414: }
1415: } else {
1416: if (log.isDebugEnabled())
1417: log
1418: .debug("Didnt find patch in database, this is Ok");
1419: }
1420: } finally {
1421: try {
1422: resultSet.close();
1423: } catch (Exception ex) {
1424: }
1425: try {
1426: segmentSelect.close();
1427: } catch (Exception ex) {
1428: }
1429: }
1430:
1431: }
1432:
1433: private void updateLocalPatchBLOB(Connection connection)
1434: throws SQLException, IOException {
1435: if (log.isDebugEnabled())
1436: log.debug("Updating local patch ");
1437: PreparedStatement segmentSelect = null;
1438: ResultSet resultSet = null;
1439: try {
1440: segmentSelect = connection
1441: .prepareStatement("select version_, packet_ from search_segments where name_ = ?");
1442: segmentSelect.clearParameters();
1443: segmentSelect.setString(1, INDEX_PATCHNAME);
1444: resultSet = segmentSelect.executeQuery();
1445: if (resultSet.next()) {
1446: InputStream packetStream = null;
1447: try {
1448: long version = resultSet.getLong(1);
1449: packetStream = resultSet.getBinaryStream(2);
1450: clusterStorage.unpackPatch(packetStream);
1451: if (log.isDebugEnabled())
1452: log.debug("Updated Patch from DB " + version);
1453: } finally {
1454: try {
1455: packetStream.close();
1456: } catch (Exception ex) {
1457: }
1458: }
1459: } else {
1460: if (log.isDebugEnabled())
1461: log
1462: .debug("Didnt find patch in database, this is Ok ");
1463: }
1464: } finally {
1465: try {
1466: resultSet.close();
1467: } catch (Exception ex) {
1468: }
1469: try {
1470: segmentSelect.close();
1471: } catch (Exception ex) {
1472: }
1473: }
1474:
1475: }
1476:
1477: public String getSegmentName(String segmentPath) {
1478: File f = new File(segmentPath);
1479: return f.getName();
1480: }
1481:
1482: /**
1483: * @return Returns the validate.
1484: */
1485: public boolean isValidate() {
1486: return validate;
1487: }
1488:
1489: /**
1490: * @param validate
1491: * The validate to set.
1492: */
1493: public void setValidate(boolean validate) {
1494: this .validate = validate;
1495: }
1496:
1497: /**
1498: * updat this save this local segment into the db
1499: *
1500: * @param connection
1501: * @param addsi
1502: */
1503: protected void updateDBSegmentFilesystem(Connection connection,
1504: SegmentInfo addsi) throws SQLException, IOException {
1505:
1506: PreparedStatement segmentUpdate = null;
1507: PreparedStatement segmentInsert = null;
1508: FileChannel packetStream = null;
1509: FileChannel sharedStream = null;
1510: File packetFile = null;
1511: File sharedFinalFile = null;
1512: File sharedTempFile = null;
1513: long newVersion = System.currentTimeMillis();
1514: try {
1515: sharedTempFile = new File(getSharedTempFileName(addsi
1516: .getName()));
1517: sharedFinalFile = new File(getSharedFileName(addsi
1518: .getName(), sharedStructuredStorage));
1519: packetFile = clusterStorage.packSegment(addsi, newVersion);
1520: if (packetFile.exists()) {
1521: packetStream = new FileInputStream(packetFile)
1522: .getChannel();
1523: sharedTempFile.getParentFile().mkdirs();
1524: sharedStream = new FileOutputStream(sharedTempFile)
1525: .getChannel();
1526:
1527: // Copy file contents from source to destination
1528: sharedStream.transferFrom(packetStream, 0, packetStream
1529: .size());
1530:
1531: packetStream.close();
1532: sharedStream.close();
1533:
1534: segmentUpdate = connection
1535: .prepareStatement("update search_segments set version_ = ?, size_ = ? where name_ = ? and version_ = ?");
1536: segmentInsert = connection
1537: .prepareStatement("insert into search_segments ( name_, version_, size_ ) values ( ?,?,?)");
1538: if (addsi.isInDb()) {
1539: segmentUpdate.clearParameters();
1540: segmentUpdate.setLong(1, newVersion);
1541: segmentUpdate.setLong(2, packetFile.length());
1542: segmentUpdate.setString(3, addsi.getName());
1543: segmentUpdate.setLong(4, addsi.getVersion());
1544: if (segmentUpdate.executeUpdate() != 1) {
1545: throw new SQLException(
1546: " ant Find packet to update " + addsi);
1547: }
1548: } else {
1549: segmentInsert.clearParameters();
1550: segmentInsert.setString(1, addsi.getName());
1551: segmentInsert.setLong(2, newVersion);
1552: segmentInsert.setLong(3, packetFile.length());
1553: if (segmentInsert.executeUpdate() != 1) {
1554: throw new SQLException(
1555: " Failed to insert packet " + addsi);
1556: }
1557: }
1558: addsi.setVersion(newVersion);
1559: sharedFinalFile.getParentFile().mkdirs();
1560: sharedTempFile.renameTo(sharedFinalFile);
1561: log.info("DB Updated " + addsi);
1562: } else {
1563: log.warn("Packet file does not exist "
1564: + packetFile.getPath());
1565: }
1566:
1567: } finally {
1568: try {
1569: packetStream.close();
1570: } catch (Exception ex) {
1571: }
1572: try {
1573: packetFile.delete();
1574: } catch (Exception ex) {
1575: }
1576: try {
1577: sharedStream.close();
1578: } catch (Exception ex) {
1579: }
1580: try {
1581: sharedTempFile.delete();
1582: } catch (Exception ex) {
1583: }
1584: try {
1585: segmentUpdate.close();
1586: } catch (Exception ex) {
1587: }
1588: try {
1589: segmentInsert.close();
1590: } catch (Exception ex) {
1591: }
1592: }
1593:
1594: }
1595:
1596: private String getSharedFileName(String name, boolean structured) {
1597: if (localSegmentsOnly) {
1598: return null;
1599: }
1600: if (sharedSegments != null && sharedSegments.length() > 0) {
1601: if (!sharedSegments.endsWith("/")) {
1602: sharedSegments = sharedSegments + "/";
1603: }
1604: if (structured && !INDEX_PATCHNAME.equals(name)) {
1605: String hashName = name.substring(name.length() - 4,
1606: name.length() - 2);
1607: return sharedSegments + hashName + "/" + name + ".zip";
1608: } else {
1609: return sharedSegments + name + ".zip";
1610: }
1611: }
1612: return null;
1613: }
1614:
1615: private String getSharedTempFileName(String name) {
1616:
1617: if (sharedSegments != null && sharedSegments.length() > 0) {
1618: if (!sharedSegments.endsWith("/")) {
1619: sharedSegments = sharedSegments + "/";
1620: }
1621: return sharedSegments + name + ".zip."
1622: + System.currentTimeMillis();
1623: }
1624: return null;
1625: }
1626:
1627: /**
1628: * updte a segment from the database
1629: *
1630: * @param connection
1631: * @param addsi
1632: */
1633: protected void updateLocalSegmentFilesystem(Connection connection,
1634: SegmentInfo addsi) throws SQLException, IOException {
1635: if (log.isDebugEnabled())
1636: log.debug("Updating local segment from databse " + addsi);
1637: PreparedStatement segmentSelect = null;
1638: ResultSet resultSet = null;
1639: try {
1640: segmentSelect = connection
1641: .prepareStatement("select version_ from search_segments where name_ = ?");
1642: segmentSelect.clearParameters();
1643: segmentSelect.setString(1, addsi.getName());
1644: resultSet = segmentSelect.executeQuery();
1645: if (resultSet.next()) {
1646: InputStream packetStream = null;
1647: try {
1648: long version = resultSet.getLong(1);
1649: File f = new File(getSharedFileName(
1650: addsi.getName(), sharedStructuredStorage));
1651: if (f.exists()) {
1652: packetStream = new FileInputStream(f);
1653: clusterStorage.unpackSegment(addsi,
1654: packetStream, version);
1655: if (log.isDebugEnabled())
1656: log.debug("Updated Local " + addsi);
1657: } else {
1658: log.warn("Shared Segment file is missing "
1659: + f.getPath());
1660: }
1661: } finally {
1662: try {
1663: packetStream.close();
1664: } catch (Exception ex) {
1665: }
1666: }
1667: } else {
1668: log.error("Didnt find segment in database");
1669: }
1670: } finally {
1671: try {
1672: resultSet.close();
1673: } catch (Exception ex) {
1674: }
1675: try {
1676: segmentSelect.close();
1677: } catch (Exception ex) {
1678: }
1679: }
1680:
1681: }
1682:
1683: public String getSharedSegments() {
1684: return sharedSegments;
1685: }
1686:
1687: public void setSharedSegments(String sharedSegments) {
1688: this .sharedSegments = sharedSegments;
1689: }
1690:
1691: public void dolog(String message) {
1692: if (debug) {
1693: log.info("JDBCClusterDebug :" + message);
1694: } else if (log.isDebugEnabled()) {
1695: log.debug("JDBCClusterDebug :" + message);
1696: }
1697: }
1698:
1699: public long getLastUpdate() {
1700: PreparedStatement segmentSelect = null;
1701: ResultSet resultSet = null;
1702: Connection connection = null;
1703:
1704: try {
1705: connection = dataSource.getConnection();
1706: segmentSelect = connection
1707: .prepareStatement("select version_ from search_segments order by version_ desc");
1708: segmentSelect.clearParameters();
1709: resultSet = segmentSelect.executeQuery();
1710: if (resultSet.next()) {
1711: return resultSet.getLong(1);
1712: } else {
1713: return 0;
1714: }
1715: } catch (Exception ex) {
1716: log.warn(" Cant find last update time "
1717: + ex.getClass().getName() + ":" + ex.getMessage());
1718: return 0;
1719: } finally {
1720: try {
1721: resultSet.close();
1722: } catch (Exception ex) {
1723: }
1724: try {
1725: segmentSelect.close();
1726: } catch (Exception ex) {
1727: }
1728: try {
1729: connection.close();
1730: } catch (Exception ex) {
1731:
1732: }
1733: }
1734: }
1735:
1736: public List getSegmentInfoList() {
1737: List seginfo = new ArrayList();
1738: try {
1739:
1740: File searchDir = new File(searchIndexDirectory);
1741: long tsize = getSegmentInfoList(searchDir, seginfo);
1742: String size = null;
1743: if (tsize > 1024 * 1024 * 10) {
1744: size = String.valueOf(tsize / (1024 * 1024)) + "MB";
1745: } else if (tsize >= 1024 * 1024) {
1746: size = String.valueOf(tsize / (1024 * 1024)) + "."
1747: + String.valueOf(tsize / (102 * 1024) + "MB");
1748: } else {
1749: size = String.valueOf(tsize / (1024)) + "KB";
1750: }
1751: seginfo.add(new Object[] { "Total", size, "" });
1752:
1753: } catch (Exception ex) {
1754: seginfo.add("Failed to get Segment Info list "
1755: + ex.getClass().getName() + " " + ex.getMessage());
1756: }
1757: return seginfo;
1758:
1759: }
1760:
1761: public long getSegmentInfoList(File searchDir, List seginfo) {
1762:
1763: File[] files = searchDir.listFiles();
1764: long tsize = 0;
1765: if (files != null) {
1766: for (int i = 0; i < files.length; i++) {
1767: if (files[i].isDirectory()) {
1768:
1769: SegmentInfo sgi = SegmentInfoImpl
1770: .newLocalSegmentInfo(files[i],
1771: localStructuredStorage,
1772: searchIndexDirectory);
1773: if (sgi != null || sgi.isClusterSegment()) {
1774: String name = files[i].getName();
1775: long lsize = sgi.getLocalSegmentSize();
1776: tsize += lsize;
1777: long ts = sgi.getLocalSegmentLastModified();
1778: String lastup = (new Date(ts)).toString();
1779:
1780: String size = null;
1781: if (lsize > 1024 * 1024 * 10) {
1782: size = String
1783: .valueOf(lsize / (1024 * 1024))
1784: + "MB";
1785: } else if (lsize >= 1024 * 1024) {
1786: size = String
1787: .valueOf(lsize / (1024 * 1024))
1788: + "."
1789: + String.valueOf(lsize
1790: / (102 * 1024) + "MB");
1791: } else {
1792: size = String.valueOf(lsize / (1024))
1793: + "KB";
1794: }
1795: seginfo
1796: .add(new Object[] { name, size, lastup });
1797: } else {
1798: tsize += getSegmentInfoList(files[i], seginfo);
1799: }
1800: }
1801: }
1802: }
1803: return tsize;
1804:
1805: }
1806:
1807: private void migrateSharedSegments() {
1808: if (localSegmentsOnly) {
1809: return;
1810: }
1811: if (sharedSegments != null && sharedSegments.length() > 0) {
1812: Connection connection = null;
1813: try {
1814: connection = dataSource.getConnection();
1815:
1816: List l = getDBSegments(connection);
1817: for (Iterator li = l.iterator(); li.hasNext();) {
1818: SegmentInfo si = (SegmentInfo) li.next();
1819: String shared = getSharedFileName(si.getName(),
1820: !sharedStructuredStorage);
1821: File f = new File(shared);
1822: if (f.exists()) {
1823: File fnew = new File(getSharedFileName(si
1824: .getName(), sharedStructuredStorage));
1825: fnew.getParentFile().mkdirs();
1826: log.info("Moving " + f.getPath() + " to "
1827: + fnew.getPath());
1828: try {
1829: f.renameTo(fnew);
1830: } catch (Exception ex) {
1831: log.warn("Failed " + ex.getMessage());
1832: if (log.isDebugEnabled())
1833: log.debug("Debug Failure ", ex);
1834: }
1835: }
1836: }
1837: connection.commit();
1838: } catch (Exception ex) {
1839: try {
1840: connection.rollback();
1841: } catch (Exception ex1) {
1842:
1843: }
1844:
1845: } finally {
1846: try {
1847: connection.close();
1848: } catch (Exception ex) {
1849: }
1850: }
1851: }
1852:
1853: }
1854:
1855: private void migrateLocalSegments() throws IOException {
1856: List<SegmentInfo> l = getLocalSegments();
1857: for (Iterator<SegmentInfo> li = l.iterator(); li.hasNext();) {
1858: SegmentInfo si = li.next();
1859: File f = SegmentInfoImpl.getSegmentLocation(si.getName(),
1860: !localStructuredStorage, searchIndexDirectory);
1861: if (f.exists()) {
1862: File fnew = SegmentInfoImpl.getSegmentLocation(si
1863: .getName(), localStructuredStorage,
1864: searchIndexDirectory);
1865: fnew.getParentFile().mkdirs();
1866: log.info("Moving " + f.getPath() + " to "
1867: + fnew.getPath());
1868: try {
1869: f.renameTo(fnew);
1870: } catch (Exception ex) {
1871: log.warn("Failed " + ex.getMessage());
1872: if (log.isDebugEnabled())
1873: log.debug("Debug Failure ", ex);
1874: }
1875: }
1876: }
1877: }
1878:
1879: public void getLock() {
1880: if (parallelIndex) {
1881: throw new RuntimeException(
1882: "Parallel index is not implemented yet");
1883: }
1884:
1885: }
1886:
1887: public void releaseLock() {
1888: if (parallelIndex) {
1889: throw new RuntimeException(
1890: "Parallel index is not implemented yet");
1891: }
1892: }
1893:
1894: public boolean isMultipleIndexers() {
1895: return parallelIndex;
1896: }
1897:
1898: public boolean isParallelIndex() {
1899: return parallelIndex;
1900: }
1901:
1902: public void setParallelIndex(boolean parallelIndex) {
1903: this .parallelIndex = parallelIndex;
1904: }
1905:
1906: /**
1907: * @return Returns the localStructuredStorage.
1908: */
1909: public boolean isLocalStructuredStorage() {
1910: return localStructuredStorage;
1911: }
1912:
1913: /**
1914: * @param localStructuredStorage
1915: * The localStructuredStorage to set.
1916: */
1917: public void setLocalStructuredStorage(boolean localStructuredStorage) {
1918: this .localStructuredStorage = localStructuredStorage;
1919: }
1920:
1921: /**
1922: * @return Returns the sharedStructuredStorage.
1923: */
1924: public boolean isSharedStructuredStorage() {
1925: return sharedStructuredStorage;
1926: }
1927:
1928: /**
1929: * @param sharedStructuredStorage
1930: * The sharedStructuredStorage to set.
1931: */
1932: public void setSharedStructuredStorage(
1933: boolean sharedStructuredStorage) {
1934: this .sharedStructuredStorage = sharedStructuredStorage;
1935: }
1936:
1937: /**
1938: * @return the localSegmentsOnly
1939: */
1940: public boolean isLocalSegmentsOnly() {
1941: return localSegmentsOnly;
1942: }
1943:
1944: /**
1945: * @param localSegmentsOnly
1946: * the localSegmentsOnly to set
1947: */
1948: public void setLocalSegmentsOnly(boolean localSegmentsOnly) {
1949: this .localSegmentsOnly = localSegmentsOnly;
1950: }
1951:
1952: /**
1953: * @return the searchService
1954: */
1955: public SearchService getSearchService() {
1956: return searchService;
1957: }
1958:
1959: /**
1960: * @param searchService
1961: * the searchService to set
1962: */
1963: public void setSearchService(SearchService searchService) {
1964: this .searchService = searchService;
1965: }
1966:
1967: /* (non-Javadoc)
1968: * @see org.sakaiproject.search.index.ClusterFilesystem#centralIndexExists()
1969: */
1970: public boolean centralIndexExists() {
1971: Connection connection = null;
1972: try {
1973: connection = dataSource.getConnection();
1974: List l = getDBSegments(connection);
1975: if (l != null && l.size() > 0) {
1976: return true;
1977: }
1978: return false;
1979: } catch (SQLException e) {
1980: return false;
1981: } finally {
1982: try {
1983: connection.close();
1984: } catch (Exception ex) {
1985: }
1986: }
1987: }
1988:
1989: }
|