/*
 * Copyright 2004-2006 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.lucene.index;

import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.compass.core.executor.ExecutorManager;
import org.compass.core.transaction.context.TransactionContext;
import org.compass.core.transaction.context.TransactionalRunnable;

/**
 * The executor merge scheduler is similar to the Lucene {@link org.apache.lucene.index.ConcurrentMergeScheduler},
 * but instead of spawning its own threads it uses the Compass {@link org.compass.core.executor.ExecutorManager}
 * to execute the merges.
 *
 * <p>Since the executor manager is backed by a thread pool, there is no need for a running merge to keep
 * asking the index writer for more merges in a loop. Instead, when another merge is pending, it is simply
 * resubmitted to the executor manager.
 *
 * @author kimchy
 */
// LUCENE MONITOR
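/*
 * A minimal usage sketch (illustrative only, not part of this class): the scheduler is built
 * from an ExecutorManager and a TransactionContext supplied by the surrounding Compass
 * configuration (both assumed to be available here as "executorManager" and
 * "transactionContext"), and is then installed on the IndexWriter:
 *
 *     ExecutorMergeScheduler mergeScheduler =
 *             new ExecutorMergeScheduler(executorManager, transactionContext);
 *     mergeScheduler.setMaxConcurrentMerges(3);
 *     indexWriter.setMergeScheduler(mergeScheduler);
 */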
public class ExecutorMergeScheduler extends MergeScheduler {

    private ExecutorManager executorManager;

    private TransactionContext transactionContext;

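    // Bounds how many merges may run through the executor manager at the same time. The increment
    // happens under "synchronized (this)" in merge(), but the decrement in MergeThread.run() does
    // not, so the bound is best-effort rather than strict.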
    private volatile int currentConcurrentMerges = 0;
    private volatile int maxConcurrentMerges = 3;

    private Directory dir;

    private boolean closed;

    private IndexWriter writer;

    public ExecutorMergeScheduler(ExecutorManager executorManager, TransactionContext transactionContext) {
        this.executorManager = executorManager;
        this.transactionContext = transactionContext;
    }

    public int getMaxConcurrentMerges() {
        return maxConcurrentMerges;
    }

    public void setMaxConcurrentMerges(int maxConcurrentMerges) {
        this.maxConcurrentMerges = maxConcurrentMerges;
    }

    private void message(String message) {
        if (writer != null)
            writer.message("EMS: " + message);
    }

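    // Note: close() only marks the scheduler as closed; the flag is not consulted anywhere else in
    // this class, and merges already handed to the executor manager are not waited for.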
    public void close() {
        closed = true;
    }

    public void merge(IndexWriter writer) throws CorruptIndexException, IOException {

        this.writer = writer;

        dir = writer.getDirectory();

        // First, quickly run through the newly proposed merges
        // and add any orthogonal merges (ie a merge not
        // involving segments already pending to be merged) to
        // the queue. If we are way behind on merging, many of
        // these newly proposed merges will likely already be
        // registered.

        message("now merge");
        message(" index: " + writer.segString());

        // Iterate, pulling from the IndexWriter's queue of
        // pending merges, until it's empty:
        while (true) {

            // TODO: we could be careful about which merges to do in
            // the BG (eg maybe the "biggest" ones) vs FG, which
            // merges to do first (the easiest ones?), etc.

            MergePolicy.OneMerge merge = writer.getNextMerge();
            if (merge == null) {
                message(" no more merges pending; now return");
                return;
            }

            // We do this w/ the primary thread to keep
            // deterministic assignment of segment names
            writer.mergeInit(merge);

            message(" consider merge " + merge.segString(dir));

            if (merge.isExternal) {
                message(" merge involves segments from an external directory; now run in foreground");
            } else {
                synchronized (this) {
                    if (currentConcurrentMerges < maxConcurrentMerges) {
                        // OK to hand this merge off to the executor manager:
                        currentConcurrentMerges++;
                        MergeThread merger = new MergeThread(writer, merge);
                        executorManager.submit(new TransactionalRunnable(transactionContext, merger));
                        message(" executed merge in executor manager");
                        continue;
                    } else {
                        message(" too many merge threads running; run merge in foreground");
                    }
                }
            }

            // Too many merges already running (or the merge involves an external directory),
            // so we do this in the foreground of the calling thread
            writer.merge(merge);
        }
    }

    private class MergeThread implements Runnable {

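        // startMerge is the merge this Runnable was originally submitted for; runningMerge is
        // updated just before the Runnable is resubmitted to the executor manager, so that the
        // next invocation of run() picks up the new merge instead of startMerge.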
        IndexWriter writer;
        MergePolicy.OneMerge startMerge;
        MergePolicy.OneMerge runningMerge;

        public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
            this.writer = writer;
            this.startMerge = startMerge;
        }

        public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
            runningMerge = merge;
        }

        public synchronized MergePolicy.OneMerge getRunningMerge() {
            return runningMerge;
        }


        public void run() {
            // Normally we run the merge this Runnable was started with:
            MergePolicy.OneMerge merge = this.startMerge;

            // COMPASS: If we got here through a reschedule, the running merge was set just before this
            // Runnable was resubmitted; if it is not null, use it instead of startMerge.
            if (runningMerge != null) {
                merge = runningMerge;
            }

            try {

                message(" merge thread: start");

                // COMPASS: No need to run continuous merges here; if another merge is pending we simply
                // reschedule it through the executor manager.
                // while (true) {
                setRunningMerge(merge);
                writer.merge(merge);

                // If the writer says another merge is necessary, reschedule this Runnable
                // with the executor manager to handle it:
                merge = writer.getNextMerge();
                if (merge != null) {
                    writer.mergeInit(merge);
                    message(" merge thread: do another merge " + merge.segString(dir));
                    // COMPASS: Set the running merge so it will be picked up on the next run
                    setRunningMerge(merge);
                    executorManager.submit(new TransactionalRunnable(transactionContext, this));
                } else {
                    currentConcurrentMerges--;
                }
                // }

                message(" merge thread: done");

            } catch (Throwable exc) {

                if (merge != null) {
                    merge.setException(exc);
                    writer.addMergeException(merge);
                }

                // Ignore the exception if it was due to abort:
                if (!(exc instanceof MergePolicy.MergeAbortedException)) {
                    if (!suppressExceptions) {
                        // suppressExceptions is normally only set during
                        // testing.
                        anyExceptions = true;
                        throw new MergePolicy.MergeException(exc);
                    }
                }
            } finally {
                if (merge == null) { // no more merges pending; we are actually exiting
                    synchronized (ExecutorMergeScheduler.this) {
                        // ExecutorMergeScheduler.this.notifyAll();
                    }
                }
            }
        }

        public String toString() {
            MergePolicy.OneMerge merge = getRunningMerge();
            if (merge == null)
                merge = startMerge;
            return "merge thread: " + merge.segString(dir);
        }
    }

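    // Flipped to true when a merge fails with an unexpected (non-abort) exception while
    // exceptions are not suppressed.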
    static boolean anyExceptions = false;

    private boolean suppressExceptions;

    /**
     * Used for testing
     */
    void setSuppressExceptions() {
        suppressExceptions = true;
    }

    /**
     * Used for testing
     */
    void clearSuppressExceptions() {
        suppressExceptions = false;
    }
}
|