001: package ri.cache;
002:
003: import javax.cache.spi.CacheLoader;
004: import java.util.Collection;
005: import java.util.Map;
006: import java.util.HashMap;
007: import java.util.concurrent.*;
008:
009: public class AsyncLoader<K, V> {
010:
011: public interface AsyncLoaderCallback<K, V> {
012: public void loadComplete(K key, Future<V> future);
013: }
014:
015: private final ConcurrentMap<K, Future<V>> inProgress = new ConcurrentHashMap<K, Future<V>>();
016:
017: private final CacheLoader<K, V> loader;
018: private final ExecutorService executor;
019: private final ScheduledExecutorService cancelService;
020: private final boolean sharedExecutor;
021: private final long timeoutMillis;
022:
023: /**
024: * Create an AsyncLoader which uses a single-threaded executor for loading
025: * @param loader Underlying loader to perform actual loading
026: * @param timeoutMillis How long, in milliseconds, before we consider the load to have failed and cancel the loading (and notify the callback listener with an exception)
027: */
028: public AsyncLoader(CacheLoader<K, V> loader, long timeoutMillis) {
029: this (loader, timeoutMillis,
030: Executors.newSingleThreadExecutor(), false);
031: }
032:
033: /**
034: * @param loader Underlying loader to perform actual loading
035: * @param timeoutMillis How long, in milliseconds, before we consider the load to have failed and cancel the loading (and notify the callback listener with an exception)
036: * @param executor ExecutorService to use for actual loading
037: * @param sharedExecutor Whether this ExecutorService is to be considered owned by the AsyncLoader (in which case
038: * the AsyncLoader should shut it down) or whether it is shared (in which case its owner should shut it down)
039: */
040: public AsyncLoader(CacheLoader<K, V> loader, long timeoutMillis,
041: ExecutorService executor, boolean sharedExecutor) {
042: this .loader = loader;
043: this .executor = executor;
044: this .timeoutMillis = timeoutMillis;
045: this .sharedExecutor = sharedExecutor;
046: cancelService = Executors
047: .newSingleThreadScheduledExecutor(new ThreadFactory() {
048: public Thread newThread(Runnable r) {
049: Thread t = new Thread(r);
050: t.setDaemon(true);
051: return t;
052: }
053: });
054: }
055:
056: public Future<V> load(final K key) {
057: return load(key, null);
058: }
059:
060: public Future<V> load(final K key,
061: final AsyncLoaderCallback<K, V> callback) {
062: Callable<V> callable = new Callable<V>() {
063: public V call() throws Exception {
064: return loader.load(key);
065: }
066: };
067: final FutureTask<V> task = new FutureTask<V>(callable) {
068: protected void done() {
069: try {
070: inProgress.remove(key, this );
071: if (callback != null)
072: callback.loadComplete(key, this );
073: } finally {
074: super .done();
075: }
076: }
077: };
078: Future<V> prev = inProgress.putIfAbsent(key, task);
079: if (prev == null) {
080: executor.submit(task);
081: cancelService.schedule(new Runnable() {
082: public void run() {
083: if (!task.isDone())
084: task.cancel(true);
085: }
086: }, timeoutMillis, TimeUnit.MILLISECONDS);
087: return task;
088: } else
089: return prev;
090: }
091:
092: public Map<K, Future<V>> loadAll(final Collection<? extends K> keys) {
093: return loadAll(keys, null);
094: }
095:
096: public Map<K, Future<V>> loadAll(
097: final Collection<? extends K> keys,
098: final AsyncLoaderCallback<K, V> callback) {
099: Map<K, Future<V>> result = new HashMap<K, Future<V>>();
100: for (K key : keys)
101: result.put(key, load(key, callback));
102: return result;
103: }
104:
105: public void shutdown() {
106: if (!sharedExecutor)
107: executor.shutdownNow();
108: }
109: }
|