0001: /**********************************************************************************
0002: * $URL: https://source.sakaiproject.org/svn/search/tags/sakai_2-4-1/search-impl/impl/src/java/org/sakaiproject/search/component/service/impl/SearchIndexBuilderWorkerImpl.java $
0003: * $Id: SearchIndexBuilderWorkerImpl.java 29635 2007-04-26 14:44:09Z ajpoland@iupui.edu $
0004: ***********************************************************************************
0005: *
0006: * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
0007: *
0008: * Licensed under the Educational Community License, Version 1.0 (the "License");
0009: * you may not use this file except in compliance with the License.
0010: * You may obtain a copy of the License at
0011: *
0012: * http://www.opensource.org/licenses/ecl1.php
0013: *
0014: * Unless required by applicable law or agreed to in writing, software
0015: * distributed under the License is distributed on an "AS IS" BASIS,
0016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017: * See the License for the specific language governing permissions and
0018: * limitations under the License.
0019: *
0020: **********************************************************************************/package org.sakaiproject.search.component.service.impl;
0021:
0022: import java.sql.Connection;
0023: import java.sql.PreparedStatement;
0024: import java.sql.ResultSet;
0025: import java.sql.SQLException;
0026: import java.sql.Timestamp;
0027: import java.util.ArrayList;
0028: import java.util.HashMap;
0029: import java.util.List;
0030:
0031: import javax.sql.DataSource;
0032:
0033: import org.apache.commons.id.IdentifierGenerator;
0034: import org.apache.commons.id.uuid.UUID;
0035: import org.apache.commons.id.uuid.VersionFourGenerator;
0036: import org.apache.commons.logging.Log;
0037: import org.apache.commons.logging.LogFactory;
0038: import org.hibernate.HibernateException;
0039: import org.sakaiproject.component.api.ComponentManager;
0040: import org.sakaiproject.component.cover.ServerConfigurationService;
0041: import org.sakaiproject.entity.api.EntityManager;
0042: import org.sakaiproject.event.api.EventTrackingService;
0043: import org.sakaiproject.search.api.SearchIndexBuilder;
0044: import org.sakaiproject.search.api.SearchIndexBuilderWorker;
0045: import org.sakaiproject.search.api.SearchService;
0046: import org.sakaiproject.search.dao.SearchIndexBuilderWorkerDao;
0047: import org.sakaiproject.search.model.SearchBuilderItem;
0048: import org.sakaiproject.search.model.SearchWriterLock;
0049: import org.sakaiproject.search.model.impl.SearchWriterLockImpl;
0050: import org.sakaiproject.tool.api.SessionManager;
0051: import org.sakaiproject.user.api.User;
0052: import org.sakaiproject.user.api.UserDirectoryService;
0053:
0054: public class SearchIndexBuilderWorkerImpl implements Runnable,
0055: SearchIndexBuilderWorker {
0056:
/**
 * The lock key of the single cluster-wide search index writer lock row.
 */
public static final String LOCKKEY = "searchlockkey";

protected static final Object GLOBAL_CONTEXT = null;

/**
 * Node name written into the writer lock row when no node holds the lock.
 */
private static final String NO_NODE = "none";

/**
 * Prefix of the lock keys used for the per-node heartbeat rows.
 */
private static final String NODE_LOCK = "nodelockkey";

private static final Log log = LogFactory
        .getLog(SearchIndexBuilderWorkerImpl.class);

/**
 * Number of index builder threads started by this worker.
 */
private final int numThreads = 2;

/**
 * The maximum sleep time for the wait/notify semaphore
 */
public long sleepTime = 5L * 60000L;

/**
 * Multiplier applied to the back-off intervals of the indexer threads. The
 * run loop sets it to 10 when more than 200 documents are pending and to
 * 1000 otherwise, so a larger value makes the threads back off for longer.
 */
private long loadFactor = 1000L;

/**
 * The index builder threads, one slot per thread; a slot is null when its
 * thread has exited.
 */
private Thread indexBuilderThread[] = new Thread[numThreads];

/**
 * sync object guarding (re)starting of threads in checkRunning
 */
private Object threadStartLock = new Object();

/**
 * dependency: the search index builder that is accepting new items
 */
private SearchIndexBuilderImpl searchIndexBuilder = null;

/**
 * dependency: the current search service, used to get the location of the
 * index
 */
private SearchService searchService = null;

private DataSource dataSource = null;

private IdentifierGenerator idgenerator = new VersionFourGenerator();

/**
 * Semaphore the worker threads wait on between processing passes
 */
private Object sem = new Object();

/**
 * The number of items to process in a batch, default = 100
 */
private int indexBatchSize = 100;

private boolean enabled = false;

private SessionManager sessionManager;

private UserDirectoryService userDirectoryService;

private EntityManager entityManager;

private EventTrackingService eventTrackingService;

/**
 * Run flag polled by the worker threads. It is written from other threads
 * (shutdown hook, destroy(), isRunning()), hence volatile so that a stop
 * request is guaranteed to become visible to the workers.
 */
private volatile boolean runThreads = false;

/**
 * Holds the node ID generated for the current thread.
 */
private ThreadLocal nodeIDHolder = new ThreadLocal();

private SearchIndexBuilderWorkerDao searchIndexBuilderWorkerDao = null;

private long lastLock = System.currentTimeMillis();

/**
 * Activity acts as a counter that indicates the number of events received.
 * The Workers may consult this to determine if they should run.
 */
private int activity = 0;

private long lastEvent = System.currentTimeMillis();

private long lastIndex;

private long startDocIndex;

private String nowIndexing;

private String lastIndexing;

private boolean soakTest = false;

private boolean diagnostics = false;

private boolean started = false;

private boolean indexExists = false;

/**
 * All node IDs issued in this JVM, used to detect a clash of generated IDs.
 */
private static HashMap nodeIDList = new HashMap();

/**
 * Node ID of the thread currently processing in this JVM, or null.
 */
private static String lockedTo = null;

private static final String SELECT_LOCK_SQL = "select id, nodename, "
        + "lockkey, expires from searchwriterlock where lockkey = ?";

private static final String UPDATE_LOCK_SQL = "update searchwriterlock set "
        + "nodename = ?, expires = ? where id = ? "
        + "and nodename = ? and lockkey = ? ";

private static final String INSERT_LOCK_SQL = "insert into searchwriterlock "
        + "( id,nodename,lockkey, expires ) values ( ?, ?, ?, ? )";

private static final String COUNT_WORK_SQL = " select count(*) "
        + "from searchbuilderitem where searchstate = ? ";

private static final String CLEAR_LOCK_SQL = "update searchwriterlock "
        + "set nodename = ?, expires = ? where nodename = ? and lockkey = ? ";

private static final String SELECT_NODE_LOCK_SQL = "select id, nodename, "
        + "lockkey, expires from searchwriterlock where lockkey like '"
        + NODE_LOCK + "%'";

private static final String UPDATE_NODE_LOCK_SQL = "update searchwriterlock set "
        + "expires = ? where nodename = ? and lockkey = ? ";

private static final String SELECT_EXPIRED_NODES_SQL = "select id from searchwriterlock "
        + "where lockkey like '"
        + NODE_LOCK
        + "%' and expires < ? ";

private static final String DELETE_LOCKNODE_SQL = "delete from searchwriterlock "
        + "where id = ? ";
0196:
0197: public void init() {
0198: if (started && !runThreads) {
0199: log.warn("JVM Shutdown in progress, will not startup");
0200: return;
0201: }
0202: if (org.sakaiproject.component.cover.ComponentManager
0203: .hasBeenClosed()) {
0204: log
0205: .warn("Component manager Shutdown in progress, will not startup");
0206: return;
0207: }
0208: started = true;
0209: runThreads = true;
0210: ComponentManager cm = org.sakaiproject.component.cover.ComponentManager
0211: .getInstance();
0212: eventTrackingService = (EventTrackingService) load(cm,
0213: EventTrackingService.class.getName());
0214: entityManager = (EntityManager) load(cm, EntityManager.class
0215: .getName());
0216: userDirectoryService = (UserDirectoryService) load(cm,
0217: UserDirectoryService.class.getName());
0218: searchIndexBuilder = (SearchIndexBuilderImpl) load(cm,
0219: SearchIndexBuilder.class.getName());
0220: searchService = (SearchService) load(cm, SearchService.class
0221: .getName());
0222:
0223: sessionManager = (SessionManager) load(cm, SessionManager.class
0224: .getName());
0225:
0226: enabled = "true".equals(ServerConfigurationService.getString(
0227: "search.enable", "false"));
0228:
0229: enabled = enabled
0230: & "true".equals(ServerConfigurationService.getString(
0231: "search.indexbuild", "true"));
0232: try {
0233: if (searchIndexBuilder == null) {
0234: log
0235: .error("Search Index Worker needs SearchIndexBuilder ");
0236: }
0237: if (searchService == null) {
0238: log.error("Search Index Worker needs SearchService ");
0239: }
0240: if (searchIndexBuilderWorkerDao == null) {
0241: log
0242: .error("Search Index Worker needs SearchIndexBuilderWorkerDao ");
0243: }
0244: if (eventTrackingService == null) {
0245: log
0246: .error("Search Index Worker needs EventTrackingService ");
0247: }
0248: if (entityManager == null) {
0249: log.error("Search Index Worker needs EntityManager ");
0250: }
0251: if (userDirectoryService == null) {
0252: log
0253: .error("Search Index Worker needs UserDirectortyService ");
0254: }
0255: if (sessionManager == null) {
0256: log.error("Search Index Worker needs SessionManager ");
0257: }
0258: log.debug("init start");
0259: for (int i = 0; i < indexBuilderThread.length; i++) {
0260: indexBuilderThread[i] = new Thread(this );
0261: indexBuilderThread[i].setName("SearchBuilder_"
0262: + String.valueOf(i));
0263: indexBuilderThread[i].start();
0264: }
0265:
0266: /*
0267: * Capture shutdown
0268: */
0269: Runtime.getRuntime().addShutdownHook(new Thread() {
0270: /*
0271: * (non-Javadoc)
0272: *
0273: * @see java.lang.Thread#run()
0274: */
0275: @Override
0276: public void run() {
0277: runThreads = false;
0278: }
0279: });
0280:
0281: } catch (Throwable t) {
0282: log.error("Failed to init ", t);
0283: }
0284: }
0285:
0286: private Object load(ComponentManager cm, String name) {
0287: Object o = cm.get(name);
0288: if (o == null) {
0289: log.error("Cant find Spring component named " + name);
0290: }
0291: return o;
0292: }
0293:
0294: /**
0295: * Main run target of the worker thread {@inheritDoc}
0296: */
0297: public void run() {
0298: if (!enabled)
0299: return;
0300:
0301: String threadName = Thread.currentThread().getName();
0302: String tn = threadName.substring("SearchBuilder_".length());
0303: log.debug("Index Builder Run " + tn + "_" + threadName);
0304: int threadno = Integer.parseInt(tn);
0305:
0306: String nodeID = getNodeID();
0307:
0308: org.sakaiproject.component.cover.ComponentManager
0309: .waitTillConfigured();
0310:
0311: try {
0312:
0313: while (runThreads) {
0314: log.debug("Run Processing Thread");
0315: boolean locked = false;
0316: org.sakaiproject.tool.api.Session s = null;
0317: if (s == null) {
0318: s = sessionManager.startSession();
0319: User u = userDirectoryService.getUser("admin");
0320: s.setUserId(u.getId());
0321: }
0322:
0323: while (runThreads) {
0324: sessionManager.setCurrentSession(s);
0325: try {
0326: int totalDocs = searchIndexBuilder
0327: .getPendingDocuments();
0328: long lastEvent = getLastEventTime();
0329: int activity = getActivity();
0330: long now = System.currentTimeMillis();
0331: long interval = now - lastEvent;
0332: boolean process = false;
0333: boolean createIndex = false;
0334: if (!indexExists) {
0335: if (!searchIndexBuilderWorkerDao
0336: .indexExists()) {
0337: process = true;
0338: createIndex = true;
0339: log
0340: .debug("No cluster Index exists, creating for the first time");
0341: } else {
0342: indexExists = true;
0343: }
0344: } else {
0345:
0346: // if activity == totalDocs and interval > 10
0347: if (totalDocs > 200) {
0348: loadFactor = 10L;
0349: } else {
0350: loadFactor = 1000L;
0351: }
0352: if (totalDocs == 0) {
0353: process = false;
0354: } else if (totalDocs < 20
0355: && interval > (20 * loadFactor)) {
0356: process = true;
0357: } else if (totalDocs >= 20
0358: && totalDocs < 50
0359: && interval > (10 * loadFactor)) {
0360: process = true;
0361: } else if (totalDocs >= 50
0362: && totalDocs < 90
0363: && interval > (5 * loadFactor)) {
0364: process = true;
0365: } else if (totalDocs > ((90 * loadFactor) / 1000)) {
0366: process = true;
0367: }
0368: if (process) {
0369: resetActivity();
0370: }
0371: }
0372:
0373: // should this node consider taking the lock ?
0374: long lastLockInterval = (System
0375: .currentTimeMillis() - lastLock);
0376: long lastLockMetric = lastLockInterval
0377: * totalDocs;
0378:
0379: // if we have 1000 docs, then indexing should happen
0380: // after 10 seconds break
0381: // 1000*10000 10000000
0382: // 500 docs/ 20 seconds
0383: //
0384:
0385: // make certain that we are alive
0386: log
0387: .debug("Activity "
0388: + (lastLockMetric > (10000L * loadFactor))
0389: + " "
0390: + (lastLockInterval > (60L * loadFactor))
0391: + " " + createIndex);
0392:
0393: if (lastLockMetric > (10000L * loadFactor)
0394: || lastLockInterval > (60L * loadFactor)
0395: || createIndex) {
0396: log.debug("===" + process
0397: + "=============PROCESSING ");
0398: if (process
0399: && getLockTransaction(
0400: 2L * 60L * 1000L,
0401: createIndex)) {
0402:
0403: log.debug("===" + nodeID
0404: + "=============PROCESSING ");
0405: if (lockedTo != null
0406: && lockedTo.equals(nodeID)) {
0407: log
0408: .error("+++++++++++++++Local Lock Collision+++++++++++++");
0409: }
0410: lockedTo = nodeID;
0411:
0412: lastLock = System.currentTimeMillis();
0413:
0414: if (createIndex) {
0415: log
0416: .info("=======================Search Index being created for the first time");
0417: searchIndexBuilderWorkerDao
0418: .createIndexTransaction(this );
0419: indexExists = true;
0420: log
0421: .info("=======================Done creating Search Index for the first time");
0422:
0423: } else {
0424: int batchSize = 100;
0425: if (totalDocs > 500) {
0426: batchSize = 200;
0427: } else if (totalDocs > 1000) {
0428: batchSize = 500;
0429: } else if (totalDocs > 10000) {
0430: batchSize = 1000;
0431: }
0432: searchIndexBuilderWorkerDao
0433: .processToDoListTransaction(
0434: this , batchSize);
0435:
0436: }
0437:
0438: lastLock = System.currentTimeMillis();
0439:
0440: if (lockedTo.equals(nodeID)) {
0441: lockedTo = null;
0442: } else {
0443: log
0444: .error("+++++++++++++++++++++++++++Lost Local Lock+++++++++++");
0445: }
0446: log.debug("===" + nodeID
0447: + "=============COMPLETED ");
0448:
0449: } else {
0450: break;
0451: }
0452: } else {
0453: // make certain the node updates hearbeat
0454: updateNodeLock(2L * 60L * 1000L);
0455: log
0456: .debug("Not taking Lock, too much activity");
0457: break;
0458: }
0459: } finally {
0460: clearLockTransaction();
0461: }
0462: }
0463: // this is here force cluster members
0464: // this will not reload the index on this node as
0465: if (indexExists) {
0466: try {
0467: searchService.reload();
0468: } catch (Exception ex) {
0469: log
0470: .info("No Search Segment exists at present, this is Ok on first start :"
0471: + ex.getMessage());
0472: }
0473: }
0474: if (!runThreads) {
0475: break;
0476: }
0477: try {
0478: log.debug("Sleeping Processing Thread");
0479: synchronized (sem) {
0480: log.debug("++++++WAITING " + nodeID);
0481: sem.wait(sleepTime);
0482:
0483: log.debug("+++++ALIVE " + nodeID);
0484: }
0485: log.debug("Wakey Wakey Processing Thread");
0486:
0487: if (org.sakaiproject.component.cover.ComponentManager
0488: .hasBeenClosed()) {
0489: runThreads = false;
0490: break;
0491: }
0492: if (soakTest
0493: && (searchService.getPendingDocs() == 0)) {
0494: log
0495: .error("SOAK TEST---SOAK TEST---SOAK TEST. Index Rebuild Started");
0496: searchService.rebuildInstance();
0497: }
0498: } catch (InterruptedException e) {
0499: log.debug(" Exit From sleep " + e.getMessage());
0500: break;
0501: }
0502: }
0503: } catch (Throwable t) {
0504: log.warn("Failed in IndexBuilder ", t);
0505: } finally {
0506:
0507: log.debug("IndexBuilder run exit " + threadName);
0508: indexBuilderThread[threadno] = null;
0509: }
0510: }
0511:
0512: private String getNodeID() {
0513: String nodeID = (String) nodeIDHolder.get();
0514: if (nodeID == null) {
0515: UUID uuid = (UUID) idgenerator.nextIdentifier();
0516: nodeID = uuid.toString();
0517: nodeIDHolder.set(nodeID);
0518: if (nodeIDList.get(nodeID) == null) {
0519: nodeIDList.put(nodeID, nodeID);
0520: } else {
0521: log
0522: .error("============NODE ID "
0523: + nodeID
0524: + " has already been issued, there must be a clash");
0525: }
0526: }
0527: return nodeID;
0528: }
0529:
0530: /*
0531: * (non-Javadoc)
0532: *
0533: * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#updateNodeLock(java.sql.Connection)
0534: */
0535: public void updateNodeLock(long lifeLeft) throws SQLException {
0536:
0537: Connection connection = null;
0538: String nodeID = getNodeID();
0539:
0540: PreparedStatement updateNodeLock = null;
0541: PreparedStatement deleteExpiredNodeLock = null;
0542: PreparedStatement selectExpiredNodeLock = null;
0543: PreparedStatement insertLock = null;
0544: ResultSet resultSet = null;
0545: String threadID = Thread.currentThread().getName();
0546: boolean savedautocommit = false;
0547: Timestamp now = new Timestamp(System.currentTimeMillis());
0548: // a node can expire, after 2 minutes, to indicate to an admin that it
0549: // is dead
0550: // the admin can then force the
0551: Timestamp nodeExpired = new Timestamp(now.getTime() + lifeLeft);
0552: try {
0553: connection = dataSource.getConnection();
0554: boolean savedautocommen = connection.getAutoCommit();
0555: connection.setAutoCommit(false);
0556:
0557: updateNodeLock = connection
0558: .prepareStatement(UPDATE_NODE_LOCK_SQL);
0559: deleteExpiredNodeLock = connection
0560: .prepareStatement(DELETE_LOCKNODE_SQL);
0561: selectExpiredNodeLock = connection
0562: .prepareStatement(SELECT_EXPIRED_NODES_SQL);
0563: insertLock = connection.prepareStatement(INSERT_LOCK_SQL);
0564: int retries = 5;
0565: boolean updated = false;
0566: while (!updated && retries > 0) {
0567: try {
0568:
0569: try {
0570: insertLock.clearParameters();
0571: insertLock.setString(1, "Node:" + nodeID); // id
0572: insertLock.setString(2, nodeID); // nodename
0573: insertLock.setString(3, NODE_LOCK + nodeID); // lockkey
0574: insertLock.setTimestamp(4, nodeExpired); // expires
0575: log.debug(threadID + " Doing "
0576: + INSERT_LOCK_SQL + ":{" + "Node:"
0577: + nodeID + "}{" + nodeID + "}{"
0578: + NODE_LOCK + nodeID + "}{"
0579: + nodeExpired + "}");
0580: insertLock.executeUpdate();
0581: } catch (SQLException ex) {
0582: updateNodeLock.clearParameters();
0583: updateNodeLock.setTimestamp(1, nodeExpired); // expires
0584: updateNodeLock.setString(2, nodeID); // nodename
0585: updateNodeLock.setString(3, NODE_LOCK + nodeID); // lockkey
0586: log.debug(threadID + " Doing "
0587: + UPDATE_NODE_LOCK_SQL + ":{"
0588: + nodeExpired + "}{" + nodeID + "}{"
0589: + NODE_LOCK + nodeID + "}");
0590: if (updateNodeLock.executeUpdate() != 1) {
0591: log.warn("Failed to update node heartbeat "
0592: + nodeID);
0593: }
0594: }
0595: log.debug(threadID + " Doing Commit ");
0596: connection.commit();
0597: updated = true;
0598: } catch (SQLException e) {
0599: log.warn("Retrying ", e);
0600: try {
0601: connection.rollback();
0602: } catch (Exception ex) {
0603:
0604: }
0605: retries--;
0606: try {
0607: Thread.sleep(100);
0608: } catch (InterruptedException ie) {
0609: }
0610: }
0611: }
0612: if (!updated) {
0613: log
0614: .error("Failed to update node lock, will try next time ");
0615: } else {
0616: log.debug("Updated Node Lock on " + nodeID
0617: + " to Expire at" + nodeExpired);
0618: }
0619:
0620: retries = 5;
0621: updated = false;
0622: while (!updated && retries > 0) {
0623: try {
0624: selectExpiredNodeLock.clearParameters();
0625: selectExpiredNodeLock.setTimestamp(1, now);
0626: log.debug(threadID + " Doing "
0627: + SELECT_EXPIRED_NODES_SQL + ":{" + now
0628: + "}");
0629:
0630: resultSet = selectExpiredNodeLock.executeQuery();
0631: while (resultSet.next()) {
0632: String id = resultSet.getString(1);
0633: deleteExpiredNodeLock.clearParameters();
0634: deleteExpiredNodeLock.setString(1, id);
0635: deleteExpiredNodeLock.execute();
0636: connection.commit();
0637: }
0638: log.debug(threadID + " Doing Commit");
0639: connection.commit();
0640: resultSet.close();
0641: updated = true;
0642: } catch (SQLException e) {
0643:
0644: log.info("Retrying Delete Due to "
0645: + e.getMessage());
0646: log.debug("Detailed Traceback ", e);
0647: try {
0648: resultSet.close();
0649: } catch (Exception ex) {
0650:
0651: }
0652: try {
0653: connection.rollback();
0654: } catch (Exception ex) {
0655:
0656: }
0657: retries--;
0658: try {
0659: Thread.sleep(100);
0660: } catch (InterruptedException ie) {
0661: }
0662: }
0663: }
0664: if (!updated) {
0665: log
0666: .warn("Failed to clear old nodes, will try next time ");
0667: }
0668:
0669: } catch (Exception ex) {
0670: log.error("Failed to register node ", ex);
0671: connection.rollback();
0672: } finally {
0673: if (resultSet != null) {
0674: try {
0675: resultSet.close();
0676: } catch (SQLException e) {
0677: }
0678: }
0679: if (insertLock != null) {
0680: try {
0681: insertLock.close();
0682: } catch (SQLException e) {
0683: }
0684: }
0685: if (updateNodeLock != null) {
0686: try {
0687: updateNodeLock.close();
0688: } catch (SQLException e) {
0689: }
0690: }
0691: if (selectExpiredNodeLock != null) {
0692: try {
0693: selectExpiredNodeLock.close();
0694: } catch (SQLException e) {
0695: }
0696: }
0697: if (deleteExpiredNodeLock != null) {
0698: try {
0699: deleteExpiredNodeLock.close();
0700: } catch (SQLException e) {
0701: }
0702: }
0703: if (connection != null) {
0704: try {
0705: connection.setAutoCommit(savedautocommit);
0706: connection.close();
0707: } catch (SQLException e) {
0708: }
0709: connection = null;
0710: }
0711:
0712: }
0713:
0714: }
0715:
0716: public boolean getLockTransaction(long nodeLifetime) {
0717: return getLockTransaction(nodeLifetime, false);
0718: }
0719:
0720: /**
0721: * @return
0722: * @throws HibernateException
0723: */
0724: public boolean getLockTransaction(long nodeLifetime,
0725: boolean forceLock) {
0726: if (searchIndexBuilderWorkerDao.isLockRequired()) {
0727: return getHardLock(nodeLifetime, forceLock);
0728: } else {
0729: try {
0730: updateNodeLock(nodeLifetime);
0731: } catch (SQLException e) {
0732: log.warn("Failed to update node lock "
0733: + e.getClass().getName() + " :"
0734: + e.getMessage());
0735: }
0736: return true;
0737: }
0738: }
0739:
0740: public boolean getHardLock(long nodeLifetime, boolean forceLock) {
0741: String nodeID = getNodeID();
0742: Connection connection = null;
0743: boolean locked = false;
0744: boolean autoCommit = false;
0745: PreparedStatement selectLock = null;
0746: PreparedStatement updateLock = null;
0747: PreparedStatement insertLock = null;
0748: PreparedStatement countWork = null;
0749:
0750: ResultSet resultSet = null;
0751: Timestamp now = new Timestamp(System.currentTimeMillis());
0752: Timestamp expiryDate = new Timestamp(now.getTime()
0753: + (10L * 60L * 1000L));
0754:
0755: try {
0756:
0757: // I need to go direct to JDBC since its just too awful to
0758: // try and do this in Hibernate.
0759:
0760: updateNodeLock(nodeLifetime);
0761:
0762: connection = dataSource.getConnection();
0763: autoCommit = connection.getAutoCommit();
0764: if (autoCommit) {
0765: connection.setAutoCommit(false);
0766: }
0767:
0768: selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
0769: updateLock = connection.prepareStatement(UPDATE_LOCK_SQL);
0770: insertLock = connection.prepareStatement(INSERT_LOCK_SQL);
0771: countWork = connection.prepareStatement(COUNT_WORK_SQL);
0772:
0773: SearchWriterLock swl = null;
0774: selectLock.clearParameters();
0775: selectLock.setString(1, LOCKKEY);
0776: resultSet = selectLock.executeQuery();
0777: if (resultSet.next()) {
0778: swl = new SearchWriterLockImpl();
0779: swl.setId(resultSet.getString(1));
0780: swl.setNodename(resultSet.getString(2));
0781: swl.setLockkey(resultSet.getString(3));
0782: swl.setExpires(resultSet.getTimestamp(4));
0783: log.debug("GOT Lock Record " + swl.getId() + "::"
0784: + swl.getNodename() + "::" + swl.getExpires());
0785:
0786: }
0787:
0788: resultSet.close();
0789: resultSet = null;
0790:
0791: boolean takelock = false;
0792: if (swl == null) {
0793: log.debug("_-------------NO Lock Record");
0794: takelock = true;
0795: } else if ("none".equals(swl.getNodename())) {
0796: takelock = true;
0797: log.debug(nodeID + "_-------------no lock");
0798: } else if (nodeID.equals(swl.getNodename())) {
0799: takelock = true;
0800: log.debug(nodeID + "_------------matched threadid ");
0801: } else if (swl.getExpires() == null
0802: || swl.getExpires().before(now)) {
0803: takelock = true;
0804: log.debug(nodeID + "_------------thread dead ");
0805: }
0806:
0807: if (takelock) {
0808: // any work ?
0809: int nitems = 0;
0810: if (!forceLock) {
0811: countWork.clearParameters();
0812: countWork.setInt(1, SearchBuilderItem.STATE_PENDING
0813: .intValue());
0814: resultSet = countWork.executeQuery();
0815: if (resultSet.next()) {
0816: nitems = resultSet.getInt(1);
0817: }
0818: resultSet.close();
0819: resultSet = null;
0820: }
0821: if (nitems > 0 || forceLock) {
0822: try {
0823: if (swl == null) {
0824: insertLock.clearParameters();
0825: insertLock.setString(1, nodeID);
0826: insertLock.setString(2, nodeID);
0827: insertLock.setString(3, LOCKKEY);
0828: insertLock.setTimestamp(4, expiryDate);
0829:
0830: if (insertLock.executeUpdate() == 1) {
0831: log.debug("INSERT Lock Record "
0832: + nodeID + "::" + nodeID + "::"
0833: + expiryDate);
0834:
0835: locked = true;
0836: }
0837:
0838: } else {
0839: updateLock.clearParameters();
0840: updateLock.setString(1, nodeID);
0841: updateLock.setTimestamp(2, expiryDate);
0842: updateLock.setString(3, swl.getId());
0843: updateLock.setString(4, swl.getNodename());
0844: updateLock.setString(5, swl.getLockkey());
0845: if (updateLock.executeUpdate() == 1) {
0846: log.debug("UPDATED Lock Record "
0847: + swl.getId() + "::" + nodeID
0848: + "::" + expiryDate);
0849: locked = true;
0850: }
0851:
0852: }
0853: } catch (SQLException sqlex) {
0854: locked = false;
0855: log.debug(
0856: "Failed to get lock, but this is Ok ",
0857: sqlex);
0858: }
0859:
0860: }
0861:
0862: }
0863: connection.commit();
0864:
0865: } catch (Exception ex) {
0866: if (connection != null) {
0867: try {
0868: connection.rollback();
0869: } catch (SQLException e) {
0870: }
0871: }
0872: log.error("Failed to get lock " + ex.getMessage());
0873: locked = false;
0874: } finally {
0875: if (resultSet != null) {
0876: try {
0877: resultSet.close();
0878: } catch (SQLException e) {
0879: }
0880: }
0881: if (selectLock != null) {
0882: try {
0883: selectLock.close();
0884: } catch (SQLException e) {
0885: }
0886: }
0887: if (updateLock != null) {
0888: try {
0889: updateLock.close();
0890: } catch (SQLException e) {
0891: }
0892: }
0893: if (insertLock != null) {
0894: try {
0895: insertLock.close();
0896: } catch (SQLException e) {
0897: }
0898: }
0899: if (countWork != null) {
0900: try {
0901: countWork.close();
0902: } catch (SQLException e) {
0903: }
0904: }
0905:
0906: if (connection != null) {
0907: try {
0908: connection.setAutoCommit(autoCommit);
0909: } catch (SQLException e) {
0910: }
0911: try {
0912: connection.close();
0913: log.debug("Connection Closed ");
0914: } catch (SQLException e) {
0915: log.error("Error Closing Connection ", e);
0916: }
0917: connection = null;
0918: }
0919: }
0920: return locked;
0921:
0922: }
0923:
0924: /**
0925: * Count the number of pending documents waiting to be indexed on this
0926: * cluster node. All nodes will potentially perform the index in a cluster,
0927: * however only one must be doing the index, hence this method attampts to
0928: * grab a lock on the writer. If sucessfull it then gets the real number of
0929: * pending documents. There is a timeout, such that if the witer has not
0930: * been seen for 10 minutes, it is assumed that something has gon wrong with
0931: * it, and a new writer is elected on a first grab basis. Every time the
0932: * elected writer comes back, it updates its record to say its still active.
0933: * We could do some round robin timeout, or allow deployers to select a pool
0934: * of index writers in Sakai properties. {@inheritDoc}
0935: */
0936:
0937: private void clearLockTransaction() {
0938: if (searchIndexBuilderWorkerDao.isLockRequired()) {
0939: clearHardLock();
0940: }
0941: }
0942:
0943: public void clearHardLock() {
0944: String nodeID = getNodeID();
0945:
0946: Connection connection = null;
0947: PreparedStatement clearLock = null;
0948: try {
0949: connection = dataSource.getConnection();
0950:
0951: clearLock = connection.prepareStatement(CLEAR_LOCK_SQL);
0952: clearLock.clearParameters();
0953: clearLock.setString(1, NO_NODE);
0954: clearLock.setTimestamp(2, new Timestamp(System
0955: .currentTimeMillis()));
0956: clearLock.setString(3, nodeID);
0957: clearLock.setString(4, LOCKKEY);
0958: if (clearLock.executeUpdate() == 1) {
0959: log.debug("UNLOCK - OK::" + nodeID + "::now");
0960:
0961: } else {
0962: log.debug("UNLOCK - no-lock::" + nodeID + "::now");
0963: }
0964: connection.commit();
0965:
0966: } catch (Exception ex) {
0967: try {
0968: connection.rollback();
0969: } catch (SQLException e) {
0970: }
0971: log.error("Failed to clear lock" + ex.getMessage());
0972: } finally {
0973: if (clearLock != null) {
0974: try {
0975: clearLock.close();
0976: } catch (SQLException e) {
0977: log.error("Error Closing Prepared Statement ", e);
0978: }
0979: }
0980: if (connection != null) {
0981: try {
0982: connection.close();
0983: log.debug("Connection Closed");
0984: } catch (SQLException e) {
0985: log.error("Error Closing Connection", e);
0986: }
0987: }
0988: }
0989:
0990: }
0991:
0992: public boolean isRunning() {
0993: if (org.sakaiproject.component.cover.ComponentManager
0994: .hasBeenClosed()) {
0995: runThreads = false;
0996: }
0997: return runThreads;
0998: }
0999:
1000: /*
1001: * (non-Javadoc)
1002: *
1003: * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#checkRunning()
1004: */
1005: public void checkRunning() {
1006: if (!enabled)
1007: return;
1008: runThreads = true;
1009: synchronized (threadStartLock) {
1010: for (int i = 0; i < indexBuilderThread.length; i++) {
1011: if (indexBuilderThread[i] == null) {
1012: indexBuilderThread[i] = new Thread(this );
1013: indexBuilderThread[i].setName(String.valueOf(i)
1014: + "::" + this .getClass().getName());
1015: indexBuilderThread[i].start();
1016: }
1017: }
1018: }
1019: synchronized (sem) {
1020: log.debug("_________NOTIFY");
1021: sem.notify();
1022: log.debug("_________NOTIFY COMPLETE");
1023: }
1024:
1025: }
1026:
1027: public void destroy() {
1028: if (!enabled)
1029: return;
1030:
1031: log.debug("Destroy SearchIndexBuilderWorker ");
1032: runThreads = false;
1033:
1034: synchronized (sem) {
1035: sem.notifyAll();
1036: }
1037: }
1038:
1039: /**
1040: * @return Returns the sleepTime.
1041: */
1042: public long getSleepTime() {
1043: return sleepTime;
1044: }
1045:
1046: /**
1047: * @param sleepTime
1048: * The sleepTime to set.
1049: */
1050: public void setSleepTime(long sleepTime) {
1051: this .sleepTime = sleepTime;
1052: }
1053:
1054: /**
1055: * @return Returns the dataSource.
1056: */
1057: public DataSource getDataSource() {
1058: return dataSource;
1059: }
1060:
1061: /**
1062: * @param dataSource
1063: * The dataSource to set.
1064: */
1065: public void setDataSource(DataSource dataSource) {
1066: this .dataSource = dataSource;
1067: }
1068:
1069: /**
1070: * @return Returns the searchIndexBuilderWorkerDao.
1071: */
1072: public SearchIndexBuilderWorkerDao getSearchIndexBuilderWorkerDao() {
1073: return searchIndexBuilderWorkerDao;
1074: }
1075:
1076: /**
1077: * @param searchIndexBuilderWorkerDao
1078: * The searchIndexBuilderWorkerDao to set.
1079: */
1080: public void setSearchIndexBuilderWorkerDao(
1081: SearchIndexBuilderWorkerDao searchIndexBuilderWorkerDao) {
1082: this .searchIndexBuilderWorkerDao = searchIndexBuilderWorkerDao;
1083: }
1084:
1085: /*
1086: * (non-Javadoc)
1087: *
1088: * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#getCurrentLock()
1089: */
1090: public SearchWriterLock getCurrentLock() {
1091: String nodeID = getNodeID();
1092: Connection connection = null;
1093: PreparedStatement selectLock = null;
1094: ResultSet resultSet = null;
1095:
1096: try {
1097:
1098: // I need to go direct to JDBC since its just too awful to
1099: // try and do this in Hibernate.
1100:
1101: connection = dataSource.getConnection();
1102:
1103: selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
1104:
1105: SearchWriterLock swl = null;
1106: selectLock.clearParameters();
1107: selectLock.setString(1, LOCKKEY);
1108: resultSet = selectLock.executeQuery();
1109: if (resultSet.next()) {
1110: swl = new SearchWriterLockImpl();
1111: swl.setId(resultSet.getString(1));
1112: swl.setNodename(resultSet.getString(2));
1113: swl.setLockkey(resultSet.getString(3));
1114: swl.setExpires(resultSet.getTimestamp(4));
1115: log.debug("GOT Lock Record " + swl.getId() + "::"
1116: + swl.getNodename() + "::" + swl.getExpires());
1117:
1118: }
1119:
1120: resultSet.close();
1121: resultSet = null;
1122: if (swl == null) {
1123: swl = new SearchWriterLockImpl();
1124: swl.setNodename(NO_NODE);
1125: swl.setLockkey(LOCKKEY);
1126: swl.setExpires(new Timestamp(0));
1127:
1128: }
1129: return swl;
1130:
1131: } catch (Exception ex) {
1132: log.error("Failed to get lock " + ex.getMessage());
1133: SearchWriterLock swl = new SearchWriterLockImpl();
1134: swl.setNodename(NO_NODE);
1135: swl.setLockkey(LOCKKEY);
1136: swl.setExpires(new Timestamp(0));
1137:
1138: return swl;
1139: } finally {
1140: if (resultSet != null) {
1141: try {
1142: resultSet.close();
1143: } catch (SQLException e) {
1144: }
1145: }
1146: if (selectLock != null) {
1147: try {
1148: selectLock.close();
1149: } catch (SQLException e) {
1150: }
1151: }
1152:
1153: if (connection != null) {
1154: try {
1155: connection.close();
1156: log.debug("Connection Closed ");
1157: } catch (SQLException e) {
1158: log.error("Error Closing Connection ", e);
1159: }
1160: connection = null;
1161: }
1162: }
1163:
1164: }
1165:
1166: /*
1167: * (non-Javadoc)
1168: *
1169: * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#getNodeStatus()
1170: */
1171: public List getNodeStatus() {
1172: String nodeID = getNodeID();
1173: Connection connection = null;
1174: PreparedStatement selectLock = null;
1175: ResultSet resultSet = null;
1176: ArrayList locks = new ArrayList();
1177:
1178: try {
1179:
1180: // I need to go direct to JDBC since its just too awful to
1181: // try and do this in Hibernate.
1182:
1183: connection = dataSource.getConnection();
1184:
1185: selectLock = connection
1186: .prepareStatement(SELECT_NODE_LOCK_SQL);
1187:
1188: selectLock.clearParameters();
1189: resultSet = selectLock.executeQuery();
1190: while (resultSet.next()) {
1191: SearchWriterLock swl = new SearchWriterLockImpl();
1192: swl.setId(resultSet.getString(1));
1193: swl.setNodename(resultSet.getString(2));
1194: swl.setLockkey(resultSet.getString(3));
1195: swl.setExpires(resultSet.getTimestamp(4));
1196: log.debug("GOT Lock Record " + swl.getId() + "::"
1197: + swl.getNodename() + "::" + swl.getExpires());
1198: locks.add(swl);
1199: }
1200:
1201: resultSet.close();
1202: resultSet = null;
1203: return locks;
1204:
1205: } catch (Exception ex) {
1206: log.error("Failed to load nodes ", ex);
1207: return locks;
1208: } finally {
1209: if (resultSet != null) {
1210: try {
1211: resultSet.close();
1212: } catch (SQLException e) {
1213: }
1214: }
1215: if (selectLock != null) {
1216: try {
1217: selectLock.close();
1218: } catch (SQLException e) {
1219: }
1220: }
1221:
1222: if (connection != null) {
1223: try {
1224: connection.close();
1225: log.debug("Connection Closed ");
1226: } catch (SQLException e) {
1227: log.error("Error Closing Connection ", e);
1228: }
1229: connection = null;
1230: }
1231: }
1232:
1233: }
1234:
1235: public boolean removeWorkerLock() {
1236: Connection connection = null;
1237: PreparedStatement selectLock = null;
1238: PreparedStatement selectNodeLock = null;
1239: PreparedStatement clearLock = null;
1240: ResultSet resultSet = null;
1241: ArrayList locks = new ArrayList();
1242:
1243: try {
1244:
1245: // I need to go direct to JDBC since its just too awful to
1246: // try and do this in Hibernate.
1247:
1248: connection = dataSource.getConnection();
1249:
1250: selectNodeLock = connection
1251: .prepareStatement(SELECT_NODE_LOCK_SQL);
1252: selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
1253: clearLock = connection.prepareStatement(CLEAR_LOCK_SQL);
1254:
1255: SearchWriterLock swl = null;
1256: selectLock.clearParameters();
1257: selectLock.setString(1, LOCKKEY);
1258: resultSet = selectLock.executeQuery();
1259: if (resultSet.next()) {
1260: swl = new SearchWriterLockImpl();
1261: swl.setId(resultSet.getString(1));
1262: swl.setNodename(resultSet.getString(2));
1263: swl.setLockkey(resultSet.getString(3));
1264: swl.setExpires(resultSet.getTimestamp(4));
1265: } else {
1266: return true;
1267: }
1268:
1269: resultSet.close();
1270: resultSet = null;
1271:
1272: selectNodeLock.clearParameters();
1273: resultSet = selectLock.executeQuery();
1274:
1275: while (resultSet.next()) {
1276: SearchWriterLock node = new SearchWriterLockImpl();
1277: node.setId(resultSet.getString(1));
1278: node.setNodename(resultSet.getString(2));
1279: node.setLockkey(resultSet.getString(3));
1280: node.setExpires(resultSet.getTimestamp(4));
1281: if (swl.getNodename().equals(node.getNodename())) {
1282: log.info("Cant remove Lock to node "
1283: + node.getNodename() + " node exists ");
1284: return false;
1285: }
1286: }
1287:
1288: resultSet.close();
1289: resultSet = null;
1290:
1291: clearLock.clearParameters();
1292: clearLock.setString(1, NO_NODE);
1293: clearLock.setTimestamp(2, new Timestamp(System
1294: .currentTimeMillis()));
1295: clearLock.setString(3, swl.getNodename());
1296: clearLock.setString(4, LOCKKEY);
1297: if (clearLock.executeUpdate() == 1) {
1298: log.warn("NODE UNLOCKED BY USER " + swl.getNodename());
1299:
1300: } else {
1301: log.info("NODE NOT UNLOCKED BY USER "
1302: + swl.getNodename());
1303: return false;
1304: }
1305:
1306: return true;
1307:
1308: } catch (Exception ex) {
1309: log.error("Failed to unlock ", ex);
1310: return false;
1311: } finally {
1312: if (resultSet != null) {
1313: try {
1314: resultSet.close();
1315: } catch (SQLException e) {
1316: }
1317: }
1318: if (selectLock != null) {
1319: try {
1320: selectLock.close();
1321: } catch (SQLException e) {
1322: }
1323: }
1324: if (selectNodeLock != null) {
1325: try {
1326: selectNodeLock.close();
1327: } catch (SQLException e) {
1328: }
1329: }
1330: if (clearLock != null) {
1331: try {
1332: clearLock.close();
1333: } catch (SQLException e) {
1334: }
1335: }
1336:
1337: if (connection != null) {
1338: try {
1339: connection.close();
1340: log.debug("Connection Closed ");
1341: } catch (SQLException e) {
1342: log.error("Error Closing Connection ", e);
1343: }
1344: connection = null;
1345: }
1346: }
1347: }
1348:
1349: /**
1350: * @return Returns the activity.
1351: */
1352: public int getActivity() {
1353: return activity;
1354: }
1355:
1356: public long getLastEventTime() {
1357: return lastEvent;
1358: }
1359:
1360: /**
1361: * @param activity
1362: * The activity to set.
1363: */
1364: public void resetActivity() {
1365: this .activity = 0;
1366:
1367: }
1368:
1369: public void incrementActivity() {
1370: activity++;
1371: lastEvent = System.currentTimeMillis();
1372: }
1373:
1374: public void setLastIndex(long l) {
1375: this .lastIndex = l;
1376:
1377: }
1378:
1379: public void setStartDocIndex(long startDocIndex) {
1380: this .startDocIndex = startDocIndex;
1381: }
1382:
1383: public void setNowIndexing(String reference) {
1384: this .lastIndexing = this .nowIndexing;
1385: this .nowIndexing = reference;
1386: }
1387:
1388: /**
1389: * @return Returns the lastIndex.
1390: */
1391: public long getLastIndex() {
1392: return lastIndex;
1393: }
1394:
1395: /**
1396: * @return Returns the nowIndexing.
1397: */
1398: public String getNowIndexing() {
1399: return nowIndexing;
1400: }
1401:
1402: /**
1403: * @return Returns the startDocIndex.
1404: */
1405: public long getStartDocIndex() {
1406: return startDocIndex;
1407: }
1408:
1409: public String getLastDocument() {
1410: return lastIndexing;
1411: }
1412:
1413: public String getLastElapsed() {
1414: long l = lastIndex;
1415: long h = l / 3600000L;
1416: l = l - (3600000L * h);
1417: long m = l / 600000L;
1418: ;
1419: l = l - (60000L * m);
1420: long s = l / 1000;
1421: l = l - (1000L * s);
1422: return "" + h + "h" + m + "m" + s + "." + l + "s";
1423: }
1424:
1425: public String getCurrentDocument() {
1426: return nowIndexing;
1427: }
1428:
1429: public String getCurrentElapsed() {
1430: long l = System.currentTimeMillis() - startDocIndex;
1431: long h = l / 3600000L;
1432: l = l - (3600000L * h);
1433: long m = l / 60000L;
1434: l = l - (60000L * m);
1435: long s = l / 1000L;
1436: l = l - (1000L * s);
1437: return "" + h + "h" + m + "m" + s + "." + l + "s";
1438: }
1439:
1440: /**
1441: * @return the loadFactor
1442: */
1443: public long getLoadFactor() {
1444: return loadFactor;
1445: }
1446:
1447: /**
1448: * @param loadFactor
1449: * the loadFactor to set
1450: */
1451: public void setLoadFactor(long loadFactor) {
1452: this .loadFactor = loadFactor;
1453: }
1454:
1455: /**
1456: * Is the lock on this node, but not this thread lockedTo == null, localloc ==
1457: * false lockedTo == this node, locallock = false; lockedTo != this node,
1458: * localLock = true
1459: */
1460: public boolean isLocalLock() {
1461: if (lockedTo == null) {
1462: return false;
1463: } else if (getNodeID().equals(lockedTo)) {
1464: return false;
1465: }
1466: return true;
1467: }
1468:
1469: /**
1470: * @return the soakTest
1471: */
1472: public boolean getSoakTest() {
1473: return soakTest;
1474: }
1475:
1476: /**
1477: * Puts the index builder into a Soak test, when there are no pending items,
1478: * it starts building again.
1479: *
1480: * @param soakTest
1481: * the soakTest to set
1482: */
1483: public void setSoakTest(boolean soakTest) {
1484:
1485: this .soakTest = soakTest;
1486: if (soakTest) {
1487: log
1488: .warn("SOAK TEST ACTIVE ======================DONT USE FOR PRODUCTION ");
1489: }
1490: }
1491:
1492: /*
1493: * (non-Javadoc)
1494: *
1495: * @see org.sakaiproject.search.api.Diagnosable#disableDiagnostics()
1496: */
1497: public void disableDiagnostics() {
1498: diagnostics = false;
1499:
1500: }
1501:
1502: /*
1503: * (non-Javadoc)
1504: *
1505: * @see org.sakaiproject.search.api.Diagnosable#enableDiagnostics()
1506: */
1507: public void enableDiagnostics() {
1508: diagnostics = true;
1509: }
1510:
1511: /*
1512: * (non-Javadoc)
1513: *
1514: * @see org.sakaiproject.search.api.Diagnosable#hasDiagnostics()
1515: */
1516: public boolean hasDiagnostics() {
1517: return diagnostics;
1518: }
1519:
1520: }
|