001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.executor;
021:
022: import java.util.ArrayList;
023: import java.util.HashSet;
024: import java.util.List;
025: import java.util.Set;
026: import java.util.concurrent.Executors;
027: import java.util.concurrent.LinkedBlockingQueue;
028: import java.util.concurrent.RejectedExecutionHandler;
029: import java.util.concurrent.ThreadFactory;
030: import java.util.concurrent.ThreadPoolExecutor;
031: import java.util.concurrent.TimeUnit;
032: import java.util.concurrent.atomic.AtomicInteger;
033:
034: import org.apache.mina.common.IoEvent;
035:
036: /**
037: * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
038: * This means more than one event handler methods can be invoked at the same
039: * time with mixed order. For example, let's assume that messageReceived, messageSent,
040: * and sessionClosed events are fired.
041: * <ul>
042: * <li>All event handler methods can be called simultaneously.
043: * (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
044: * <li>The event order can be mixed up.
045: * (e.g. sessionClosed or messageSent can be invoked before messageReceived
046: * is invoked.)</li>
047: * </ul>
048: * If you need to maintain the order of events per session, please use
049: * {@link OrderedThreadPoolExecutor}.
050: *
051: * @author The Apache MINA Project (dev@mina.apache.org)
052: * @version $Rev: 595549 $, $Date: 2007-11-15 21:45:36 -0700 (Thu, 15 Nov 2007) $
053: */
054: public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
055:
056: private static final Runnable EXIT_SIGNAL = new Runnable() {
057: public void run() {
058: }
059: };
060: private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
061: public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
062: return true;
063: }
064:
065: public void offered(ThreadPoolExecutor executor, IoEvent event) {
066: }
067:
068: public void polled(ThreadPoolExecutor executor, IoEvent event) {
069: }
070: };
071:
072: private final Set<Worker> workers = new HashSet<Worker>();
073:
074: private volatile int corePoolSize;
075: private volatile int maximumPoolSize;
076: private volatile int largestPoolSize;
077: private final AtomicInteger idleWorkers = new AtomicInteger();
078:
079: private long completedTaskCount;
080: private volatile boolean shutdown;
081:
082: private final IoEventQueueHandler queueHandler;
083:
084: public UnorderedThreadPoolExecutor() {
085: this (16);
086: }
087:
088: public UnorderedThreadPoolExecutor(int maximumPoolSize) {
089: this (0, maximumPoolSize);
090: }
091:
092: public UnorderedThreadPoolExecutor(int corePoolSize,
093: int maximumPoolSize) {
094: this (corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
095: }
096:
097: public UnorderedThreadPoolExecutor(int corePoolSize,
098: int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
099: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
100: Executors.defaultThreadFactory());
101: }
102:
103: public UnorderedThreadPoolExecutor(int corePoolSize,
104: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
105: IoEventQueueHandler queueHandler) {
106: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
107: Executors.defaultThreadFactory(), queueHandler);
108: }
109:
110: public UnorderedThreadPoolExecutor(int corePoolSize,
111: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
112: ThreadFactory threadFactory) {
113: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
114: threadFactory, null);
115: }
116:
117: public UnorderedThreadPoolExecutor(int corePoolSize,
118: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
119: ThreadFactory threadFactory,
120: IoEventQueueHandler queueHandler) {
121: super (0, 1, keepAliveTime, unit,
122: new LinkedBlockingQueue<Runnable>(), threadFactory,
123: new AbortPolicy());
124: if (corePoolSize < 0) {
125: throw new IllegalArgumentException("corePoolSize: "
126: + corePoolSize);
127: }
128:
129: if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
130: throw new IllegalArgumentException("maximumPoolSize: "
131: + maximumPoolSize);
132: }
133:
134: if (queueHandler == null) {
135: queueHandler = NOOP_QUEUE_HANDLER;
136: }
137:
138: this .corePoolSize = corePoolSize;
139: this .maximumPoolSize = maximumPoolSize;
140: this .queueHandler = queueHandler;
141: }
142:
143: public IoEventQueueHandler getQueueHandler() {
144: return queueHandler;
145: }
146:
147: @Override
148: public void setRejectedExecutionHandler(
149: RejectedExecutionHandler handler) {
150: // Ignore the request. It must always be AbortPolicy.
151: }
152:
153: private void addWorker() {
154: synchronized (workers) {
155: if (workers.size() >= maximumPoolSize) {
156: return;
157: }
158:
159: Worker worker = new Worker();
160: Thread thread = getThreadFactory().newThread(worker);
161: idleWorkers.incrementAndGet();
162: thread.start();
163: workers.add(worker);
164:
165: if (workers.size() > largestPoolSize) {
166: largestPoolSize = workers.size();
167: }
168: }
169: }
170:
171: private void addWorkerIfNecessary() {
172: if (idleWorkers.get() == 0) {
173: synchronized (workers) {
174: if (workers.isEmpty() || idleWorkers.get() == 0) {
175: addWorker();
176: }
177: }
178: }
179: }
180:
181: private void removeWorker() {
182: synchronized (workers) {
183: if (workers.size() <= corePoolSize) {
184: return;
185: }
186: getQueue().offer(EXIT_SIGNAL);
187: }
188: }
189:
190: @Override
191: public int getMaximumPoolSize() {
192: return maximumPoolSize;
193: }
194:
195: @Override
196: public void setMaximumPoolSize(int maximumPoolSize) {
197: if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
198: throw new IllegalArgumentException("maximumPoolSize: "
199: + maximumPoolSize);
200: }
201:
202: synchronized (workers) {
203: this .maximumPoolSize = maximumPoolSize;
204: int difference = workers.size() - maximumPoolSize;
205: while (difference > 0) {
206: removeWorker();
207: --difference;
208: }
209: }
210: }
211:
212: @Override
213: public boolean awaitTermination(long timeout, TimeUnit unit)
214: throws InterruptedException {
215:
216: long deadline = System.currentTimeMillis()
217: + unit.toMillis(timeout);
218:
219: synchronized (workers) {
220: while (!isTerminated()) {
221: long waitTime = deadline - System.currentTimeMillis();
222: if (waitTime <= 0) {
223: break;
224: }
225:
226: workers.wait(waitTime);
227: }
228: }
229: return isTerminated();
230: }
231:
232: @Override
233: public boolean isShutdown() {
234: return shutdown;
235: }
236:
237: @Override
238: public boolean isTerminated() {
239: if (!shutdown) {
240: return false;
241: }
242:
243: synchronized (workers) {
244: return workers.isEmpty();
245: }
246: }
247:
248: @Override
249: public void shutdown() {
250: if (shutdown) {
251: return;
252: }
253:
254: shutdown = true;
255:
256: synchronized (workers) {
257: for (int i = workers.size(); i > 0; i--) {
258: getQueue().offer(EXIT_SIGNAL);
259: }
260: }
261: }
262:
263: @Override
264: public List<Runnable> shutdownNow() {
265: shutdown();
266:
267: List<Runnable> answer = new ArrayList<Runnable>();
268: Runnable task;
269: while ((task = getQueue().poll()) != null) {
270: if (task == EXIT_SIGNAL) {
271: getQueue().offer(EXIT_SIGNAL);
272: Thread.yield(); // Let others take the signal.
273: continue;
274: }
275:
276: getQueueHandler().polled(this , (IoEvent) task);
277: answer.add(task);
278: }
279:
280: return answer;
281: }
282:
283: @Override
284: public void execute(Runnable task) {
285: if (shutdown) {
286: rejectTask(task);
287: }
288:
289: checkTaskType(task);
290:
291: IoEvent e = (IoEvent) task;
292: boolean offeredEvent = queueHandler.accept(this , e);
293: if (offeredEvent) {
294: getQueue().offer(e);
295: }
296:
297: addWorkerIfNecessary();
298:
299: if (offeredEvent) {
300: queueHandler.offered(this , e);
301: }
302: }
303:
304: private void rejectTask(Runnable task) {
305: getRejectedExecutionHandler().rejectedExecution(task, this );
306: }
307:
308: private void checkTaskType(Runnable task) {
309: if (!(task instanceof IoEvent)) {
310: throw new IllegalArgumentException(
311: "task must be an IoEvent or its subclass.");
312: }
313: }
314:
315: @Override
316: public int getActiveCount() {
317: synchronized (workers) {
318: return workers.size() - idleWorkers.get();
319: }
320: }
321:
322: @Override
323: public long getCompletedTaskCount() {
324: synchronized (workers) {
325: long answer = completedTaskCount;
326: for (Worker w : workers) {
327: answer += w.completedTaskCount;
328: }
329:
330: return answer;
331: }
332: }
333:
334: @Override
335: public int getLargestPoolSize() {
336: return largestPoolSize;
337: }
338:
339: @Override
340: public int getPoolSize() {
341: synchronized (workers) {
342: return workers.size();
343: }
344: }
345:
346: @Override
347: public long getTaskCount() {
348: return getCompletedTaskCount();
349: }
350:
351: @Override
352: public boolean isTerminating() {
353: synchronized (workers) {
354: return isShutdown() && !isTerminated();
355: }
356: }
357:
358: @Override
359: public int prestartAllCoreThreads() {
360: int answer = 0;
361: synchronized (workers) {
362: for (int i = corePoolSize - workers.size(); i > 0; i--) {
363: addWorker();
364: answer++;
365: }
366: }
367: return answer;
368: }
369:
370: @Override
371: public boolean prestartCoreThread() {
372: synchronized (workers) {
373: if (workers.size() < corePoolSize) {
374: addWorker();
375: return true;
376: } else {
377: return false;
378: }
379: }
380: }
381:
382: @Override
383: public void purge() {
384: }
385:
386: @Override
387: public boolean remove(Runnable task) {
388: boolean removed = super .remove(task);
389: if (removed) {
390: getQueueHandler().polled(this , (IoEvent) task);
391: }
392: return removed;
393: }
394:
395: @Override
396: public int getCorePoolSize() {
397: return corePoolSize;
398: }
399:
400: @Override
401: public void setCorePoolSize(int corePoolSize) {
402: if (corePoolSize < 0) {
403: throw new IllegalArgumentException("corePoolSize: "
404: + corePoolSize);
405: }
406: if (corePoolSize > maximumPoolSize) {
407: throw new IllegalArgumentException(
408: "corePoolSize exceeds maximumPoolSize");
409: }
410:
411: synchronized (workers) {
412: if (this .corePoolSize > corePoolSize) {
413: for (int i = this .corePoolSize - corePoolSize; i > 0; i--) {
414: removeWorker();
415: }
416: }
417: this .corePoolSize = corePoolSize;
418: }
419: }
420:
421: private class Worker implements Runnable {
422:
423: private volatile long completedTaskCount;
424: private Thread thread;
425:
426: public void run() {
427: thread = Thread.currentThread();
428:
429: try {
430: for (;;) {
431: Runnable task = fetchTask();
432:
433: idleWorkers.decrementAndGet();
434:
435: if (task == null) {
436: synchronized (workers) {
437: if (workers.size() > corePoolSize) {
438: // Remove now to prevent duplicate exit.
439: workers.remove(this );
440: break;
441: }
442: }
443: }
444:
445: if (task == EXIT_SIGNAL) {
446: break;
447: }
448:
449: queueHandler.polled(
450: UnorderedThreadPoolExecutor.this ,
451: (IoEvent) task);
452: try {
453: runTask(task);
454: } finally {
455: idleWorkers.incrementAndGet();
456: }
457: }
458: } finally {
459: synchronized (workers) {
460: workers.remove(this );
461: UnorderedThreadPoolExecutor.this .completedTaskCount += completedTaskCount;
462: workers.notifyAll();
463: }
464: }
465: }
466:
467: private Runnable fetchTask() {
468: Runnable task = null;
469: long currentTime = System.currentTimeMillis();
470: long deadline = currentTime
471: + getKeepAliveTime(TimeUnit.MILLISECONDS);
472: for (;;) {
473: try {
474: long waitTime = deadline - currentTime;
475: if (waitTime <= 0) {
476: break;
477: }
478:
479: try {
480: task = getQueue().poll(waitTime,
481: TimeUnit.MILLISECONDS);
482: break;
483: } finally {
484: if (task == null) {
485: currentTime = System.currentTimeMillis();
486: }
487: }
488: } catch (InterruptedException e) {
489: // Ignore.
490: continue;
491: }
492: }
493: return task;
494: }
495:
496: private void runTask(Runnable task) {
497: beforeExecute(thread, task);
498: boolean ran = false;
499: try {
500: task.run();
501: ran = true;
502: afterExecute(task, null);
503: completedTaskCount++;
504: } catch (RuntimeException e) {
505: if (!ran)
506: afterExecute(task, e);
507: throw e;
508: }
509: }
510: }
511: }
|