/**********************************************************************************
 * $URL: https://source.sakaiproject.org/svn/search/tags/sakai_2-4-1/search-impl/impl/src/java/org/sakaiproject/search/index/impl/ClusterFSIndexStorage.java $
 * $Id: ClusterFSIndexStorage.java 29635 2007-04-26 14:44:09Z ajpoland@iupui.edu $
 ***********************************************************************************
 *
 * Copyright (c) 2003, 2004, 2005, 2006 The Sakai Foundation.
 *
 * Licensed under the Educational Community License, Version 1.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.opensource.org/licenses/ecl1.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **********************************************************************************/

package org.sakaiproject.search.index.impl;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.sakaiproject.search.index.AnalyzerFactory;
import org.sakaiproject.search.index.ClusterFilesystem;
import org.sakaiproject.search.index.IndexStorage;
import org.sakaiproject.search.index.SegmentInfo;

/**
 * Implementation of IndexStorage using a cluster file system. This
 * implementation performs all index write operations in a new temporary segment.
 * On completion of the index operation, the temporary segment is merged with
 * the current segment. If the current segment is larger than the threshold, a
 * new segment is created. Managing the segments and how they relate to the
 * cluster is delegated to the ClusterFilesystem.
 * 
 * @author ieb
 */
public class ClusterFSIndexStorage implements IndexStorage {
    private static final Log log = LogFactory
            .getLog(ClusterFSIndexStorage.class);

    /**
     * Location of the index store on local disk, passed to the underlying index
     * store
     */
    private String searchIndexDirectory = null;

    /**
     * Factory for the token analyzer
     */
    private AnalyzerFactory analyzerFactory = null;

    /**
     * Maximum size of a segment on write (20M)
     */
    private long segmentThreshold = 1024 * 1024 * 20;

    private ClusterFilesystem clusterFS = null;

    // maximum size of a segment during merge, just short of 1.5G
    private long maxSegmentSize = 1024L * 1024L * 1500L;

    // maximum size of a segment considered for merge operations, 1.2G
    private long maxMegeSegmentSize = 1024L * 1024L * 1200L;

    private boolean diagnostics;

    public void init() {
    }

    public IndexReader getIndexReader() throws IOException {
        return getIndexReader(true);
    }

    private IndexReader getIndexReader(boolean withLock)
            throws IOException {
        if (withLock) {
            clusterFS.getLock();
        }
        List<SegmentInfo> segments = clusterFS.updateSegments();
        if (log.isDebugEnabled())
            log.debug("Found " + segments.size() + " segments ");
        IndexReader[] readers = new IndexReader[segments.size()];
        int j = 0;
        for (Iterator<SegmentInfo> i = segments.iterator(); i.hasNext();) {
            SegmentInfo segment = i.next();
            try {
                if (false) {

                    // this code will simulate a massive index failure, where
                    // every 5th segment is damaged beyond repair.
                    // only enable if you want to test the recovery mechanism
                    if (j % 5 == 0) {
                        File f = segment.getSegmentLocation();
                        log.warn("Removing Segment for test " + f);
                        File[] files = f.listFiles();
                        for (int k = 0; k < files.length; k++) {
                            files[k].delete();
                        }
                        f.delete();
                    }
                }

                // if the quick check fails, repeat it with diagnostics
                // enabled so the cause is reported
                if (!segment.checkSegmentValidity(diagnostics,
                        "getIndexReader ")) {
                    log.warn("Checksum Failed on  " + segment);
                    segment.checkSegmentValidity(true,
                            "getIndexReader Failed");
                }
                readers[j] = IndexReader.open(segment
                        .getSegmentLocation());
            } catch (Exception ex) {
                try {
                    if (readers[j] != null) {
                        try {
                            readers[j].close();
                            readers[j] = null;
                        } catch (Exception e) {
                            // ignore failures while closing a broken reader
                        }
                    }

                    if (log.isDebugEnabled())
                        log.debug("Invalid segment  ", ex);
                    log.warn("Found corrupted segment ("
                            + segment.getName()
                            + ") in Local store, attempting to recover from DB.  Reason: "
                            + ex.getClass().getName()
                            + ":" + ex.getMessage(), ex);
                    clusterFS.recoverSegment(segment);
                    readers[j] = IndexReader.open(segment
                            .getSegmentLocation());
                    log.warn("Recovery complete, resuming normal operations having restored, ignore previous problems with this segment "
                            + segment.getName());
                } catch (Exception e) {
                    if (readers[j] != null) {
                        try {
                            readers[j].close();
                            readers[j] = null;
                        } catch (Exception ex2) {
                            // ignore failures while closing a broken reader
                        }
                    }
                    log.error(
                            "---Problem recovering corrupted segment from the DB,\n"
                                    + "--- it is probable that there has been a local hardware\n"
                                    + "--- failure on this node or that the backup in the DB is missing\n"
                                    + "--- or corrupt. To recover, remove the segment from the db, and rebuild the index \n"
                                    + "--- eg delete from search_segments where name_ = '"
                                    + segment.getName()
                                    + "'; \n", ex);

                }
            }
            j++;
        }
        // keep only the readers that were opened successfully
        List<IndexReader> l = new ArrayList<IndexReader>();
        for (int i = 0; i < readers.length; i++) {
            if (readers[i] != null) {
                l.add(readers[i]);
            }
        }
        if (l.size() != readers.length) {
            log.warn(" Opening index reader with a partial index set, this may result in a smaller search set than otherwise expected");
        }
        readers = l.toArray(new IndexReader[0]);
        if (readers.length > 0) {
            IndexReader indexReader = new MultiReader(readers);
            return indexReader;
        }
        throw new IOException("No Index available to open ");
    }

    public IndexWriter getIndexWriter(boolean create)
            throws IOException {

        if (log.isDebugEnabled())
            log.debug("+++++++++++++++++Start Index Writer Cycle   ");
        // to ensure that we do not damage the index due to OutOfMemory, if it
        // should ever happen,
        // we will open a temporary index, which will be merged on completion
        SegmentInfo currentSegment = null;
        IndexWriter indexWriter = null;
        if (false) {
            // disabled: write directly into the current cluster segment
            List<SegmentInfo> segments = clusterFS.updateSegments();
            if (log.isDebugEnabled())
                log.debug("Found " + segments.size() + " segments ");
            if (segments.size() > 0) {
                currentSegment = segments.get(segments.size() - 1);
                if (!currentSegment.isClusterSegment()
                        || currentSegment.getTotalSize() > segmentThreshold) {
                    currentSegment = null;
                }

            }
            if (currentSegment == null) {
                currentSegment = clusterFS.newSegment();
                if (log.isDebugEnabled())
                    log.debug("Created new segment "
                            + currentSegment.getName());
                currentSegment.touchSegment();
                indexWriter = new IndexWriter(currentSegment
                        .getSegmentLocation(), getAnalyzer(), true);
                indexWriter.setUseCompoundFile(true);
                // indexWriter.setInfoStream(System.out);
                indexWriter.setMaxMergeDocs(50);
                indexWriter.setMergeFactor(50);
            } else {
                currentSegment.touchSegment();
                indexWriter = new IndexWriter(currentSegment
                        .getSegmentLocation(), getAnalyzer(), false);
                indexWriter.setUseCompoundFile(true);
                // indexWriter.setInfoStream(System.out);
                indexWriter.setMaxMergeDocs(50);
                indexWriter.setMergeFactor(50);
            }
            if (log.isDebugEnabled())
                log.debug("Using Current Index Writer "
                        + currentSegment.getSegmentLocation().getPath());
        } else {
            // active path: always write into a fresh temporary segment
            File tempIndex = clusterFS.getTemporarySegment(true);
            indexWriter = new IndexWriter(tempIndex, getAnalyzer(),
                    true);
            indexWriter.setUseCompoundFile(true);
            // indexWriter.setInfoStream(System.out);
            indexWriter.setMaxMergeDocs(50);
            indexWriter.setMergeFactor(50);
            if (log.isDebugEnabled())
                log.debug("Using Temp Index Writer "
                        + tempIndex.getPath());
        }
        return indexWriter;
    }

    public IndexSearcher getIndexSearcher() throws IOException {

        IndexSearcher indexSearcher = null;
        try {
            long reloadStart = System.currentTimeMillis();
            log.debug("Open Search");
            // getIndexReader throws an IOException if no index is available,
            // so indexSearcher is never null after this line
            indexSearcher = new IndexSearcher(getIndexReader(false));
            long reloadEnd = System.currentTimeMillis();
            if (log.isDebugEnabled())
                log.debug("Reload Complete " + indexSearcher.maxDoc()
                        + " in " + (reloadEnd - reloadStart));

        } catch (FileNotFoundException e) {
            if (indexSearcher != null) {
                try {
                    indexSearcher.close();
                } catch (Exception ex) {
                    // ignore failures while closing
                }
            }
            indexSearcher = null;
            log.error("There has been a major problem with the"
                    + " Search Index which has become corrupted ", e);
        } catch (IOException e) {
            if (indexSearcher != null) {
                try {
                    indexSearcher.close();
                } catch (Exception ex) {
                    // ignore failures while closing
                }
            }
            indexSearcher = null;
            log.error("There has been a major problem with the "
                    + "Search Index which has become corrupted", e);
        }
        return indexSearcher;
    }

    public boolean indexExists() {
        List<SegmentInfo> segments = clusterFS.updateSegments();
        return (segments.size() > 0);
    }

    public Analyzer getAnalyzer() {
        return analyzerFactory.newAnalyzer();
    }

    public void setLocation(String location) {
        searchIndexDirectory = location;
        if (clusterFS != null) {
            clusterFS.setLocation(location);
        }

    }

    public void doPreIndexUpdate() throws IOException {
        if (log.isDebugEnabled())
            log.debug("Start Index Cycle");
        // do not enable Lucene file locks
        FSDirectory.setDisableLocks(true);

    }

    public void doPostIndexUpdate() throws IOException {
    }

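    /*
     * Illustrative sketch only, not part of the Sakai source. The merge step in
     * mergeAndUpdate sorts segments by size and groups those of a similar order
     * of magnitude before merging each group. The stand-alone class below shows
     * that idea in a simplified form. It makes several assumptions: it works on
     * plain sizes rather than SegmentInfo, it sorts largest first, it uses a
     * fixed factor of 10, and the names SizeGroupingSketch, group and
     * mergeCandidates are hypothetical. It does not reproduce the exact
     * bookkeeping (mergegroup, groupstomerge) used by this class.
     *
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: groups sizes that share an order of magnitude.
final class SizeGroupingSketch {

    // Splits the sizes into runs in which every member is larger than one
    // tenth of the first (largest) member of the run.
    static List<List<Long>> group(List<Long> sizes) {
        List<Long> sorted = new ArrayList<Long>(sizes);
        Collections.sort(sorted, Collections.reverseOrder()); // largest first
        List<List<Long>> groups = new ArrayList<List<Long>>();
        List<Long> current = new ArrayList<Long>();
        long reference = 0L;
        for (long size : sorted) {
            if (current.isEmpty()) {
                reference = size;
            } else if (size <= reference / 10) {
                // an order of magnitude smaller: close the run, start a new one
                groups.add(current);
                current = new ArrayList<Long>();
                reference = size;
            }
            current.add(size);
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    // Only runs with at least two members are worth merging.
    static List<List<Long>> mergeCandidates(List<Long> sizes) {
        List<List<Long>> result = new ArrayList<List<Long>>();
        for (List<Long> g : group(sizes)) {
            if (g.size() >= 2) {
                result.add(g);
            }
        }
        return result;
    }
}
```
     *
     * With sizes 1000, 900, 50, 40 and 3 the sketch forms three runs, and only
     * the first two (1000 with 900, 50 with 40) qualify as merge candidates.
     */
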
    private void mergeAndUpdate(boolean merge) throws IOException {
        if (merge) {
            FSDirectory.setDisableLocks(true);
            // get the tmp index
            File tmpSegment = clusterFS.getTemporarySegment(false);
            Directory[] tmpDirectory = new Directory[1];
            tmpDirectory[0] = FSDirectory.getDirectory(tmpSegment,
                    false);

            // Need to fix checksums before merging... is that really true?

            List<SegmentInfo> segments = clusterFS.updateSegments();

            if (log.isDebugEnabled())
                log.debug("Merge Phase 1: Starting on "
                        + segments.size() + " segments ");

            // merge it with the current index
            SegmentInfo currentSegment = null;

            if (log.isDebugEnabled())
                log.debug("Found " + segments.size() + " segments ");
            if (segments.size() > 0) {
                currentSegment = segments.get(segments.size() - 1);
                if (currentSegment != null) {
                    if (!currentSegment.isClusterSegment()
                            || (currentSegment.getTotalSize() > segmentThreshold)
                            || currentSegment.isDeleted()) {
                        if (diagnostics) {
                            log.info("Current Segment not suitable, generating new segment "
                                    + (currentSegment.isDeleted() ? "deleted,"
                                            : "")
                                    + (!currentSegment.isClusterSegment() ? "non-cluster,"
                                            : "")
                                    + ((currentSegment.getTotalSize() > segmentThreshold) ? "toobig,"
                                            : ""));
                        }
                        currentSegment = null;
                    }
                }

            }
            if (currentSegment == null) {
                if (tmpDirectory[0].fileExists("segments")) {
                    currentSegment = clusterFS.saveTemporarySegment();
                    /*
                     * We must add the new current segment to the list of segments so that
                     * it is not left out if it gets merged in the next step
                     */
                    segments.add(currentSegment);
                    /*
                     * We should touch the segment to notify that it has been updated
                     */
                    currentSegment.touchSegment();
                } else {
                    log.warn("No Segment Created during indexing process, this should not happen, although it is possible that the indexing operation did not find any files to index.");
                }
            } else {
                IndexWriter indexWriter = null;
                try {
                    if (log.isDebugEnabled())
                        log.debug("Using Existing Segment "
                                + currentSegment.getName());
                    currentSegment.touchSegment();
                    indexWriter = new IndexWriter(FSDirectory
                            .getDirectory(currentSegment
                                    .getSegmentLocation(), false),
                            getAnalyzer(), false);
                    indexWriter.setUseCompoundFile(true);
                    // indexWriter.setInfoStream(System.out);
                    indexWriter.setMaxMergeDocs(50);
                    indexWriter.setMergeFactor(50);

                    if (tmpDirectory[0].fileExists("segments")) {
                        if (log.isDebugEnabled())
                            log.debug("Merging Temp segment "
                                    + tmpSegment.getPath()
                                    + " with current segment "
                                    + currentSegment.getSegmentLocation()
                                            .getPath());
                        indexWriter.addIndexes(tmpDirectory);
                        indexWriter.optimize();
                    } else {
                        log.warn("No Merge performed, no tmp segment");
                    }
                } finally {
                    try {
                        // the writer is null if its construction failed
                        if (indexWriter != null) {
                            indexWriter.close();
                            currentSegment.touchSegment();
                        }
                    } catch (Exception ex) {
                        // don't care if this fails
                    }
                }
            }

            /*
             * segments is now a list of all segments including the current segment
             */

            // create a size sorted list
            if (segments.size() > 10) {
                if (log.isDebugEnabled())
                    log.debug("Merge Phase 0 : Starting");
                // long[] segmentSize = new long[segments.size() - 1];
                // File[] segmentName = new File[segments.size() - 1];
                for (Iterator<SegmentInfo> i = segments.iterator(); i
                        .hasNext();) {
                    i.next().loadSize();
                }

                // sort by ascending segment size
                Collections.sort(segments,
                        new Comparator<SegmentInfo>() {

                            public int compare(SegmentInfo o1,
                                    SegmentInfo o2) {
                                long l = o1.getSize() - o2.getSize();
                                if (l == 0) {
                                    return 0;
                                } else if (l < 0) {
                                    return -1;
                                } else {
                                    return 1;
                                }
                            }

                        });

                long sizeBlock = 0;
                int ninblock = 0;
                int mergegroupno = 1;
                int[] mergegroup = new int[segments.size()];
                int[] groupstomerge = new int[segments.size()];
                mergegroup[0] = mergegroupno;
                {
                    int j = 0;
                    for (int i = 0; i < mergegroup.length; i++) {
                        if (segments.get(i).getSize() < maxMegeSegmentSize) {
                            groupstomerge[i] = 0;
                            if (ninblock == 0) {
                                sizeBlock = segments.get(0).getSize();
                                ninblock = 1;
                                if (log.isDebugEnabled())
                                    log.debug("Order Size = "
                                            + sizeBlock);
                            }

                            if (segments.get(i).getSize() > sizeBlock / 10) {
                                ninblock++;
                                // count up blocks that have the same order of
                                // size
                            } else {
                                // if there are 2 or more in the block, force a
                                // merge
                                if (ninblock >= 2) {
                                    groupstomerge[j++] = mergegroupno;
                                }

                                // reset for the next order of magnitude down
                                ninblock = 1;
                                mergegroupno++;
                                sizeBlock = segments.get(i).getSize();
                            }
                            mergegroup[i] = mergegroupno;
                        }
                    }
                    // catch the merge all case
                    if (ninblock >= 2) {
                        groupstomerge[j++] = mergegroupno;
                    }
                    if (j > 0) {
                        StringBuffer status = new StringBuffer();
                        for (int i = 0; i < segments.size(); i++) {
                            SegmentInfo si = segments.get(i);
                            status.append("Segment ").append(i).append(
                                    " n").append(si.getName()).append(
                                    " s").append(si.getSize()).append(
                                    " g").append(mergegroup[i]).append(
                                    "\n");
                        }
                        for (int i = 0; i < groupstomerge.length; i++) {
                            status.append("Merge group ").append(i)
                                    .append(" m").append(
                                            groupstomerge[i]).append(
                                            "\n");
                        }
                        if (log.isDebugEnabled())
                            log.debug("Search Merge \n" + status);
                    }

                }
                // groupstomerge contains the list of group numbers that need
                // to be merged; mergegroup marks each segment with its group
                // number.
                for (int i = 0; i < groupstomerge.length; i++) {
                    if (groupstomerge[i] != 0) {
                        StringBuffer status = new StringBuffer();
                        status.append("Group ").append(i).append(" Merge ")
                                .append(groupstomerge[i]).append("\n");

                        // merge the old segments into a new segment.

                        SegmentInfo mergeSegment = clusterFS.newSegment();

                        IndexWriter mergeIndexWriter = null;
                        boolean mergeOk = false;
                        try {
                            mergeIndexWriter = new IndexWriter(
                                    FSDirectory.getDirectory(
                                            mergeSegment.getSegmentLocation(),
                                            false),
                                    getAnalyzer(), true);
                            mergeIndexWriter.setUseCompoundFile(true);
                            // indexWriter.setInfoStream(System.out);
                            mergeIndexWriter.setMaxMergeDocs(50);
                            mergeIndexWriter.setMergeFactor(50);
                            List<Directory> indexes = new ArrayList<Directory>();
                            long currentSize = 0L;
                            for (int j = 0; j < mergegroup.length; j++) {
                                // find if this segment is in the current merge group
                                SegmentInfo si = segments.get(j);
                                if (mergegroup[j] == groupstomerge[i]) {
                                    // if we merge this segment will the result
                                    // probably remain small enough
                                    if (si.isDeleted()) {
                                        status.append("   Skipped, Segment is already deleted  ")
                                                .append(" ")
                                                .append(si.getName())
                                                .append(" || ")
                                                .append(mergeSegment.getName())
                                                .append("\n");
                                    } else if ((currentSize + si.getSize()) < maxSegmentSize) {
                                        currentSize += si.getSize();

                                        Directory d = FSDirectory.getDirectory(
                                                si.getSegmentLocation(), false);
                                        if (d.fileExists("segments")) {
                                            status.append("   Merge ")
                                                    .append(si.getName())
                                                    .append(" >> ")
                                                    .append(mergeSegment.getName())
                                                    .append("\n");
                                            indexes.add(d);
                                        } else {
                                            // report the source segment that is missing
                                            status.append("   Ignored segment as it does not exist ")
                                                    .append(si.getName())
                                                    .append("\n");

                                        }
                                    } else {
                                        status.append("   Skipped, size >  ")
                                                .append(maxSegmentSize)
                                                .append(" ")
                                                .append(si.getName())
                                                .append(" || ")
                                                .append(mergeSegment.getName())
                                                .append("\n");
                                        // Don't merge this segment this time
                                        mergegroup[j] = -10;
                                    }

                                }
                            }
                            // merge in the list of segments that we have waiting to be merged
                            if (diagnostics) {
                                log.info("Merging \n" + status);
                            }
                            mergeIndexWriter.addIndexes(indexes
                                    .toArray(new Directory[indexes.size()]));
                            mergeIndexWriter.optimize();
                            if (diagnostics) {
                                log.info("Merged Segment contains "
                                        + mergeIndexWriter.docCount()
                                        + " documents ");
                            }

                            mergeIndexWriter.close();
                            // mark the segment as created and ready for upload
                            mergeSegment.setCreated();
                            mergeSegment.touchSegment();

                            if (log.isDebugEnabled())
                                log.debug("Done " + groupstomerge[i]);
                            mergeIndexWriter = null;
                            // remove old segments
                            mergeOk = true;
                        } catch (Exception ex) {
                            log.error("Failed to merge search segments "
                                    + ex.getMessage(), ex);
                            // close the writer before removing the failed segment
                            if (mergeIndexWriter != null) {
                                try {
                                    mergeIndexWriter.close();
                                } catch (Exception ex2) {
                                    // ignored, the merge has already failed
                                }
                                mergeIndexWriter = null;
                            }
                            try {
                                clusterFS.removeLocalSegment(mergeSegment);
                            } catch (Exception ex2) {
                                log.error("Failed to remove merge segment "
                                        + mergeSegment.getName() + " "
                                        + ex2.getMessage());
                            }

                        } finally {
                            // the writer is null here if it was already closed
                            if (mergeIndexWriter != null) {
                                try {
                                    mergeIndexWriter.close();
                                } catch (Exception ex) {
                                    // ignored, nothing more can be done
                                }
                            }
                        }
                        if (mergeOk) {
                            for (int j = 0; j < mergegroup.length; j++) {
                                if (mergegroup[j] == groupstomerge[i]) {
                                    clusterFS.removeLocalSegment(segments.get(j));
                                }
                            }
                        }
                    }
                }
            }
        } else {
            log.debug("Merge Not requested ");
        }
        clusterFS.removeTemporarySegment();

        clusterFS.saveSegments();
        if (log.isDebugEnabled())
            log.debug("+++++++++++++++++++++++++++++++++++++End Index Cycle");
    }

    /**
     * @return Returns the analyzerFactory.
     */
    public AnalyzerFactory getAnalyzerFactory() {
        return analyzerFactory;
    }

    /**
     * @param analyzerFactory
     *        The analyzerFactory to set.
     */
    public void setAnalyzerFactory(AnalyzerFactory analyzerFactory) {
        this.analyzerFactory = analyzerFactory;
    }

    public void setRecoverCorruptedIndex(boolean recover) {
        // no-op: the recover flag is not used by this implementation
    }

    /**
     * @return Returns the clusterFS.
     */
    public ClusterFilesystem getClusterFS() {
        return clusterFS;
    }

    /**
     * @param clusterFS
     *        The clusterFS to set.
     */
    public void setClusterFS(ClusterFilesystem clusterFS) {
        this.clusterFS = clusterFS;
    }

    public long getLastUpdate() {
        return clusterFS.getLastUpdate();
    }

    public List getSegmentInfoList() {
        return clusterFS.getSegmentInfoList();
    }

    public void closeIndexReader(IndexReader indexReader)
            throws IOException {
        if (indexReader != null) {
            indexReader.close();
        }

        // only update required, no merge
        clusterFS.getLock();
        try {
            mergeAndUpdate(false);
        } finally {
            // once the lock has been acquired it is always released, even
            // if the update fails
            clusterFS.releaseLock();
        }
    }

    public void closeIndexWriter(IndexWriter indexWrite)
            throws IOException {
        if (indexWrite != null) {
            indexWrite.close();
        }
        // acquire the lock, merge in the index and sync
        clusterFS.getLock();
        try {
            mergeAndUpdate(true);
        } finally {
            clusterFS.releaseLock();
        }
    }

    public boolean isMultipleIndexers() {
        return clusterFS.isMultipleIndexers();
    }

    public void closeIndexSearcher(IndexSearcher indexSearcher) {
        IndexReader indexReader = indexSearcher.getIndexReader();
        boolean closedAlready = false;
        try {
            if (indexReader != null) {
                indexReader.close();
                closedAlready = true;
            }
        } catch (Exception ex) {
            log.error("Failed to close Index Reader " + ex.getMessage());
        }
        try {
            indexSearcher.close();
        } catch (Exception ex) {
            if (closedAlready) {
                log.debug("Failed to close Index Searcher "
                        + ex.getMessage());
            } else {
                log.error("Failed to close Index Searcher "
                        + ex.getMessage());
            }

        }
    }

    /**
     * @return the maxMegeSegmentSize
     */
    public long getMaxMegeSegmentSize() {
        return maxMegeSegmentSize;
    }

    /**
     * @param maxMegeSegmentSize the maxMegeSegmentSize to set
     */
    public void setMaxMegeSegmentSize(long maxMegeSegmentSize) {
        log.info("Max Segment Merge Size set to " + maxMegeSegmentSize);
        this.maxMegeSegmentSize = maxMegeSegmentSize;
    }

    /**
     * @return the maxSegmentSize
     */
    public long getMaxSegmentSize() {
        return maxSegmentSize;
    }

    /**
     * @param maxSegmentSize the maxSegmentSize to set
     */
    public void setMaxSegmentSize(long maxSegmentSize) {
        log.info("Max Segment Size set to " + maxSegmentSize);
        this.maxSegmentSize = maxSegmentSize;
    }

    /**
     * @return the segmentThreshold
     */
    public long getSegmentThreshold() {
        return segmentThreshold;
    }

    /**
     * @param segmentThreshold the segmentThreshold to set
     */
    public void setSegmentThreshold(long segmentThreshold) {
        log.info("New Segment Size threshold set to "
                + segmentThreshold);
        this.segmentThreshold = segmentThreshold;
    }

    /* (non-Javadoc)
     * @see org.sakaiproject.search.api.Diagnosable#disableDiagnostics()
     */
    public void disableDiagnostics() {
        diagnostics = false;
    }

    /* (non-Javadoc)
     * @see org.sakaiproject.search.api.Diagnosable#enableDiagnostics()
     */
    public void enableDiagnostics() {
        diagnostics = true;
    }

    /* (non-Javadoc)
     * @see org.sakaiproject.search.api.Diagnosable#hasDiagnostics()
     */
    public boolean hasDiagnostics() {
        return diagnostics;
    }

    /* (non-Javadoc)
     * @see org.sakaiproject.search.index.IndexStorage#centralIndexExists()
     */
    public boolean centralIndexExists() {
        return clusterFS.centralIndexExists();
    }
}
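
The merge loop in the listing above picks segments for a merge group by skipping deleted segments and adding a segment only while the running total stays below maxSegmentSize. The following is a minimal stand-alone sketch of that selection rule, not the class's actual code: the class name MergeSelectionSketch, the select method, and the plain arrays standing in for SegmentInfo are all hypothetical, and the check for a missing "segments" file is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of ClusterFSIndexStorage.
class MergeSelectionSketch {

    /**
     * Returns the indexes of the segments that would be merged for one group.
     * sizes[j] is the size of segment j, deleted[j] marks deleted segments,
     * and maxSegmentSize caps the total size of the merged result.
     */
    static List<Integer> select(long[] sizes, boolean[] deleted,
            long maxSegmentSize) {
        List<Integer> selected = new ArrayList<Integer>();
        long currentSize = 0L;
        for (int j = 0; j < sizes.length; j++) {
            if (deleted[j]) {
                continue; // already deleted, nothing to merge
            }
            if (currentSize + sizes[j] < maxSegmentSize) {
                currentSize += sizes[j];
                selected.add(j);
            }
            // otherwise the segment is left for a later merge cycle
        }
        return selected;
    }

    public static void main(String[] args) {
        long[] sizes = { 40L, 70L, 30L, 20L };
        boolean[] deleted = { false, false, true, false };
        System.out.println(select(sizes, deleted, 100L));
    }
}
```

With the sample values in main and a cap of 100, segment 1 is skipped because 40 + 70 is not below the cap, segment 2 is skipped as deleted, and segments 0 and 3 are selected. Because the rule is greedy and order-dependent, a large segment early in the list can keep later segments out of the merge.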