001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.aspects.asynchronous.concurrent;
023:
024: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
025: import EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser;
026: import org.jboss.aspects.asynchronous.AsynchronousConstants;
027: import org.jboss.aspects.asynchronous.AsynchronousParameters;
028: import org.jboss.aspects.asynchronous.AsynchronousTask;
029: import org.jboss.aspects.asynchronous.AsynchronousUserTask;
030: import org.jboss.aspects.asynchronous.ThreadManager;
031: import org.jboss.aspects.asynchronous.ThreadManagerRequest;
032: import org.jboss.aspects.asynchronous.ThreadManagerResponse;
033: import org.jboss.aspects.asynchronous.common.AsynchronousEmptyTask;
034: import org.jboss.aspects.asynchronous.common.ThreadManagerResponseImpl;
035:
036: /**
037: * @author <a href="mailto:chussenet@yahoo.com">{Claude Hussenet Independent Consultant}</a>.
038: * @version <tt>$Revision: 57186 $</tt>
039: */
040:
041: public class ThreadManagerImpl
042:
043: extends ThreadFactoryUser
044:
045: implements AsynchronousConstants, ThreadManager {
046:
047: protected PooledExecutor _pooledExecutor = null;
048:
049: protected boolean waitWhenPoolSizeIsFull = true;
050:
051: protected boolean isPooling = true;
052:
053: /**
054: * Create a new pool with all default settings
055: */
056:
057: public ThreadManagerImpl() {
058:
059: _pooledExecutor = new PooledExecutor();
060:
061: setWaitWhenPoolSizeIsFull(false);
062:
063: }
064:
065: /**
066: * Create a new pool with all default settings except
067: * <p/>
068: * for maximum pool size.
069: */
070:
071: public ThreadManagerImpl(int maximumPoolSize) {
072:
073: _pooledExecutor = new PooledExecutor(maximumPoolSize);
074:
075: setWaitWhenPoolSizeIsFull(false);
076:
077: }
078:
079: /**
080: * Set the minimum number of threads to use.
081: *
082: * @throws IllegalArgumentException if less than zero. (It is not
083: * <p/>
084: * considered an error to set the minimum to be greater than the
085: * <p/>
086: * maximum. However, in this case there are no guarantees about
087: * <p/>
088: * behavior.)
089: */
090:
091: public void setMaximumPoolSize(int maximumPoolSize) {
092:
093: _pooledExecutor.setMaximumPoolSize(maximumPoolSize);
094:
095: }
096:
097: /**
098: * Set the policy for blocked execution to be to wait until a thread
099: * <p/>
100: * is available.
101: * <p/>
102: * <p/>
103: * <p/>
104: * OR
105: * <p/>
106: * <p/>
107: * <p/>
108: * Set the policy for blocked execution to be to
109: * <p/>
110: * throw a RuntimeException.
111: */
112:
113: public void setWaitWhenPoolSizeIsFull(boolean value) {
114:
115: if (value)
116:
117: _pooledExecutor.waitWhenBlocked();
118:
119: else
120:
121: _pooledExecutor.abortWhenBlocked();
122:
123: waitWhenPoolSizeIsFull = value;
124:
125: }
126:
127: /**
128: * return the policy when the pool is full
129: */
130:
131: public boolean getWaitWhenPoolSizeIsFull() {
132:
133: return waitWhenPoolSizeIsFull;
134:
135: }
136:
137: /**
138: * Return the maximum number of threads to simultaneously execute
139: */
140:
141: public int getMaximumPoolSize() {
142:
143: return _pooledExecutor.getMaximumPoolSize();
144:
145: }
146:
147: /**
148: * Set the minimum number of threads to use.
149: *
150: * @throws IllegalArgumentException if less than zero. (It is not
151: * <p/>
152: * considered an error to set the minimum to be greater than the
153: * <p/>
154: * maximum. However, in this case there are no guarantees about
155: * <p/>
156: * behavior.)
157: */
158:
159: public void setMinimumPoolSize(int minimumPoolSize) {
160:
161: _pooledExecutor.setMinimumPoolSize(minimumPoolSize);
162:
163: }
164:
165: /**
166: * Return the minimum number of threads to simultaneously execute.
167: * <p/>
168: * (Default value is 1). If fewer than the mininum number are
169: * <p/>
170: * running upon reception of a new request, a new thread is started
171: * <p/>
172: * to handle this request.
173: */
174:
175: public int getMinimumPoolSize() {
176:
177: return _pooledExecutor.getMinimumPoolSize();
178:
179: }
180:
181: /**
182: * Set the number of milliseconds to keep threads alive waiting for
183: * <p/>
184: * new commands. A negative value means to wait forever. A zero
185: * <p/>
186: * value means not to wait at all.
187: */
188:
189: public void setKeepAliveTime(long time) {
190:
191: _pooledExecutor.setKeepAliveTime(time);
192:
193: }
194:
195: /**
196: * Return the number of milliseconds to keep threads alive waiting
197: * <p/>
198: * for new commands. A negative value means to wait forever. A zero
199: * <p/>
200: * value means not to wait at all.
201: */
202:
203: public long getKeepAliveTime() {
204:
205: return _pooledExecutor.getKeepAliveTime();
206:
207: }
208:
209: /**
210: * Return the current number of active threads in the pool. This
211: * <p/>
212: * number is just a snaphot, and may change immediately upon
213: * <p/>
214: * returning
215: */
216:
217: public long getPoolSize() {
218:
219: return _pooledExecutor.getPoolSize();
220:
221: }
222:
223: /**
224: * Return the response from an asynchronous task
225: * <p/>
226: * The call returns within the timeout defined
227: * <p/>
228: * in the process method
229: */
230:
231: public ThreadManagerResponse waitForResponse(AsynchronousTask input) {
232:
233: AsynchronousTask[] tTask = { input };
234:
235: return waitForResponses(tTask)[0];
236:
237: }
238:
239: /**
240: * Return an array of responses from an array of asynchronous task
241: * <p/>
242: * The call returns within the maximum timeout from the array of tasks
243: */
244:
245: public ThreadManagerResponse[] waitForResponses(
246: AsynchronousTask[] inputImpl) {
247:
248: if (inputImpl == null) {
249:
250: System.err
251: .println("PPMImpl:waitForResponses NULL PARAMETER");
252:
253: return null;
254:
255: }
256:
257: ThreadManagerResponse[] response =
258:
259: new ThreadManagerResponseImpl[inputImpl.length];
260:
261: for (int i = 0; i < inputImpl.length; i++) {
262:
263: AsynchronousTask fr = inputImpl[i];
264:
265: response[i] = fr.getResponse();
266:
267: }
268:
269: return response;
270:
271: }
272:
273: public AsynchronousTask process(ThreadManagerRequest ppmRequest) {
274:
275: return process(ppmRequest.getId(),
276:
277: ppmRequest.getTaskClassImpl(),
278:
279: ppmRequest.getInputParameters(),
280:
281: ppmRequest.getTimeout());
282:
283: }
284:
285: /**
286: * Create, start and return a new asynchronous task from :
287: * <p/>
288: * <p/>
289: * <p/>
290: * <p><b>taskImpl</b> class instance defining the task to process
291: * <p/>
292: * <p><b>inputParametersImpl</b> class instance defining the input parameters
293: * <p/>
294: * <p><b>timeout</b> defined the given time limit to process the task
295: */
296:
297: private AsynchronousTask process(String id,
298:
299: AsynchronousUserTask taskImpl,
300:
301: AsynchronousParameters inputParametersImpl,
302:
303: long timeout) {
304:
305: try {
306:
307: if (this .getPoolSize() + 1 > this .getMaximumPoolSize())
308:
309: System.err
310: .println("process: The maximum pool size defined at "
311:
312: + this .getMaximumPoolSize()
313:
314: + " is reached before processing task["
315:
316: + id
317:
318: + "] !");
319:
320: org.jboss.aspects.asynchronous.concurrent.AsynchronousTask ft =
321:
322: new AsynchronousTaskImpl(id,
323:
324: taskImpl,
325:
326: inputParametersImpl,
327:
328: timeout);
329:
330: Runnable cmd = ft.add();
331:
332: if (isPooling())
333:
334: _pooledExecutor.execute(cmd);
335:
336: else {
337:
338: Thread thread = getThreadFactory().newThread(cmd);
339:
340: thread.start();
341:
342: }
343:
344: Thread.yield();
345:
346: Thread.sleep(0);
347:
348: Thread.yield();
349:
350: return ft;
351:
352: } catch (Exception e) {
353:
354: return new AsynchronousEmptyTask(id,
355:
356: AsynchronousConstants.CAN_NOT_PROCESS,
357:
358: e,
359:
360: e.getMessage(),
361:
362: System.currentTimeMillis());
363:
364: }
365:
366: }
367:
368: public boolean isPooling() {
369:
370: return isPooling;
371:
372: }
373:
374: public void setPooling(boolean isPooling) {
375:
376: this.isPooling = isPooling;
377:
378: }
379:
380: }
|