001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.executor;
021:
022: import java.util.ArrayList;
023: import java.util.HashSet;
024: import java.util.List;
025: import java.util.Queue;
026: import java.util.Set;
027: import java.util.concurrent.BlockingQueue;
028: import java.util.concurrent.Executors;
029: import java.util.concurrent.LinkedBlockingQueue;
030: import java.util.concurrent.RejectedExecutionHandler;
031: import java.util.concurrent.SynchronousQueue;
032: import java.util.concurrent.ThreadFactory;
033: import java.util.concurrent.ThreadPoolExecutor;
034: import java.util.concurrent.TimeUnit;
035: import java.util.concurrent.atomic.AtomicInteger;
036:
037: import org.apache.mina.common.AttributeKey;
038: import org.apache.mina.common.DummySession;
039: import org.apache.mina.common.IoEvent;
040: import org.apache.mina.common.IoSession;
041: import org.apache.mina.util.CircularQueue;
042:
043: /**
044: * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
045: * <p>
046: * If you don't need to maintain the order of events per session, please use
047: * {@link UnorderedThreadPoolExecutor}.
048:
049: * @author The Apache MINA Project (dev@mina.apache.org)
050: * @version $Rev: 595549 $, $Date: 2007-11-15 21:45:36 -0700 (Thu, 15 Nov 2007) $
051: */
052: public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
053:
054: private static final IoSession EXIT_SIGNAL = new DummySession();
055: private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
056: public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
057: return true;
058: }
059:
060: public void offered(ThreadPoolExecutor executor, IoEvent event) {
061: }
062:
063: public void polled(ThreadPoolExecutor executor, IoEvent event) {
064: }
065: };
066:
067: private final AttributeKey BUFFER = new AttributeKey(getClass(),
068: "buffer");
069: private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
070:
071: private final Set<Worker> workers = new HashSet<Worker>();
072:
073: private volatile int corePoolSize;
074: private volatile int maximumPoolSize;
075: private volatile int largestPoolSize;
076: private final AtomicInteger idleWorkers = new AtomicInteger();
077:
078: private long completedTaskCount;
079: private volatile boolean shutdown;
080:
081: private final IoEventQueueHandler queueHandler;
082:
083: public OrderedThreadPoolExecutor() {
084: this (16);
085: }
086:
087: public OrderedThreadPoolExecutor(int maximumPoolSize) {
088: this (0, maximumPoolSize);
089: }
090:
091: public OrderedThreadPoolExecutor(int corePoolSize,
092: int maximumPoolSize) {
093: this (corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
094: }
095:
096: public OrderedThreadPoolExecutor(int corePoolSize,
097: int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
098: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
099: Executors.defaultThreadFactory());
100: }
101:
102: public OrderedThreadPoolExecutor(int corePoolSize,
103: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
104: IoEventQueueHandler queueHandler) {
105: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
106: Executors.defaultThreadFactory(), queueHandler);
107: }
108:
109: public OrderedThreadPoolExecutor(int corePoolSize,
110: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
111: ThreadFactory threadFactory) {
112: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
113: threadFactory, null);
114: }
115:
116: public OrderedThreadPoolExecutor(int corePoolSize,
117: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
118: ThreadFactory threadFactory,
119: IoEventQueueHandler queueHandler) {
120: super (0, 1, keepAliveTime, unit,
121: new SynchronousQueue<Runnable>(), threadFactory,
122: new AbortPolicy());
123: if (corePoolSize < 0) {
124: throw new IllegalArgumentException("corePoolSize: "
125: + corePoolSize);
126: }
127:
128: if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
129: throw new IllegalArgumentException("maximumPoolSize: "
130: + maximumPoolSize);
131: }
132:
133: if (queueHandler == null) {
134: queueHandler = NOOP_QUEUE_HANDLER;
135: }
136:
137: this .corePoolSize = corePoolSize;
138: this .maximumPoolSize = maximumPoolSize;
139: this .queueHandler = queueHandler;
140: }
141:
142: public IoEventQueueHandler getQueueHandler() {
143: return queueHandler;
144: }
145:
146: @Override
147: public void setRejectedExecutionHandler(
148: RejectedExecutionHandler handler) {
149: // Ignore the request. It must always be AbortPolicy.
150: }
151:
152: private void addWorker() {
153: synchronized (workers) {
154: if (workers.size() >= maximumPoolSize) {
155: return;
156: }
157:
158: Worker worker = new Worker();
159: Thread thread = getThreadFactory().newThread(worker);
160: idleWorkers.incrementAndGet();
161: thread.start();
162: workers.add(worker);
163:
164: if (workers.size() > largestPoolSize) {
165: largestPoolSize = workers.size();
166: }
167: }
168: }
169:
170: private void addWorkerIfNecessary() {
171: if (idleWorkers.get() == 0) {
172: synchronized (workers) {
173: if (workers.isEmpty() || idleWorkers.get() == 0) {
174: addWorker();
175: }
176: }
177: }
178: }
179:
180: private void removeWorker() {
181: synchronized (workers) {
182: if (workers.size() <= corePoolSize) {
183: return;
184: }
185: waitingSessions.offer(EXIT_SIGNAL);
186: }
187: }
188:
189: @Override
190: public int getMaximumPoolSize() {
191: return maximumPoolSize;
192: }
193:
194: @Override
195: public void setMaximumPoolSize(int maximumPoolSize) {
196: if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
197: throw new IllegalArgumentException("maximumPoolSize: "
198: + maximumPoolSize);
199: }
200:
201: synchronized (workers) {
202: this .maximumPoolSize = maximumPoolSize;
203: int difference = workers.size() - maximumPoolSize;
204: while (difference > 0) {
205: removeWorker();
206: --difference;
207: }
208: }
209: }
210:
211: @Override
212: public boolean awaitTermination(long timeout, TimeUnit unit)
213: throws InterruptedException {
214:
215: long deadline = System.currentTimeMillis()
216: + unit.toMillis(timeout);
217:
218: synchronized (workers) {
219: while (!isTerminated()) {
220: long waitTime = deadline - System.currentTimeMillis();
221: if (waitTime <= 0) {
222: break;
223: }
224:
225: workers.wait(waitTime);
226: }
227: }
228: return isTerminated();
229: }
230:
231: @Override
232: public boolean isShutdown() {
233: return shutdown;
234: }
235:
236: @Override
237: public boolean isTerminated() {
238: if (!shutdown) {
239: return false;
240: }
241:
242: synchronized (workers) {
243: return workers.isEmpty();
244: }
245: }
246:
247: @Override
248: public void shutdown() {
249: if (shutdown) {
250: return;
251: }
252:
253: shutdown = true;
254:
255: synchronized (workers) {
256: for (int i = workers.size(); i > 0; i--) {
257: waitingSessions.offer(EXIT_SIGNAL);
258: }
259: }
260: }
261:
262: @Override
263: public List<Runnable> shutdownNow() {
264: shutdown();
265:
266: List<Runnable> answer = new ArrayList<Runnable>();
267: IoSession session;
268: while ((session = waitingSessions.poll()) != null) {
269: if (session == EXIT_SIGNAL) {
270: waitingSessions.offer(EXIT_SIGNAL);
271: Thread.yield(); // Let others take the signal.
272: continue;
273: }
274:
275: SessionBuffer buf = (SessionBuffer) session
276: .getAttribute(BUFFER);
277: synchronized (buf.queue) {
278: for (Runnable task : buf.queue) {
279: getQueueHandler().polled(this , (IoEvent) task);
280: answer.add(task);
281: }
282: buf.queue.clear();
283: }
284: }
285:
286: return answer;
287: }
288:
289: @Override
290: public void execute(Runnable task) {
291: if (shutdown) {
292: rejectTask(task);
293: }
294:
295: checkTaskType(task);
296:
297: IoEvent e = (IoEvent) task;
298: IoSession s = e.getSession();
299: SessionBuffer buf = getSessionBuffer(s);
300: Queue<Runnable> queue = buf.queue;
301: boolean offerSession;
302: boolean offerEvent = queueHandler.accept(this , e);
303: if (offerEvent) {
304: synchronized (queue) {
305: queue.offer(e);
306: if (buf.processingCompleted) {
307: buf.processingCompleted = false;
308: offerSession = true;
309: } else {
310: offerSession = false;
311: }
312: }
313: } else {
314: offerSession = false;
315: }
316:
317: if (offerSession) {
318: waitingSessions.offer(s);
319: }
320:
321: addWorkerIfNecessary();
322:
323: if (offerEvent) {
324: queueHandler.offered(this , e);
325: }
326: }
327:
328: private void rejectTask(Runnable task) {
329: getRejectedExecutionHandler().rejectedExecution(task, this );
330: }
331:
332: private void checkTaskType(Runnable task) {
333: if (!(task instanceof IoEvent)) {
334: throw new IllegalArgumentException(
335: "task must be an IoEvent or its subclass.");
336: }
337: }
338:
339: @Override
340: public int getActiveCount() {
341: synchronized (workers) {
342: return workers.size() - idleWorkers.get();
343: }
344: }
345:
346: @Override
347: public long getCompletedTaskCount() {
348: synchronized (workers) {
349: long answer = completedTaskCount;
350: for (Worker w : workers) {
351: answer += w.completedTaskCount;
352: }
353:
354: return answer;
355: }
356: }
357:
358: @Override
359: public int getLargestPoolSize() {
360: return largestPoolSize;
361: }
362:
363: @Override
364: public int getPoolSize() {
365: synchronized (workers) {
366: return workers.size();
367: }
368: }
369:
370: @Override
371: public long getTaskCount() {
372: return getCompletedTaskCount();
373: }
374:
375: @Override
376: public boolean isTerminating() {
377: synchronized (workers) {
378: return isShutdown() && !isTerminated();
379: }
380: }
381:
382: @Override
383: public int prestartAllCoreThreads() {
384: int answer = 0;
385: synchronized (workers) {
386: for (int i = corePoolSize - workers.size(); i > 0; i--) {
387: addWorker();
388: answer++;
389: }
390: }
391: return answer;
392: }
393:
394: @Override
395: public boolean prestartCoreThread() {
396: synchronized (workers) {
397: if (workers.size() < corePoolSize) {
398: addWorker();
399: return true;
400: } else {
401: return false;
402: }
403: }
404: }
405:
406: @Override
407: public BlockingQueue<Runnable> getQueue() {
408: throw new UnsupportedOperationException();
409: }
410:
411: @Override
412: public void purge() {
413: }
414:
415: @Override
416: public boolean remove(Runnable task) {
417: checkTaskType(task);
418: IoEvent e = (IoEvent) task;
419: IoSession s = e.getSession();
420: SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
421: if (buffer == null) {
422: return false;
423: }
424:
425: boolean removed;
426: synchronized (buffer.queue) {
427: removed = buffer.queue.remove(task);
428: }
429:
430: if (removed) {
431: getQueueHandler().polled(this , e);
432: }
433:
434: return removed;
435: }
436:
437: @Override
438: public int getCorePoolSize() {
439: return corePoolSize;
440: }
441:
442: @Override
443: public void setCorePoolSize(int corePoolSize) {
444: if (corePoolSize < 0) {
445: throw new IllegalArgumentException("corePoolSize: "
446: + corePoolSize);
447: }
448: if (corePoolSize > maximumPoolSize) {
449: throw new IllegalArgumentException(
450: "corePoolSize exceeds maximumPoolSize");
451: }
452:
453: synchronized (workers) {
454: if (this .corePoolSize > corePoolSize) {
455: for (int i = this .corePoolSize - corePoolSize; i > 0; i--) {
456: removeWorker();
457: }
458: }
459: this .corePoolSize = corePoolSize;
460: }
461: }
462:
463: private SessionBuffer getSessionBuffer(IoSession session) {
464: SessionBuffer buffer = (SessionBuffer) session
465: .getAttribute(BUFFER);
466: if (buffer == null) {
467: buffer = new SessionBuffer();
468: SessionBuffer oldBuffer = (SessionBuffer) session
469: .setAttributeIfAbsent(BUFFER, buffer);
470: if (oldBuffer != null) {
471: buffer = oldBuffer;
472: }
473: }
474: return buffer;
475: }
476:
477: private static class SessionBuffer {
478: private final Queue<Runnable> queue = new CircularQueue<Runnable>();
479: private boolean processingCompleted = true;
480: }
481:
482: private class Worker implements Runnable {
483:
484: private volatile long completedTaskCount;
485: private Thread thread;
486:
487: public void run() {
488: thread = Thread.currentThread();
489:
490: try {
491: for (;;) {
492: IoSession session = fetchSession();
493:
494: idleWorkers.decrementAndGet();
495:
496: if (session == null) {
497: synchronized (workers) {
498: if (workers.size() > corePoolSize) {
499: // Remove now to prevent duplicate exit.
500: workers.remove(this );
501: break;
502: }
503: }
504: }
505:
506: if (session == EXIT_SIGNAL) {
507: break;
508: }
509:
510: try {
511: runTasks(getSessionBuffer(session));
512: } finally {
513: idleWorkers.incrementAndGet();
514: }
515: }
516: } finally {
517: synchronized (workers) {
518: workers.remove(this );
519: OrderedThreadPoolExecutor.this .completedTaskCount += completedTaskCount;
520: workers.notifyAll();
521: }
522: }
523: }
524:
525: private IoSession fetchSession() {
526: IoSession session = null;
527: long currentTime = System.currentTimeMillis();
528: long deadline = currentTime
529: + getKeepAliveTime(TimeUnit.MILLISECONDS);
530: for (;;) {
531: try {
532: long waitTime = deadline - currentTime;
533: if (waitTime <= 0) {
534: break;
535: }
536:
537: try {
538: session = waitingSessions.poll(waitTime,
539: TimeUnit.MILLISECONDS);
540: break;
541: } finally {
542: if (session == null) {
543: currentTime = System.currentTimeMillis();
544: }
545: }
546: } catch (InterruptedException e) {
547: // Ignore.
548: continue;
549: }
550: }
551: return session;
552: }
553:
554: private void runTasks(SessionBuffer buf) {
555: for (;;) {
556: Runnable task;
557: synchronized (buf.queue) {
558: task = buf.queue.poll();
559:
560: if (task == null) {
561: buf.processingCompleted = true;
562: break;
563: }
564: }
565:
566: queueHandler.polled(OrderedThreadPoolExecutor.this ,
567: (IoEvent) task);
568:
569: runTask(task);
570: }
571: }
572:
573: private void runTask(Runnable task) {
574: beforeExecute(thread, task);
575: boolean ran = false;
576: try {
577: task.run();
578: ran = true;
579: afterExecute(task, null);
580: completedTaskCount++;
581: } catch (RuntimeException e) {
582: if (!ran)
583: afterExecute(task, e);
584: throw e;
585: }
586: }
587: }
588: }
|