001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.support.parallel;
018:
019: import java.util.ArrayList;
020: import java.util.Iterator;
021: import java.util.List;
022: import java.util.concurrent.Callable;
023: import java.util.concurrent.ExecutionException;
024: import java.util.concurrent.ExecutorService;
025: import java.util.concurrent.Executors;
026: import java.util.concurrent.Future;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030: import org.compass.core.CompassCallbackWithoutResult;
031: import org.compass.core.CompassException;
032: import org.compass.core.CompassSession;
033: import org.compass.core.spi.InternalCompassSession;
034: import org.compass.core.util.concurrent.SingleThreadThreadFactory;
035: import org.compass.gps.CompassGpsException;
036: import org.compass.gps.spi.CompassGpsInterfaceDevice;
037:
038: /**
039: * <p>Executes the indexing process using sevearl threads based on the partitioned
040: * list of index entities.
041: *
042: * <p>By default (with <code>maxThreads</code> set to <code>-1</code>) creates N threads
043: * during the indexing process where N is the number of partitioned index entities
044: * groups (one therad per group). If <code>maxThreads<code> is set to a positive integer
045: * number, the index executor will use it as the number of threads to create, regardless
046: * of the number of partitioned entities groups.
047: *
048: * @author kimchy
049: */
050: public class ConcurrentParallelIndexExecutor implements
051: ParallelIndexExecutor {
052:
053: private static final Log log = LogFactory
054: .getLog(ConcurrentParallelIndexExecutor.class);
055:
056: private int maxThreads = -1;
057:
058: /**
059: * Constructs a new concurrent index executor with <code>maxThreads</code>
060: * defaults to -1.
061: */
062: public ConcurrentParallelIndexExecutor() {
063:
064: }
065:
066: /**
067: * Constructs a new concurrent index executor with the given max threads.
068: *
069: * @param maxThreads The number of threads to use or -1 for dynamic threads
070: */
071: public ConcurrentParallelIndexExecutor(int maxThreads) {
072: if (maxThreads < -1 || maxThreads == 0) {
073: throw new IllegalArgumentException(
074: "maxThreads must either be -1 or a value greater than 0");
075: }
076: this .maxThreads = maxThreads;
077: }
078:
079: /**
080: * Performs the indexing process using the provided index entities indexer. Creates a pool of N
081: * threads (if <code>maxThreads</code> is set to -1, N is the numer of entities groups, otherwise
082: * N is the number of <code>maxThreads</code>).
083: *
084: * @param entities The partitioned index entities groups and index entities to index
085: * @param indexEntitiesIndexer The entities indexer to use
086: * @param compassGps Compass gps interface for meta information
087: */
088: public void performIndex(final IndexEntity[][] entities,
089: final IndexEntitiesIndexer indexEntitiesIndexer,
090: final CompassGpsInterfaceDevice compassGps) {
091:
092: if (entities.length <= 0) {
093: throw new IllegalArgumentException(
094: "No entities listed to be indexed, have you defined your entities correctly?");
095: }
096: int maxThreads = this .maxThreads;
097: if (maxThreads == -1) {
098: maxThreads = entities.length;
099: }
100: ExecutorService executorService = Executors.newFixedThreadPool(
101: maxThreads, new SingleThreadThreadFactory(
102: "Compass Gps Index", false));
103: try {
104: ArrayList tasks = new ArrayList();
105: for (int i = 0; i < entities.length; i++) {
106: final IndexEntity[] indexEntities = entities[i];
107: tasks.add(new Callable() {
108: public Object call() throws Exception {
109: compassGps
110: .executeForIndex(new CompassCallbackWithoutResult() {
111: protected void doInCompassWithoutResult(
112: CompassSession session)
113: throws CompassException {
114: indexEntitiesIndexer
115: .performIndex(session,
116: indexEntities);
117: ((InternalCompassSession) session)
118: .flush();
119: }
120: });
121: return null;
122: }
123: });
124: }
125: List futures;
126: try {
127: futures = executorService.invokeAll(tasks);
128: } catch (InterruptedException e) {
129: throw new CompassGpsException(
130: "Failed to index, interrupted", e);
131: }
132:
133: for (Iterator it = futures.iterator(); it.hasNext();) {
134: Future future = (Future) it.next();
135: try {
136: future.get();
137: } catch (InterruptedException e) {
138: throw new CompassGpsException(
139: "Failed to index, interrupted", e);
140: } catch (ExecutionException e) {
141: throw new CompassGpsException(
142: "Failed to index, execution exception", e);
143: }
144: }
145: } finally {
146: executorService.shutdownNow();
147: }
148: }
149:
150: }
|