001: package org.apache.lucene.index;
002:
003: /**
004: * Licensed to the Apache Software Foundation (ASF) under one or more
005: * contributor license agreements. See the NOTICE file distributed with
006: * this work for additional information regarding copyright ownership.
007: * The ASF licenses this file to You under the Apache License, Version 2.0
008: * (the "License"); you may not use this file except in compliance with
009: * the License. You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: */
019:
020: import org.apache.lucene.store.Directory;
021:
022: import java.io.IOException;
023: import java.util.List;
024: import java.util.ArrayList;
025:
026: /** A {@link MergeScheduler} that runs each merge using a
027: * separate thread, up until a maximum number of threads
028: * ({@link #setMaxThreadCount}) at which points merges are
029: * run in the foreground, serially. This is a simple way
030: * to use concurrency in the indexing process without
031: * having to create and manage application level
032: * threads. */
033:
034: public class ConcurrentMergeScheduler extends MergeScheduler {
035:
036: private int mergeThreadPriority = -1;
037:
038: private List mergeThreads = new ArrayList();
039: private int maxThreadCount = 3;
040:
041: private List exceptions = new ArrayList();
042: private Directory dir;
043:
044: private boolean closed;
045: private IndexWriter writer;
046:
047: public ConcurrentMergeScheduler() {
048: if (allInstances != null) {
049: // Only for testing
050: addMyself();
051: }
052: }
053:
054: /** Sets the max # simultaneous threads that may be
055: * running. If a merge is necessary yet we already have
056: * this many threads running, the merge is returned back
057: * to IndexWriter so that it runs in the "foreground". */
058: public void setMaxThreadCount(int count) {
059: if (count < 1)
060: throw new IllegalArgumentException(
061: "count should be at least 1");
062: maxThreadCount = count;
063: }
064:
065: /** Get the max # simultaneous threads that may be
066: * running. @see #setMaxThreadCount. */
067: public int getMaxThreadCount() {
068: return maxThreadCount;
069: }
070:
071: /** Return the priority that merge threads run at. By
072: * default the priority is 1 plus the priority of (ie,
073: * slightly higher priority than) the first thread that
074: * calls merge. */
075: public synchronized int getMergeThreadPriority() {
076: initMergeThreadPriority();
077: return mergeThreadPriority;
078: }
079:
080: /** Return the priority that merge threads run at. */
081: public synchronized void setMergeThreadPriority(int pri) {
082: if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
083: throw new IllegalArgumentException(
084: "priority must be in range " + Thread.MIN_PRIORITY
085: + " .. " + Thread.MAX_PRIORITY
086: + " inclusive");
087: mergeThreadPriority = pri;
088:
089: final int numThreads = mergeThreadCount();
090: for (int i = 0; i < numThreads; i++) {
091: MergeThread merge = (MergeThread) mergeThreads.get(i);
092: merge.setThreadPriority(pri);
093: }
094: }
095:
096: private void message(String message) {
097: if (writer != null)
098: writer.message("CMS: " + message);
099: }
100:
101: private synchronized void initMergeThreadPriority() {
102: if (mergeThreadPriority == -1) {
103: // Default to slightly higher priority than our
104: // calling thread
105: mergeThreadPriority = 1 + Thread.currentThread()
106: .getPriority();
107: if (mergeThreadPriority > Thread.MAX_PRIORITY)
108: mergeThreadPriority = Thread.MAX_PRIORITY;
109: }
110: }
111:
112: public void close() {
113: closed = true;
114: }
115:
116: public synchronized void sync() {
117: while (mergeThreadCount() > 0) {
118: message("now wait for threads; currently "
119: + mergeThreads.size() + " still running");
120: final int count = mergeThreads.size();
121: for (int i = 0; i < count; i++)
122: message(" " + i + ": "
123: + ((MergeThread) mergeThreads.get(i)));
124:
125: try {
126: wait();
127: } catch (InterruptedException e) {
128: }
129: }
130: }
131:
132: private synchronized int mergeThreadCount() {
133: int count = 0;
134: final int numThreads = mergeThreads.size();
135: for (int i = 0; i < numThreads; i++)
136: if (((MergeThread) mergeThreads.get(i)).isAlive())
137: count++;
138: return count;
139: }
140:
141: public void merge(IndexWriter writer) throws CorruptIndexException,
142: IOException {
143:
144: this .writer = writer;
145:
146: initMergeThreadPriority();
147:
148: dir = writer.getDirectory();
149:
150: // First, quickly run through the newly proposed merges
151: // and add any orthogonal merges (ie a merge not
152: // involving segments already pending to be merged) to
153: // the queue. If we are way behind on merging, many of
154: // these newly proposed merges will likely already be
155: // registered.
156:
157: message("now merge");
158: message(" index: " + writer.segString());
159:
160: // Iterate, pulling from the IndexWriter's queue of
161: // pending merges, until its empty:
162: while (true) {
163:
164: // TODO: we could be careful about which merges to do in
165: // the BG (eg maybe the "biggest" ones) vs FG, which
166: // merges to do first (the easiest ones?), etc.
167:
168: MergePolicy.OneMerge merge = writer.getNextMerge();
169: if (merge == null) {
170: message(" no more merges pending; now return");
171: return;
172: }
173:
174: // We do this w/ the primary thread to keep
175: // deterministic assignment of segment names
176: writer.mergeInit(merge);
177:
178: message(" consider merge " + merge.segString(dir));
179:
180: if (merge.isExternal) {
181: message(" merge involves segments from an external directory; now run in foreground");
182: } else {
183: synchronized (this ) {
184: if (mergeThreadCount() < maxThreadCount) {
185: // OK to spawn a new merge thread to handle this
186: // merge:
187: MergeThread merger = new MergeThread(writer,
188: merge);
189: mergeThreads.add(merger);
190: message(" launch new thread ["
191: + merger.getName() + "]");
192: merger.setThreadPriority(mergeThreadPriority);
193: merger.setDaemon(true);
194: merger.start();
195: continue;
196: } else
197: message(" too many merge threads running; run merge in foreground");
198: }
199: }
200:
201: // Too many merge threads already running, so we do
202: // this in the foreground of the calling thread
203: writer.merge(merge);
204: }
205: }
206:
207: private class MergeThread extends Thread {
208:
209: IndexWriter writer;
210: MergePolicy.OneMerge startMerge;
211: MergePolicy.OneMerge runningMerge;
212:
213: public MergeThread(IndexWriter writer,
214: MergePolicy.OneMerge startMerge) throws IOException {
215: this .writer = writer;
216: this .startMerge = startMerge;
217: }
218:
219: public synchronized void setRunningMerge(
220: MergePolicy.OneMerge merge) {
221: runningMerge = merge;
222: }
223:
224: public synchronized MergePolicy.OneMerge getRunningMerge() {
225: return runningMerge;
226: }
227:
228: public void setThreadPriority(int pri) {
229: try {
230: setPriority(pri);
231: } catch (NullPointerException npe) {
232: // Strangely, Sun's JDK 1.5 on Linux sometimes
233: // throws NPE out of here...
234: } catch (SecurityException se) {
235: // Ignore this because we will still run fine with
236: // normal thread priority
237: }
238: }
239:
240: public void run() {
241:
242: // First time through the while loop we do the merge
243: // that we were started with:
244: MergePolicy.OneMerge merge = this .startMerge;
245:
246: try {
247:
248: message(" merge thread: start");
249:
250: while (true) {
251: setRunningMerge(merge);
252: writer.merge(merge);
253:
254: // Subsequent times through the loop we do any new
255: // merge that writer says is necessary:
256: merge = writer.getNextMerge();
257: if (merge != null) {
258: writer.mergeInit(merge);
259: message(" merge thread: do another merge "
260: + merge.segString(dir));
261: } else
262: break;
263: }
264:
265: message(" merge thread: done");
266:
267: } catch (Throwable exc) {
268:
269: if (merge != null) {
270: merge.setException(exc);
271: writer.addMergeException(merge);
272: }
273:
274: // Ignore the exception if it was due to abort:
275: if (!(exc instanceof MergePolicy.MergeAbortedException)) {
276: synchronized (ConcurrentMergeScheduler.this ) {
277: exceptions.add(exc);
278: }
279:
280: if (!suppressExceptions) {
281: // suppressExceptions is normally only set during
282: // testing.
283: anyExceptions = true;
284: throw new MergePolicy.MergeException(exc);
285: }
286: }
287: } finally {
288: synchronized (ConcurrentMergeScheduler.this ) {
289: mergeThreads.remove(this );
290: ConcurrentMergeScheduler.this .notifyAll();
291: }
292: }
293: }
294:
295: public String toString() {
296: MergePolicy.OneMerge merge = getRunningMerge();
297: if (merge == null)
298: merge = startMerge;
299: return "merge thread: " + merge.segString(dir);
300: }
301: }
302:
303: static boolean anyExceptions = false;
304:
305: /** Used for testing */
306: public static boolean anyUnhandledExceptions() {
307: synchronized (allInstances) {
308: final int count = allInstances.size();
309: // Make sure all outstanding threads are done so we see
310: // any exceptions they may produce:
311: for (int i = 0; i < count; i++)
312: ((ConcurrentMergeScheduler) allInstances.get(i)).sync();
313: return anyExceptions;
314: }
315: }
316:
317: /** Used for testing */
318: private void addMyself() {
319: synchronized (allInstances) {
320: final int size = 0;
321: int upto = 0;
322: for (int i = 0; i < size; i++) {
323: final ConcurrentMergeScheduler other = (ConcurrentMergeScheduler) allInstances
324: .get(i);
325: if (!(other.closed && 0 == other.mergeThreadCount()))
326: // Keep this one for now: it still has threads or
327: // may spawn new threads
328: allInstances.set(upto++, other);
329: }
330: allInstances.subList(upto, allInstances.size()).clear();
331: allInstances.add(this );
332: }
333: }
334:
335: private boolean suppressExceptions;
336:
337: /** Used for testing */
338: void setSuppressExceptions() {
339: suppressExceptions = true;
340: }
341:
342: /** Used for testing */
343: void clearSuppressExceptions() {
344: suppressExceptions = false;
345: }
346:
347: /** Used for testing */
348: private static List allInstances;
349:
350: public static void setTestMode() {
351: allInstances = new ArrayList();
352: }
353: }
|