Source Code Cross Referenced for SearchIndexBuilderWorkerImpl.java in  » ERP-CRM-Financial » sakai » org » sakaiproject » search » component » service » impl » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ERP CRM Financial » sakai » org.sakaiproject.search.component.service.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**********************************************************************************
0002:         * $URL: https://source.sakaiproject.org/svn/search/tags/sakai_2-4-1/search-impl/impl/src/java/org/sakaiproject/search/component/service/impl/SearchIndexBuilderWorkerImpl.java $
0003:         * $Id: SearchIndexBuilderWorkerImpl.java 29635 2007-04-26 14:44:09Z ajpoland@iupui.edu $
0004:         ***********************************************************************************
0005:         *
0006:         * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
0007:         *
0008:         * Licensed under the Educational Community License, Version 1.0 (the "License");
0009:         * you may not use this file except in compliance with the License.
0010:         * You may obtain a copy of the License at
0011:         *
0012:         *      http://www.opensource.org/licenses/ecl1.php
0013:         *
0014:         * Unless required by applicable law or agreed to in writing, software
0015:         * distributed under the License is distributed on an "AS IS" BASIS,
0016:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017:         * See the License for the specific language governing permissions and
0018:         * limitations under the License.
0019:         *
0020:         **********************************************************************************/package org.sakaiproject.search.component.service.impl;
0021:
0022:        import java.sql.Connection;
0023:        import java.sql.PreparedStatement;
0024:        import java.sql.ResultSet;
0025:        import java.sql.SQLException;
0026:        import java.sql.Timestamp;
0027:        import java.util.ArrayList;
0028:        import java.util.HashMap;
0029:        import java.util.List;
0030:
0031:        import javax.sql.DataSource;
0032:
0033:        import org.apache.commons.id.IdentifierGenerator;
0034:        import org.apache.commons.id.uuid.UUID;
0035:        import org.apache.commons.id.uuid.VersionFourGenerator;
0036:        import org.apache.commons.logging.Log;
0037:        import org.apache.commons.logging.LogFactory;
0038:        import org.hibernate.HibernateException;
0039:        import org.sakaiproject.component.api.ComponentManager;
0040:        import org.sakaiproject.component.cover.ServerConfigurationService;
0041:        import org.sakaiproject.entity.api.EntityManager;
0042:        import org.sakaiproject.event.api.EventTrackingService;
0043:        import org.sakaiproject.search.api.SearchIndexBuilder;
0044:        import org.sakaiproject.search.api.SearchIndexBuilderWorker;
0045:        import org.sakaiproject.search.api.SearchService;
0046:        import org.sakaiproject.search.dao.SearchIndexBuilderWorkerDao;
0047:        import org.sakaiproject.search.model.SearchBuilderItem;
0048:        import org.sakaiproject.search.model.SearchWriterLock;
0049:        import org.sakaiproject.search.model.impl.SearchWriterLockImpl;
0050:        import org.sakaiproject.tool.api.SessionManager;
0051:        import org.sakaiproject.user.api.User;
0052:        import org.sakaiproject.user.api.UserDirectoryService;
0053:
0054:        public class SearchIndexBuilderWorkerImpl implements  Runnable,
0055:                SearchIndexBuilderWorker {
0056:
0057:            /**
0058:             * The lock we use to ensure single search index writer
0059:             */
0060:            public static final String LOCKKEY = "searchlockkey";
0061:
0062:            protected static final Object GLOBAL_CONTEXT = null;
0063:
0064:            private static final String NO_NODE = "none";
0065:
0066:            private static final String NODE_LOCK = "nodelockkey";
0067:
0068:            private static Log log = LogFactory
0069:                    .getLog(SearchIndexBuilderWorkerImpl.class);
0070:
0071:            private final int numThreads = 2;
0072:
0073:            /**
0074:             * The maximum sleep time for the wait/notify semaphore
0075:             */
0076:            public long sleepTime = 5L * 60000L;
0077:
0078:            /**
0079:             * A load factor 1 is full load, 100 is normal The load factor controlls the
0080:             * backoff of the indexer threads. If the load Factor is high, the search
0081:             * threads back off more.
0082:             */
0083:            private long loadFactor = 1000L;
0084:
0085:            /**
0086:             * The currently running index Builder thread
0087:             */
0088:            private Thread indexBuilderThread[] = new Thread[numThreads];
0089:
0090:            /**
0091:             * sync object
0092:             */
0093:            private Object threadStartLock = new Object();
0094:
0095:            /**
0096:             * dependency: the search index builder that is accepting new items
0097:             */
0098:            private SearchIndexBuilderImpl searchIndexBuilder = null;
0099:
0100:            /**
0101:             * dependency: the current search service, used to get the location of the
0102:             * index
0103:             */
0104:            private SearchService searchService = null;
0105:
0106:            private DataSource dataSource = null;
0107:
0108:            private IdentifierGenerator idgenerator = new VersionFourGenerator();
0109:
0110:            /**
0111:             * Semaphore
0112:             */
0113:            private Object sem = new Object();
0114:
0115:            /**
0116:             * The number of items to process in a batch, default = 100
0117:             */
0118:            private int indexBatchSize = 100;
0119:
0120:            private boolean enabled = false;
0121:
0122:            private SessionManager sessionManager;
0123:
0124:            private UserDirectoryService userDirectoryService;
0125:
0126:            private EntityManager entityManager;
0127:
0128:            private EventTrackingService eventTrackingService;
0129:
0130:            private boolean runThreads = false;
0131:
0132:            private ThreadLocal nodeIDHolder = new ThreadLocal();
0133:
0134:            private SearchIndexBuilderWorkerDao searchIndexBuilderWorkerDao = null;
0135:
0136:            private long lastLock = System.currentTimeMillis();
0137:
0138:            /**
0139:             * Activity acts as a counter that indicates the number of events recieved.
0140:             * The Workers may consult this to determine if they should run.
0141:             */
0142:            private int activity = 0;
0143:
0144:            private long lastEvent = System.currentTimeMillis();
0145:
0146:            private long lastIndex;
0147:
0148:            private long startDocIndex;
0149:
0150:            private String nowIndexing;
0151:
0152:            private String lastIndexing;
0153:
0154:            private boolean soakTest = false;
0155:
0156:            private boolean diagnostics = false;
0157:
0158:            private boolean started = false;
0159:
0160:            private boolean indexExists = false;
0161:
0162:            private static HashMap nodeIDList = new HashMap();;
0163:
0164:            private static String lockedTo = null;
0165:
0166:            private static String SELECT_LOCK_SQL = "select id, nodename, "
0167:                    + "lockkey, expires from searchwriterlock where lockkey = ?";
0168:
0169:            private static String UPDATE_LOCK_SQL = "update searchwriterlock set "
0170:                    + "nodename = ?, expires = ? where id = ? "
0171:                    + "and nodename = ? and lockkey = ? ";
0172:
0173:            private static String INSERT_LOCK_SQL = "insert into searchwriterlock "
0174:                    + "( id,nodename,lockkey, expires ) values ( ?, ?, ?, ? )";
0175:
0176:            private static String COUNT_WORK_SQL = " select count(*) "
0177:                    + "from searchbuilderitem where searchstate = ? ";
0178:
0179:            private static String CLEAR_LOCK_SQL = "update searchwriterlock "
0180:                    + "set nodename = ?, expires = ? where nodename = ? and lockkey = ? ";
0181:
0182:            private static String SELECT_NODE_LOCK_SQL = "select id, nodename, "
0183:                    + "lockkey, expires from searchwriterlock where lockkey like '"
0184:                    + NODE_LOCK + "%'";
0185:
0186:            private static String UPDATE_NODE_LOCK_SQL = "update searchwriterlock set "
0187:                    + "expires = ? where nodename = ? and lockkey = ? ";
0188:
0189:            private static final String SELECT_EXPIRED_NODES_SQL = "select id from searchwriterlock "
0190:                    + "where lockkey like '"
0191:                    + NODE_LOCK
0192:                    + "%' and expires < ? ";
0193:
0194:            private static final String DELETE_LOCKNODE_SQL = "delete from searchwriterlock "
0195:                    + "where id = ? ";
0196:
0197:            public void init() {
0198:                if (started && !runThreads) {
0199:                    log.warn("JVM Shutdown in progress, will not startup");
0200:                    return;
0201:                }
0202:                if (org.sakaiproject.component.cover.ComponentManager
0203:                        .hasBeenClosed()) {
0204:                    log
0205:                            .warn("Component manager Shutdown in progress, will not startup");
0206:                    return;
0207:                }
0208:                started = true;
0209:                runThreads = true;
0210:                ComponentManager cm = org.sakaiproject.component.cover.ComponentManager
0211:                        .getInstance();
0212:                eventTrackingService = (EventTrackingService) load(cm,
0213:                        EventTrackingService.class.getName());
0214:                entityManager = (EntityManager) load(cm, EntityManager.class
0215:                        .getName());
0216:                userDirectoryService = (UserDirectoryService) load(cm,
0217:                        UserDirectoryService.class.getName());
0218:                searchIndexBuilder = (SearchIndexBuilderImpl) load(cm,
0219:                        SearchIndexBuilder.class.getName());
0220:                searchService = (SearchService) load(cm, SearchService.class
0221:                        .getName());
0222:
0223:                sessionManager = (SessionManager) load(cm, SessionManager.class
0224:                        .getName());
0225:
0226:                enabled = "true".equals(ServerConfigurationService.getString(
0227:                        "search.enable", "false"));
0228:
0229:                enabled = enabled
0230:                        & "true".equals(ServerConfigurationService.getString(
0231:                                "search.indexbuild", "true"));
0232:                try {
0233:                    if (searchIndexBuilder == null) {
0234:                        log
0235:                                .error("Search Index Worker needs SearchIndexBuilder ");
0236:                    }
0237:                    if (searchService == null) {
0238:                        log.error("Search Index Worker needs SearchService ");
0239:                    }
0240:                    if (searchIndexBuilderWorkerDao == null) {
0241:                        log
0242:                                .error("Search Index Worker needs SearchIndexBuilderWorkerDao ");
0243:                    }
0244:                    if (eventTrackingService == null) {
0245:                        log
0246:                                .error("Search Index Worker needs EventTrackingService ");
0247:                    }
0248:                    if (entityManager == null) {
0249:                        log.error("Search Index Worker needs EntityManager ");
0250:                    }
0251:                    if (userDirectoryService == null) {
0252:                        log
0253:                                .error("Search Index Worker needs UserDirectortyService ");
0254:                    }
0255:                    if (sessionManager == null) {
0256:                        log.error("Search Index Worker needs SessionManager ");
0257:                    }
0258:                    log.debug("init start");
0259:                    for (int i = 0; i < indexBuilderThread.length; i++) {
0260:                        indexBuilderThread[i] = new Thread(this );
0261:                        indexBuilderThread[i].setName("SearchBuilder_"
0262:                                + String.valueOf(i));
0263:                        indexBuilderThread[i].start();
0264:                    }
0265:
0266:                    /*
0267:                     * Capture shutdown
0268:                     */
0269:                    Runtime.getRuntime().addShutdownHook(new Thread() {
0270:                        /*
0271:                         * (non-Javadoc)
0272:                         * 
0273:                         * @see java.lang.Thread#run()
0274:                         */
0275:                        @Override
0276:                        public void run() {
0277:                            runThreads = false;
0278:                        }
0279:                    });
0280:
0281:                } catch (Throwable t) {
0282:                    log.error("Failed to init ", t);
0283:                }
0284:            }
0285:
0286:            private Object load(ComponentManager cm, String name) {
0287:                Object o = cm.get(name);
0288:                if (o == null) {
0289:                    log.error("Cant find Spring component named " + name);
0290:                }
0291:                return o;
0292:            }
0293:
0294:            /**
0295:             * Main run target of the worker thread {@inheritDoc}
0296:             */
0297:            public void run() {
0298:                if (!enabled)
0299:                    return;
0300:
0301:                String threadName = Thread.currentThread().getName();
0302:                String tn = threadName.substring("SearchBuilder_".length());
0303:                log.debug("Index Builder Run " + tn + "_" + threadName);
0304:                int threadno = Integer.parseInt(tn);
0305:
0306:                String nodeID = getNodeID();
0307:
0308:                org.sakaiproject.component.cover.ComponentManager
0309:                        .waitTillConfigured();
0310:
0311:                try {
0312:
0313:                    while (runThreads) {
0314:                        log.debug("Run Processing Thread");
0315:                        boolean locked = false;
0316:                        org.sakaiproject.tool.api.Session s = null;
0317:                        if (s == null) {
0318:                            s = sessionManager.startSession();
0319:                            User u = userDirectoryService.getUser("admin");
0320:                            s.setUserId(u.getId());
0321:                        }
0322:
0323:                        while (runThreads) {
0324:                            sessionManager.setCurrentSession(s);
0325:                            try {
0326:                                int totalDocs = searchIndexBuilder
0327:                                        .getPendingDocuments();
0328:                                long lastEvent = getLastEventTime();
0329:                                int activity = getActivity();
0330:                                long now = System.currentTimeMillis();
0331:                                long interval = now - lastEvent;
0332:                                boolean process = false;
0333:                                boolean createIndex = false;
0334:                                if (!indexExists) {
0335:                                    if (!searchIndexBuilderWorkerDao
0336:                                            .indexExists()) {
0337:                                        process = true;
0338:                                        createIndex = true;
0339:                                        log
0340:                                                .debug("No cluster Index exists, creating for the first time");
0341:                                    } else {
0342:                                        indexExists = true;
0343:                                    }
0344:                                } else {
0345:
0346:                                    // if activity == totalDocs and interval > 10
0347:                                    if (totalDocs > 200) {
0348:                                        loadFactor = 10L;
0349:                                    } else {
0350:                                        loadFactor = 1000L;
0351:                                    }
0352:                                    if (totalDocs == 0) {
0353:                                        process = false;
0354:                                    } else if (totalDocs < 20
0355:                                            && interval > (20 * loadFactor)) {
0356:                                        process = true;
0357:                                    } else if (totalDocs >= 20
0358:                                            && totalDocs < 50
0359:                                            && interval > (10 * loadFactor)) {
0360:                                        process = true;
0361:                                    } else if (totalDocs >= 50
0362:                                            && totalDocs < 90
0363:                                            && interval > (5 * loadFactor)) {
0364:                                        process = true;
0365:                                    } else if (totalDocs > ((90 * loadFactor) / 1000)) {
0366:                                        process = true;
0367:                                    }
0368:                                    if (process) {
0369:                                        resetActivity();
0370:                                    }
0371:                                }
0372:
0373:                                // should this node consider taking the lock ?
0374:                                long lastLockInterval = (System
0375:                                        .currentTimeMillis() - lastLock);
0376:                                long lastLockMetric = lastLockInterval
0377:                                        * totalDocs;
0378:
0379:                                // if we have 1000 docs, then indexing should happen
0380:                                // after 10 seconds break
0381:                                // 1000*10000 10000000
0382:                                // 500 docs/ 20 seconds
0383:                                //
0384:
0385:                                // make certain that we are alive
0386:                                log
0387:                                        .debug("Activity "
0388:                                                + (lastLockMetric > (10000L * loadFactor))
0389:                                                + " "
0390:                                                + (lastLockInterval > (60L * loadFactor))
0391:                                                + " " + createIndex);
0392:
0393:                                if (lastLockMetric > (10000L * loadFactor)
0394:                                        || lastLockInterval > (60L * loadFactor)
0395:                                        || createIndex) {
0396:                                    log.debug("===" + process
0397:                                            + "=============PROCESSING ");
0398:                                    if (process
0399:                                            && getLockTransaction(
0400:                                                    2L * 60L * 1000L,
0401:                                                    createIndex)) {
0402:
0403:                                        log.debug("===" + nodeID
0404:                                                + "=============PROCESSING ");
0405:                                        if (lockedTo != null
0406:                                                && lockedTo.equals(nodeID)) {
0407:                                            log
0408:                                                    .error("+++++++++++++++Local Lock Collision+++++++++++++");
0409:                                        }
0410:                                        lockedTo = nodeID;
0411:
0412:                                        lastLock = System.currentTimeMillis();
0413:
0414:                                        if (createIndex) {
0415:                                            log
0416:                                                    .info("=======================Search Index being created for the first time");
0417:                                            searchIndexBuilderWorkerDao
0418:                                                    .createIndexTransaction(this );
0419:                                            indexExists = true;
0420:                                            log
0421:                                                    .info("=======================Done creating Search Index for the first time");
0422:
0423:                                        } else {
0424:                                            int batchSize = 100;
0425:                                            if (totalDocs > 500) {
0426:                                                batchSize = 200;
0427:                                            } else if (totalDocs > 1000) {
0428:                                                batchSize = 500;
0429:                                            } else if (totalDocs > 10000) {
0430:                                                batchSize = 1000;
0431:                                            }
0432:                                            searchIndexBuilderWorkerDao
0433:                                                    .processToDoListTransaction(
0434:                                                            this , batchSize);
0435:
0436:                                        }
0437:
0438:                                        lastLock = System.currentTimeMillis();
0439:
0440:                                        if (lockedTo.equals(nodeID)) {
0441:                                            lockedTo = null;
0442:                                        } else {
0443:                                            log
0444:                                                    .error("+++++++++++++++++++++++++++Lost Local Lock+++++++++++");
0445:                                        }
0446:                                        log.debug("===" + nodeID
0447:                                                + "=============COMPLETED ");
0448:
0449:                                    } else {
0450:                                        break;
0451:                                    }
0452:                                } else {
0453:                                    // make certain the node updates hearbeat
0454:                                    updateNodeLock(2L * 60L * 1000L);
0455:                                    log
0456:                                            .debug("Not taking Lock, too much activity");
0457:                                    break;
0458:                                }
0459:                            } finally {
0460:                                clearLockTransaction();
0461:                            }
0462:                        }
0463:                        // this is here force cluster members
0464:                        // this will not reload the index on this node as
0465:                        if (indexExists) {
0466:                            try {
0467:                                searchService.reload();
0468:                            } catch (Exception ex) {
0469:                                log
0470:                                        .info("No Search Segment exists at present, this is Ok on first start :"
0471:                                                + ex.getMessage());
0472:                            }
0473:                        }
0474:                        if (!runThreads) {
0475:                            break;
0476:                        }
0477:                        try {
0478:                            log.debug("Sleeping Processing Thread");
0479:                            synchronized (sem) {
0480:                                log.debug("++++++WAITING " + nodeID);
0481:                                sem.wait(sleepTime);
0482:
0483:                                log.debug("+++++ALIVE " + nodeID);
0484:                            }
0485:                            log.debug("Wakey Wakey Processing Thread");
0486:
0487:                            if (org.sakaiproject.component.cover.ComponentManager
0488:                                    .hasBeenClosed()) {
0489:                                runThreads = false;
0490:                                break;
0491:                            }
0492:                            if (soakTest
0493:                                    && (searchService.getPendingDocs() == 0)) {
0494:                                log
0495:                                        .error("SOAK TEST---SOAK TEST---SOAK TEST. Index Rebuild Started");
0496:                                searchService.rebuildInstance();
0497:                            }
0498:                        } catch (InterruptedException e) {
0499:                            log.debug(" Exit From sleep " + e.getMessage());
0500:                            break;
0501:                        }
0502:                    }
0503:                } catch (Throwable t) {
0504:                    log.warn("Failed in IndexBuilder ", t);
0505:                } finally {
0506:
0507:                    log.debug("IndexBuilder run exit " + threadName);
0508:                    indexBuilderThread[threadno] = null;
0509:                }
0510:            }
0511:
0512:            private String getNodeID() {
0513:                String nodeID = (String) nodeIDHolder.get();
0514:                if (nodeID == null) {
0515:                    UUID uuid = (UUID) idgenerator.nextIdentifier();
0516:                    nodeID = uuid.toString();
0517:                    nodeIDHolder.set(nodeID);
0518:                    if (nodeIDList.get(nodeID) == null) {
0519:                        nodeIDList.put(nodeID, nodeID);
0520:                    } else {
0521:                        log
0522:                                .error("============NODE ID "
0523:                                        + nodeID
0524:                                        + " has already been issued, there must be a clash");
0525:                    }
0526:                }
0527:                return nodeID;
0528:            }
0529:
0530:            /*
0531:             * (non-Javadoc)
0532:             * 
0533:             * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#updateNodeLock(java.sql.Connection)
0534:             */
0535:            public void updateNodeLock(long lifeLeft) throws SQLException {
0536:
0537:                Connection connection = null;
0538:                String nodeID = getNodeID();
0539:
0540:                PreparedStatement updateNodeLock = null;
0541:                PreparedStatement deleteExpiredNodeLock = null;
0542:                PreparedStatement selectExpiredNodeLock = null;
0543:                PreparedStatement insertLock = null;
0544:                ResultSet resultSet = null;
0545:                String threadID = Thread.currentThread().getName();
0546:                boolean savedautocommit = false;
0547:                Timestamp now = new Timestamp(System.currentTimeMillis());
0548:                // a node can expire, after 2 minutes, to indicate to an admin that it
0549:                // is dead
0550:                // the admin can then force the
0551:                Timestamp nodeExpired = new Timestamp(now.getTime() + lifeLeft);
0552:                try {
0553:                    connection = dataSource.getConnection();
0554:                    boolean savedautocommen = connection.getAutoCommit();
0555:                    connection.setAutoCommit(false);
0556:
0557:                    updateNodeLock = connection
0558:                            .prepareStatement(UPDATE_NODE_LOCK_SQL);
0559:                    deleteExpiredNodeLock = connection
0560:                            .prepareStatement(DELETE_LOCKNODE_SQL);
0561:                    selectExpiredNodeLock = connection
0562:                            .prepareStatement(SELECT_EXPIRED_NODES_SQL);
0563:                    insertLock = connection.prepareStatement(INSERT_LOCK_SQL);
0564:                    int retries = 5;
0565:                    boolean updated = false;
0566:                    while (!updated && retries > 0) {
0567:                        try {
0568:
0569:                            try {
0570:                                insertLock.clearParameters();
0571:                                insertLock.setString(1, "Node:" + nodeID); // id
0572:                                insertLock.setString(2, nodeID); // nodename
0573:                                insertLock.setString(3, NODE_LOCK + nodeID); // lockkey
0574:                                insertLock.setTimestamp(4, nodeExpired); // expires
0575:                                log.debug(threadID + " Doing "
0576:                                        + INSERT_LOCK_SQL + ":{" + "Node:"
0577:                                        + nodeID + "}{" + nodeID + "}{"
0578:                                        + NODE_LOCK + nodeID + "}{"
0579:                                        + nodeExpired + "}");
0580:                                insertLock.executeUpdate();
0581:                            } catch (SQLException ex) {
0582:                                updateNodeLock.clearParameters();
0583:                                updateNodeLock.setTimestamp(1, nodeExpired); // expires
0584:                                updateNodeLock.setString(2, nodeID); // nodename
0585:                                updateNodeLock.setString(3, NODE_LOCK + nodeID); // lockkey
0586:                                log.debug(threadID + " Doing "
0587:                                        + UPDATE_NODE_LOCK_SQL + ":{"
0588:                                        + nodeExpired + "}{" + nodeID + "}{"
0589:                                        + NODE_LOCK + nodeID + "}");
0590:                                if (updateNodeLock.executeUpdate() != 1) {
0591:                                    log.warn("Failed to update node heartbeat "
0592:                                            + nodeID);
0593:                                }
0594:                            }
0595:                            log.debug(threadID + " Doing Commit ");
0596:                            connection.commit();
0597:                            updated = true;
0598:                        } catch (SQLException e) {
0599:                            log.warn("Retrying ", e);
0600:                            try {
0601:                                connection.rollback();
0602:                            } catch (Exception ex) {
0603:
0604:                            }
0605:                            retries--;
0606:                            try {
0607:                                Thread.sleep(100);
0608:                            } catch (InterruptedException ie) {
0609:                            }
0610:                        }
0611:                    }
0612:                    if (!updated) {
0613:                        log
0614:                                .error("Failed to update node lock, will try next time ");
0615:                    } else {
0616:                        log.debug("Updated Node Lock on " + nodeID
0617:                                + " to Expire at" + nodeExpired);
0618:                    }
0619:
0620:                    retries = 5;
0621:                    updated = false;
0622:                    while (!updated && retries > 0) {
0623:                        try {
0624:                            selectExpiredNodeLock.clearParameters();
0625:                            selectExpiredNodeLock.setTimestamp(1, now);
0626:                            log.debug(threadID + " Doing "
0627:                                    + SELECT_EXPIRED_NODES_SQL + ":{" + now
0628:                                    + "}");
0629:
0630:                            resultSet = selectExpiredNodeLock.executeQuery();
0631:                            while (resultSet.next()) {
0632:                                String id = resultSet.getString(1);
0633:                                deleteExpiredNodeLock.clearParameters();
0634:                                deleteExpiredNodeLock.setString(1, id);
0635:                                deleteExpiredNodeLock.execute();
0636:                                connection.commit();
0637:                            }
0638:                            log.debug(threadID + " Doing Commit");
0639:                            connection.commit();
0640:                            resultSet.close();
0641:                            updated = true;
0642:                        } catch (SQLException e) {
0643:
0644:                            log.info("Retrying Delete Due to  "
0645:                                    + e.getMessage());
0646:                            log.debug("Detailed Traceback  ", e);
0647:                            try {
0648:                                resultSet.close();
0649:                            } catch (Exception ex) {
0650:
0651:                            }
0652:                            try {
0653:                                connection.rollback();
0654:                            } catch (Exception ex) {
0655:
0656:                            }
0657:                            retries--;
0658:                            try {
0659:                                Thread.sleep(100);
0660:                            } catch (InterruptedException ie) {
0661:                            }
0662:                        }
0663:                    }
0664:                    if (!updated) {
0665:                        log
0666:                                .warn("Failed to clear old nodes, will try next time ");
0667:                    }
0668:
0669:                } catch (Exception ex) {
0670:                    log.error("Failed to register node ", ex);
0671:                    connection.rollback();
0672:                } finally {
0673:                    if (resultSet != null) {
0674:                        try {
0675:                            resultSet.close();
0676:                        } catch (SQLException e) {
0677:                        }
0678:                    }
0679:                    if (insertLock != null) {
0680:                        try {
0681:                            insertLock.close();
0682:                        } catch (SQLException e) {
0683:                        }
0684:                    }
0685:                    if (updateNodeLock != null) {
0686:                        try {
0687:                            updateNodeLock.close();
0688:                        } catch (SQLException e) {
0689:                        }
0690:                    }
0691:                    if (selectExpiredNodeLock != null) {
0692:                        try {
0693:                            selectExpiredNodeLock.close();
0694:                        } catch (SQLException e) {
0695:                        }
0696:                    }
0697:                    if (deleteExpiredNodeLock != null) {
0698:                        try {
0699:                            deleteExpiredNodeLock.close();
0700:                        } catch (SQLException e) {
0701:                        }
0702:                    }
0703:                    if (connection != null) {
0704:                        try {
0705:                            connection.setAutoCommit(savedautocommit);
0706:                            connection.close();
0707:                        } catch (SQLException e) {
0708:                        }
0709:                        connection = null;
0710:                    }
0711:
0712:                }
0713:
0714:            }
0715:
0716:            public boolean getLockTransaction(long nodeLifetime) {
0717:                return getLockTransaction(nodeLifetime, false);
0718:            }
0719:
0720:            /**
0721:             * @return
0722:             * @throws HibernateException
0723:             */
0724:            public boolean getLockTransaction(long nodeLifetime,
0725:                    boolean forceLock) {
0726:                if (searchIndexBuilderWorkerDao.isLockRequired()) {
0727:                    return getHardLock(nodeLifetime, forceLock);
0728:                } else {
0729:                    try {
0730:                        updateNodeLock(nodeLifetime);
0731:                    } catch (SQLException e) {
0732:                        log.warn("Failed to update node lock "
0733:                                + e.getClass().getName() + " :"
0734:                                + e.getMessage());
0735:                    }
0736:                    return true;
0737:                }
0738:            }
0739:
0740:            public boolean getHardLock(long nodeLifetime, boolean forceLock) {
0741:                String nodeID = getNodeID();
0742:                Connection connection = null;
0743:                boolean locked = false;
0744:                boolean autoCommit = false;
0745:                PreparedStatement selectLock = null;
0746:                PreparedStatement updateLock = null;
0747:                PreparedStatement insertLock = null;
0748:                PreparedStatement countWork = null;
0749:
0750:                ResultSet resultSet = null;
0751:                Timestamp now = new Timestamp(System.currentTimeMillis());
0752:                Timestamp expiryDate = new Timestamp(now.getTime()
0753:                        + (10L * 60L * 1000L));
0754:
0755:                try {
0756:
0757:                    // I need to go direct to JDBC since its just too awful to
0758:                    // try and do this in Hibernate.
0759:
0760:                    updateNodeLock(nodeLifetime);
0761:
0762:                    connection = dataSource.getConnection();
0763:                    autoCommit = connection.getAutoCommit();
0764:                    if (autoCommit) {
0765:                        connection.setAutoCommit(false);
0766:                    }
0767:
0768:                    selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
0769:                    updateLock = connection.prepareStatement(UPDATE_LOCK_SQL);
0770:                    insertLock = connection.prepareStatement(INSERT_LOCK_SQL);
0771:                    countWork = connection.prepareStatement(COUNT_WORK_SQL);
0772:
0773:                    SearchWriterLock swl = null;
0774:                    selectLock.clearParameters();
0775:                    selectLock.setString(1, LOCKKEY);
0776:                    resultSet = selectLock.executeQuery();
0777:                    if (resultSet.next()) {
0778:                        swl = new SearchWriterLockImpl();
0779:                        swl.setId(resultSet.getString(1));
0780:                        swl.setNodename(resultSet.getString(2));
0781:                        swl.setLockkey(resultSet.getString(3));
0782:                        swl.setExpires(resultSet.getTimestamp(4));
0783:                        log.debug("GOT Lock Record " + swl.getId() + "::"
0784:                                + swl.getNodename() + "::" + swl.getExpires());
0785:
0786:                    }
0787:
0788:                    resultSet.close();
0789:                    resultSet = null;
0790:
0791:                    boolean takelock = false;
0792:                    if (swl == null) {
0793:                        log.debug("_-------------NO Lock Record");
0794:                        takelock = true;
0795:                    } else if ("none".equals(swl.getNodename())) {
0796:                        takelock = true;
0797:                        log.debug(nodeID + "_-------------no lock");
0798:                    } else if (nodeID.equals(swl.getNodename())) {
0799:                        takelock = true;
0800:                        log.debug(nodeID + "_------------matched threadid ");
0801:                    } else if (swl.getExpires() == null
0802:                            || swl.getExpires().before(now)) {
0803:                        takelock = true;
0804:                        log.debug(nodeID + "_------------thread dead ");
0805:                    }
0806:
0807:                    if (takelock) {
0808:                        // any work ?
0809:                        int nitems = 0;
0810:                        if (!forceLock) {
0811:                            countWork.clearParameters();
0812:                            countWork.setInt(1, SearchBuilderItem.STATE_PENDING
0813:                                    .intValue());
0814:                            resultSet = countWork.executeQuery();
0815:                            if (resultSet.next()) {
0816:                                nitems = resultSet.getInt(1);
0817:                            }
0818:                            resultSet.close();
0819:                            resultSet = null;
0820:                        }
0821:                        if (nitems > 0 || forceLock) {
0822:                            try {
0823:                                if (swl == null) {
0824:                                    insertLock.clearParameters();
0825:                                    insertLock.setString(1, nodeID);
0826:                                    insertLock.setString(2, nodeID);
0827:                                    insertLock.setString(3, LOCKKEY);
0828:                                    insertLock.setTimestamp(4, expiryDate);
0829:
0830:                                    if (insertLock.executeUpdate() == 1) {
0831:                                        log.debug("INSERT Lock Record "
0832:                                                + nodeID + "::" + nodeID + "::"
0833:                                                + expiryDate);
0834:
0835:                                        locked = true;
0836:                                    }
0837:
0838:                                } else {
0839:                                    updateLock.clearParameters();
0840:                                    updateLock.setString(1, nodeID);
0841:                                    updateLock.setTimestamp(2, expiryDate);
0842:                                    updateLock.setString(3, swl.getId());
0843:                                    updateLock.setString(4, swl.getNodename());
0844:                                    updateLock.setString(5, swl.getLockkey());
0845:                                    if (updateLock.executeUpdate() == 1) {
0846:                                        log.debug("UPDATED Lock Record "
0847:                                                + swl.getId() + "::" + nodeID
0848:                                                + "::" + expiryDate);
0849:                                        locked = true;
0850:                                    }
0851:
0852:                                }
0853:                            } catch (SQLException sqlex) {
0854:                                locked = false;
0855:                                log.debug(
0856:                                        "Failed to get lock, but this is Ok ",
0857:                                        sqlex);
0858:                            }
0859:
0860:                        }
0861:
0862:                    }
0863:                    connection.commit();
0864:
0865:                } catch (Exception ex) {
0866:                    if (connection != null) {
0867:                        try {
0868:                            connection.rollback();
0869:                        } catch (SQLException e) {
0870:                        }
0871:                    }
0872:                    log.error("Failed to get lock " + ex.getMessage());
0873:                    locked = false;
0874:                } finally {
0875:                    if (resultSet != null) {
0876:                        try {
0877:                            resultSet.close();
0878:                        } catch (SQLException e) {
0879:                        }
0880:                    }
0881:                    if (selectLock != null) {
0882:                        try {
0883:                            selectLock.close();
0884:                        } catch (SQLException e) {
0885:                        }
0886:                    }
0887:                    if (updateLock != null) {
0888:                        try {
0889:                            updateLock.close();
0890:                        } catch (SQLException e) {
0891:                        }
0892:                    }
0893:                    if (insertLock != null) {
0894:                        try {
0895:                            insertLock.close();
0896:                        } catch (SQLException e) {
0897:                        }
0898:                    }
0899:                    if (countWork != null) {
0900:                        try {
0901:                            countWork.close();
0902:                        } catch (SQLException e) {
0903:                        }
0904:                    }
0905:
0906:                    if (connection != null) {
0907:                        try {
0908:                            connection.setAutoCommit(autoCommit);
0909:                        } catch (SQLException e) {
0910:                        }
0911:                        try {
0912:                            connection.close();
0913:                            log.debug("Connection Closed ");
0914:                        } catch (SQLException e) {
0915:                            log.error("Error Closing Connection ", e);
0916:                        }
0917:                        connection = null;
0918:                    }
0919:                }
0920:                return locked;
0921:
0922:            }
0923:
0924:            /**
0925:             * Count the number of pending documents waiting to be indexed on this
0926:             * cluster node. All nodes will potentially perform the index in a cluster,
0927:             * however only one must be doing the index, hence this method attampts to
0928:             * grab a lock on the writer. If sucessfull it then gets the real number of
0929:             * pending documents. There is a timeout, such that if the witer has not
0930:             * been seen for 10 minutes, it is assumed that something has gon wrong with
0931:             * it, and a new writer is elected on a first grab basis. Every time the
0932:             * elected writer comes back, it updates its record to say its still active.
0933:             * We could do some round robin timeout, or allow deployers to select a pool
0934:             * of index writers in Sakai properties. {@inheritDoc}
0935:             */
0936:
0937:            private void clearLockTransaction() {
0938:                if (searchIndexBuilderWorkerDao.isLockRequired()) {
0939:                    clearHardLock();
0940:                }
0941:            }
0942:
0943:            public void clearHardLock() {
0944:                String nodeID = getNodeID();
0945:
0946:                Connection connection = null;
0947:                PreparedStatement clearLock = null;
0948:                try {
0949:                    connection = dataSource.getConnection();
0950:
0951:                    clearLock = connection.prepareStatement(CLEAR_LOCK_SQL);
0952:                    clearLock.clearParameters();
0953:                    clearLock.setString(1, NO_NODE);
0954:                    clearLock.setTimestamp(2, new Timestamp(System
0955:                            .currentTimeMillis()));
0956:                    clearLock.setString(3, nodeID);
0957:                    clearLock.setString(4, LOCKKEY);
0958:                    if (clearLock.executeUpdate() == 1) {
0959:                        log.debug("UNLOCK - OK::" + nodeID + "::now");
0960:
0961:                    } else {
0962:                        log.debug("UNLOCK - no-lock::" + nodeID + "::now");
0963:                    }
0964:                    connection.commit();
0965:
0966:                } catch (Exception ex) {
0967:                    try {
0968:                        connection.rollback();
0969:                    } catch (SQLException e) {
0970:                    }
0971:                    log.error("Failed to clear lock" + ex.getMessage());
0972:                } finally {
0973:                    if (clearLock != null) {
0974:                        try {
0975:                            clearLock.close();
0976:                        } catch (SQLException e) {
0977:                            log.error("Error Closing Prepared Statement ", e);
0978:                        }
0979:                    }
0980:                    if (connection != null) {
0981:                        try {
0982:                            connection.close();
0983:                            log.debug("Connection Closed");
0984:                        } catch (SQLException e) {
0985:                            log.error("Error Closing Connection", e);
0986:                        }
0987:                    }
0988:                }
0989:
0990:            }
0991:
0992:            public boolean isRunning() {
0993:                if (org.sakaiproject.component.cover.ComponentManager
0994:                        .hasBeenClosed()) {
0995:                    runThreads = false;
0996:                }
0997:                return runThreads;
0998:            }
0999:
1000:            /*
1001:             * (non-Javadoc)
1002:             * 
1003:             * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#checkRunning()
1004:             */
1005:            public void checkRunning() {
1006:                if (!enabled)
1007:                    return;
1008:                runThreads = true;
1009:                synchronized (threadStartLock) {
1010:                    for (int i = 0; i < indexBuilderThread.length; i++) {
1011:                        if (indexBuilderThread[i] == null) {
1012:                            indexBuilderThread[i] = new Thread(this );
1013:                            indexBuilderThread[i].setName(String.valueOf(i)
1014:                                    + "::" + this .getClass().getName());
1015:                            indexBuilderThread[i].start();
1016:                        }
1017:                    }
1018:                }
1019:                synchronized (sem) {
1020:                    log.debug("_________NOTIFY");
1021:                    sem.notify();
1022:                    log.debug("_________NOTIFY COMPLETE");
1023:                }
1024:
1025:            }
1026:
1027:            public void destroy() {
1028:                if (!enabled)
1029:                    return;
1030:
1031:                log.debug("Destroy SearchIndexBuilderWorker ");
1032:                runThreads = false;
1033:
1034:                synchronized (sem) {
1035:                    sem.notifyAll();
1036:                }
1037:            }
1038:
1039:            /**
1040:             * @return Returns the sleepTime.
1041:             */
1042:            public long getSleepTime() {
1043:                return sleepTime;
1044:            }
1045:
1046:            /**
1047:             * @param sleepTime
1048:             *        The sleepTime to set.
1049:             */
1050:            public void setSleepTime(long sleepTime) {
1051:                this .sleepTime = sleepTime;
1052:            }
1053:
1054:            /**
1055:             * @return Returns the dataSource.
1056:             */
1057:            public DataSource getDataSource() {
1058:                return dataSource;
1059:            }
1060:
1061:            /**
1062:             * @param dataSource
1063:             *        The dataSource to set.
1064:             */
1065:            public void setDataSource(DataSource dataSource) {
1066:                this .dataSource = dataSource;
1067:            }
1068:
1069:            /**
1070:             * @return Returns the searchIndexBuilderWorkerDao.
1071:             */
1072:            public SearchIndexBuilderWorkerDao getSearchIndexBuilderWorkerDao() {
1073:                return searchIndexBuilderWorkerDao;
1074:            }
1075:
1076:            /**
1077:             * @param searchIndexBuilderWorkerDao
1078:             *        The searchIndexBuilderWorkerDao to set.
1079:             */
1080:            public void setSearchIndexBuilderWorkerDao(
1081:                    SearchIndexBuilderWorkerDao searchIndexBuilderWorkerDao) {
1082:                this .searchIndexBuilderWorkerDao = searchIndexBuilderWorkerDao;
1083:            }
1084:
1085:            /*
1086:             * (non-Javadoc)
1087:             * 
1088:             * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#getCurrentLock()
1089:             */
1090:            public SearchWriterLock getCurrentLock() {
1091:                String nodeID = getNodeID();
1092:                Connection connection = null;
1093:                PreparedStatement selectLock = null;
1094:                ResultSet resultSet = null;
1095:
1096:                try {
1097:
1098:                    // I need to go direct to JDBC since its just too awful to
1099:                    // try and do this in Hibernate.
1100:
1101:                    connection = dataSource.getConnection();
1102:
1103:                    selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
1104:
1105:                    SearchWriterLock swl = null;
1106:                    selectLock.clearParameters();
1107:                    selectLock.setString(1, LOCKKEY);
1108:                    resultSet = selectLock.executeQuery();
1109:                    if (resultSet.next()) {
1110:                        swl = new SearchWriterLockImpl();
1111:                        swl.setId(resultSet.getString(1));
1112:                        swl.setNodename(resultSet.getString(2));
1113:                        swl.setLockkey(resultSet.getString(3));
1114:                        swl.setExpires(resultSet.getTimestamp(4));
1115:                        log.debug("GOT Lock Record " + swl.getId() + "::"
1116:                                + swl.getNodename() + "::" + swl.getExpires());
1117:
1118:                    }
1119:
1120:                    resultSet.close();
1121:                    resultSet = null;
1122:                    if (swl == null) {
1123:                        swl = new SearchWriterLockImpl();
1124:                        swl.setNodename(NO_NODE);
1125:                        swl.setLockkey(LOCKKEY);
1126:                        swl.setExpires(new Timestamp(0));
1127:
1128:                    }
1129:                    return swl;
1130:
1131:                } catch (Exception ex) {
1132:                    log.error("Failed to get lock " + ex.getMessage());
1133:                    SearchWriterLock swl = new SearchWriterLockImpl();
1134:                    swl.setNodename(NO_NODE);
1135:                    swl.setLockkey(LOCKKEY);
1136:                    swl.setExpires(new Timestamp(0));
1137:
1138:                    return swl;
1139:                } finally {
1140:                    if (resultSet != null) {
1141:                        try {
1142:                            resultSet.close();
1143:                        } catch (SQLException e) {
1144:                        }
1145:                    }
1146:                    if (selectLock != null) {
1147:                        try {
1148:                            selectLock.close();
1149:                        } catch (SQLException e) {
1150:                        }
1151:                    }
1152:
1153:                    if (connection != null) {
1154:                        try {
1155:                            connection.close();
1156:                            log.debug("Connection Closed ");
1157:                        } catch (SQLException e) {
1158:                            log.error("Error Closing Connection ", e);
1159:                        }
1160:                        connection = null;
1161:                    }
1162:                }
1163:
1164:            }
1165:
1166:            /*
1167:             * (non-Javadoc)
1168:             * 
1169:             * @see org.sakaiproject.search.component.service.impl.SearchIndexBuilderWorkerAPI#getNodeStatus()
1170:             */
1171:            public List getNodeStatus() {
1172:                String nodeID = getNodeID();
1173:                Connection connection = null;
1174:                PreparedStatement selectLock = null;
1175:                ResultSet resultSet = null;
1176:                ArrayList locks = new ArrayList();
1177:
1178:                try {
1179:
1180:                    // I need to go direct to JDBC since its just too awful to
1181:                    // try and do this in Hibernate.
1182:
1183:                    connection = dataSource.getConnection();
1184:
1185:                    selectLock = connection
1186:                            .prepareStatement(SELECT_NODE_LOCK_SQL);
1187:
1188:                    selectLock.clearParameters();
1189:                    resultSet = selectLock.executeQuery();
1190:                    while (resultSet.next()) {
1191:                        SearchWriterLock swl = new SearchWriterLockImpl();
1192:                        swl.setId(resultSet.getString(1));
1193:                        swl.setNodename(resultSet.getString(2));
1194:                        swl.setLockkey(resultSet.getString(3));
1195:                        swl.setExpires(resultSet.getTimestamp(4));
1196:                        log.debug("GOT Lock Record " + swl.getId() + "::"
1197:                                + swl.getNodename() + "::" + swl.getExpires());
1198:                        locks.add(swl);
1199:                    }
1200:
1201:                    resultSet.close();
1202:                    resultSet = null;
1203:                    return locks;
1204:
1205:                } catch (Exception ex) {
1206:                    log.error("Failed to load nodes ", ex);
1207:                    return locks;
1208:                } finally {
1209:                    if (resultSet != null) {
1210:                        try {
1211:                            resultSet.close();
1212:                        } catch (SQLException e) {
1213:                        }
1214:                    }
1215:                    if (selectLock != null) {
1216:                        try {
1217:                            selectLock.close();
1218:                        } catch (SQLException e) {
1219:                        }
1220:                    }
1221:
1222:                    if (connection != null) {
1223:                        try {
1224:                            connection.close();
1225:                            log.debug("Connection Closed ");
1226:                        } catch (SQLException e) {
1227:                            log.error("Error Closing Connection ", e);
1228:                        }
1229:                        connection = null;
1230:                    }
1231:                }
1232:
1233:            }
1234:
1235:            public boolean removeWorkerLock() {
1236:                Connection connection = null;
1237:                PreparedStatement selectLock = null;
1238:                PreparedStatement selectNodeLock = null;
1239:                PreparedStatement clearLock = null;
1240:                ResultSet resultSet = null;
1241:                ArrayList locks = new ArrayList();
1242:
1243:                try {
1244:
1245:                    // I need to go direct to JDBC since its just too awful to
1246:                    // try and do this in Hibernate.
1247:
1248:                    connection = dataSource.getConnection();
1249:
1250:                    selectNodeLock = connection
1251:                            .prepareStatement(SELECT_NODE_LOCK_SQL);
1252:                    selectLock = connection.prepareStatement(SELECT_LOCK_SQL);
1253:                    clearLock = connection.prepareStatement(CLEAR_LOCK_SQL);
1254:
1255:                    SearchWriterLock swl = null;
1256:                    selectLock.clearParameters();
1257:                    selectLock.setString(1, LOCKKEY);
1258:                    resultSet = selectLock.executeQuery();
1259:                    if (resultSet.next()) {
1260:                        swl = new SearchWriterLockImpl();
1261:                        swl.setId(resultSet.getString(1));
1262:                        swl.setNodename(resultSet.getString(2));
1263:                        swl.setLockkey(resultSet.getString(3));
1264:                        swl.setExpires(resultSet.getTimestamp(4));
1265:                    } else {
1266:                        return true;
1267:                    }
1268:
1269:                    resultSet.close();
1270:                    resultSet = null;
1271:
1272:                    selectNodeLock.clearParameters();
1273:                    resultSet = selectLock.executeQuery();
1274:
1275:                    while (resultSet.next()) {
1276:                        SearchWriterLock node = new SearchWriterLockImpl();
1277:                        node.setId(resultSet.getString(1));
1278:                        node.setNodename(resultSet.getString(2));
1279:                        node.setLockkey(resultSet.getString(3));
1280:                        node.setExpires(resultSet.getTimestamp(4));
1281:                        if (swl.getNodename().equals(node.getNodename())) {
1282:                            log.info("Cant remove Lock to node "
1283:                                    + node.getNodename() + " node exists ");
1284:                            return false;
1285:                        }
1286:                    }
1287:
1288:                    resultSet.close();
1289:                    resultSet = null;
1290:
1291:                    clearLock.clearParameters();
1292:                    clearLock.setString(1, NO_NODE);
1293:                    clearLock.setTimestamp(2, new Timestamp(System
1294:                            .currentTimeMillis()));
1295:                    clearLock.setString(3, swl.getNodename());
1296:                    clearLock.setString(4, LOCKKEY);
1297:                    if (clearLock.executeUpdate() == 1) {
1298:                        log.warn("NODE UNLOCKED BY USER " + swl.getNodename());
1299:
1300:                    } else {
1301:                        log.info("NODE NOT UNLOCKED BY USER "
1302:                                + swl.getNodename());
1303:                        return false;
1304:                    }
1305:
1306:                    return true;
1307:
1308:                } catch (Exception ex) {
1309:                    log.error("Failed to unlock ", ex);
1310:                    return false;
1311:                } finally {
1312:                    if (resultSet != null) {
1313:                        try {
1314:                            resultSet.close();
1315:                        } catch (SQLException e) {
1316:                        }
1317:                    }
1318:                    if (selectLock != null) {
1319:                        try {
1320:                            selectLock.close();
1321:                        } catch (SQLException e) {
1322:                        }
1323:                    }
1324:                    if (selectNodeLock != null) {
1325:                        try {
1326:                            selectNodeLock.close();
1327:                        } catch (SQLException e) {
1328:                        }
1329:                    }
1330:                    if (clearLock != null) {
1331:                        try {
1332:                            clearLock.close();
1333:                        } catch (SQLException e) {
1334:                        }
1335:                    }
1336:
1337:                    if (connection != null) {
1338:                        try {
1339:                            connection.close();
1340:                            log.debug("Connection Closed ");
1341:                        } catch (SQLException e) {
1342:                            log.error("Error Closing Connection ", e);
1343:                        }
1344:                        connection = null;
1345:                    }
1346:                }
1347:            }
1348:
1349:            /**
1350:             * @return Returns the activity.
1351:             */
1352:            public int getActivity() {
1353:                return activity;
1354:            }
1355:
1356:            public long getLastEventTime() {
1357:                return lastEvent;
1358:            }
1359:
1360:            /**
1361:             * @param activity
1362:             *        The activity to set.
1363:             */
1364:            public void resetActivity() {
1365:                this .activity = 0;
1366:
1367:            }
1368:
1369:            public void incrementActivity() {
1370:                activity++;
1371:                lastEvent = System.currentTimeMillis();
1372:            }
1373:
1374:            public void setLastIndex(long l) {
1375:                this .lastIndex = l;
1376:
1377:            }
1378:
1379:            public void setStartDocIndex(long startDocIndex) {
1380:                this .startDocIndex = startDocIndex;
1381:            }
1382:
1383:            public void setNowIndexing(String reference) {
1384:                this .lastIndexing = this .nowIndexing;
1385:                this .nowIndexing = reference;
1386:            }
1387:
1388:            /**
1389:             * @return Returns the lastIndex.
1390:             */
1391:            public long getLastIndex() {
1392:                return lastIndex;
1393:            }
1394:
1395:            /**
1396:             * @return Returns the nowIndexing.
1397:             */
1398:            public String getNowIndexing() {
1399:                return nowIndexing;
1400:            }
1401:
1402:            /**
1403:             * @return Returns the startDocIndex.
1404:             */
1405:            public long getStartDocIndex() {
1406:                return startDocIndex;
1407:            }
1408:
1409:            public String getLastDocument() {
1410:                return lastIndexing;
1411:            }
1412:
1413:            public String getLastElapsed() {
1414:                long l = lastIndex;
1415:                long h = l / 3600000L;
1416:                l = l - (3600000L * h);
1417:                long m = l / 600000L;
1418:                ;
1419:                l = l - (60000L * m);
1420:                long s = l / 1000;
1421:                l = l - (1000L * s);
1422:                return "" + h + "h" + m + "m" + s + "." + l + "s";
1423:            }
1424:
1425:            public String getCurrentDocument() {
1426:                return nowIndexing;
1427:            }
1428:
1429:            public String getCurrentElapsed() {
1430:                long l = System.currentTimeMillis() - startDocIndex;
1431:                long h = l / 3600000L;
1432:                l = l - (3600000L * h);
1433:                long m = l / 60000L;
1434:                l = l - (60000L * m);
1435:                long s = l / 1000L;
1436:                l = l - (1000L * s);
1437:                return "" + h + "h" + m + "m" + s + "." + l + "s";
1438:            }
1439:
1440:            /**
1441:             * @return the loadFactor
1442:             */
1443:            public long getLoadFactor() {
1444:                return loadFactor;
1445:            }
1446:
1447:            /**
1448:             * @param loadFactor
1449:             *        the loadFactor to set
1450:             */
1451:            public void setLoadFactor(long loadFactor) {
1452:                this .loadFactor = loadFactor;
1453:            }
1454:
1455:            /**
1456:             * Is the lock on this node, but not this thread lockedTo == null, localloc ==
1457:             * false lockedTo == this node, locallock = false; lockedTo != this node,
1458:             * localLock = true
1459:             */
1460:            public boolean isLocalLock() {
1461:                if (lockedTo == null) {
1462:                    return false;
1463:                } else if (getNodeID().equals(lockedTo)) {
1464:                    return false;
1465:                }
1466:                return true;
1467:            }
1468:
1469:            /**
1470:             * @return the soakTest
1471:             */
1472:            public boolean getSoakTest() {
1473:                return soakTest;
1474:            }
1475:
1476:            /**
1477:             * Puts the index builder into a Soak test, when there are no pending items,
1478:             * it starts building again.
1479:             * 
1480:             * @param soakTest
1481:             *        the soakTest to set
1482:             */
1483:            public void setSoakTest(boolean soakTest) {
1484:
1485:                this .soakTest = soakTest;
1486:                if (soakTest) {
1487:                    log
1488:                            .warn("SOAK TEST ACTIVE ======================DONT USE FOR PRODUCTION ");
1489:                }
1490:            }
1491:
1492:            /*
1493:             * (non-Javadoc)
1494:             * 
1495:             * @see org.sakaiproject.search.api.Diagnosable#disableDiagnostics()
1496:             */
1497:            public void disableDiagnostics() {
1498:                diagnostics = false;
1499:
1500:            }
1501:
1502:            /*
1503:             * (non-Javadoc)
1504:             * 
1505:             * @see org.sakaiproject.search.api.Diagnosable#enableDiagnostics()
1506:             */
1507:            public void enableDiagnostics() {
1508:                diagnostics = true;
1509:            }
1510:
1511:            /*
1512:             * (non-Javadoc)
1513:             * 
1514:             * @see org.sakaiproject.search.api.Diagnosable#hasDiagnostics()
1515:             */
1516:            public boolean hasDiagnostics() {
1517:                return diagnostics;
1518:            }
1519:
1520:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.