001: /**************************************************************************************
002: * Copyright (c) Jonas BonŽr, Alexandre Vasseur. All rights reserved. *
003: * http://aspectwerkz.codehaus.org *
004: * ---------------------------------------------------------------------------------- *
005: * The software in this package is published under the terms of the LGPL license *
006: * a copy of which has been included with this distribution in the license.txt file. *
007: **************************************************************************************/package examples.util.concurrent;
008:
009: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
010: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
011: import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
012: import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
013: import examples.util.definition.Definition;
014:
015: /**
016: * Manages the thread pool for all the asynchronous invocations.
017: *
018: * @author <a href="mailto:jboner@codehaus.org">Jonas BonŽr </a>
019: */
020: public class AsynchronousManager {
021:
022: protected static final AsynchronousManager INSTANCE = new AsynchronousManager();
023:
024: protected PooledExecutor m_threadPool = null;
025:
026: protected boolean m_initialized = false;
027:
028: /**
029: * Executes a task in a thread from the thread pool.
030: *
031: * @param task the task to execute (Runnable)
032: */
033: public void execute(final Runnable task) {
034: if (notInitialized()) {
035: throw new IllegalStateException(
036: "asynchronous thread pool not initialized");
037: }
038: try {
039: m_threadPool.execute(task);
040: } catch (InterruptedException e) {
041: Thread.currentThread().interrupt();
042: notifyAll();
043: throw new WrappedRuntimeException(e);
044: } catch (Exception e) {
045: throw new WrappedRuntimeException(e);
046: }
047: }
048:
049: /**
050: * Returns the one A only AsynchronousManager instance.
051: *
052: * @return the asynchronous manager
053: */
054: public static AsynchronousManager getInstance() {
055: return INSTANCE;
056: }
057:
058: /**
059: * Initializes the thread pool.
060: *
061: * @param def the definition
062: */
063: public synchronized void initialize(final Definition definition) {
064: if (definition == null) {
065: return;
066: }
067: if (m_initialized) {
068: return;
069: }
070: examples.util.definition.ThreadPoolDefinition def = (examples.util.definition.ThreadPoolDefinition) definition;
071: int threadPoolMaxSize = def.getMaxSize();
072: int threadPoolInitSize = def.getInitSize();
073: int threadPoolMinSize = def.getMinSize();
074: int keepAliveTime = def.getKeepAliveTime();
075: boolean waitWhenBlocked = def.getWaitWhenBlocked();
076: boolean bounded = def.getBounded();
077: if (threadPoolMaxSize < threadPoolInitSize
078: || threadPoolMaxSize < threadPoolMinSize) {
079: throw new IllegalArgumentException(
080: "max size of thread pool can not exceed the init size");
081: }
082:
083: // if threadPoolMaxSize is -1 or less => no maximum limit
084: // if keepAliveTime is -1 or less => threads are alive forever, i.e no timeout
085: if (bounded) {
086: createBoundedThreadPool(threadPoolMaxSize,
087: threadPoolMinSize, threadPoolInitSize,
088: keepAliveTime, waitWhenBlocked);
089: } else {
090: createDynamicThreadPool(threadPoolMinSize,
091: threadPoolInitSize, keepAliveTime);
092: }
093: m_initialized = true;
094: }
095:
096: /**
097: * Closes down the thread pool.
098: */
099: public void stop() {
100: m_threadPool.shutdownNow();
101: }
102:
103: /**
104: * Creates a bounded thread pool.
105: *
106: * @param threadPoolMaxSize
107: * @param threadPoolMinSize
108: * @param threadPoolInitSize
109: * @param keepAliveTime
110: * @param waitWhenBlocked
111: */
112: protected void createBoundedThreadPool(final int threadPoolMaxSize,
113: final int threadPoolMinSize, final int threadPoolInitSize,
114: final int keepAliveTime, final boolean waitWhenBlocked) {
115: m_threadPool = new PooledExecutor(new BoundedBuffer(
116: threadPoolInitSize), threadPoolMaxSize);
117: m_threadPool.setKeepAliveTime(keepAliveTime);
118: m_threadPool.createThreads(threadPoolInitSize);
119: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
120: if (waitWhenBlocked) {
121: m_threadPool.waitWhenBlocked();
122: }
123: }
124:
125: /**
126: * Creates a dynamic thread pool.
127: *
128: * @param threadPoolMinSize
129: * @param threadPoolInitSize
130: * @param keepAliveTime
131: */
132: protected void createDynamicThreadPool(final int threadPoolMinSize,
133: final int threadPoolInitSize, final int keepAliveTime) {
134: m_threadPool = new PooledExecutor(new LinkedQueue());
135: m_threadPool.setKeepAliveTime(keepAliveTime);
136: m_threadPool.createThreads(threadPoolInitSize);
137: m_threadPool.setMinimumPoolSize(threadPoolMinSize);
138: }
139:
140: /**
141: * Checks if the service has been initialized.
142: *
143: * @return boolean
144: */
145: protected boolean notInitialized() {
146: return !m_initialized;
147: }
148:
149: /**
150: * Private constructor.
151: */
152: protected AsynchronousManager() {
153: }
154: }
|