001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport;
019:
020: import java.util.Iterator;
021: import java.util.LinkedList;
022: import java.util.List;
023: import java.util.concurrent.ThreadFactory;
024:
025: /**
026: * @author not attributable
027: * @version 1.0
028: */
029:
030: public class RxTaskPool {
031: /**
032: * A very simple thread pool class. The pool size is set at
033: * construction time and remains fixed. Threads are cycled
034: * through a FIFO idle queue.
035: */
036:
037: List idle = new LinkedList();
038: List used = new LinkedList();
039:
040: Object mutex = new Object();
041: boolean running = true;
042:
043: private static int counter = 1;
044: private int maxTasks;
045: private int minTasks;
046:
047: private TaskCreator creator = null;
048:
049: private static synchronized int inc() {
050: return counter++;
051: }
052:
053: public RxTaskPool(int maxTasks, int minTasks, TaskCreator creator)
054: throws Exception {
055: // fill up the pool with worker threads
056: this .maxTasks = maxTasks;
057: this .minTasks = minTasks;
058: this .creator = creator;
059: }
060:
061: protected void configureTask(AbstractRxTask task) {
062: synchronized (task) {
063: task.setTaskPool(this );
064: // task.setName(task.getClass().getName() + "[" + inc() + "]");
065: // task.setDaemon(true);
066: // task.setPriority(Thread.MAX_PRIORITY);
067: // task.start();
068: }
069: }
070:
071: /**
072: * Find an idle worker thread, if any. Could return null.
073: */
074: public AbstractRxTask getRxTask() {
075: AbstractRxTask worker = null;
076: synchronized (mutex) {
077: while (worker == null && running) {
078: if (idle.size() > 0) {
079: try {
080: worker = (AbstractRxTask) idle.remove(0);
081: } catch (java.util.NoSuchElementException x) {
082: //this means that there are no available workers
083: worker = null;
084: }
085: } else if (used.size() < this .maxTasks
086: && creator != null) {
087: worker = creator.createRxTask();
088: configureTask(worker);
089: } else {
090: try {
091: mutex.wait();
092: } catch (java.lang.InterruptedException x) {
093: Thread.currentThread().interrupted();
094: }
095: }
096: }//while
097: if (worker != null)
098: used.add(worker);
099: }
100: return (worker);
101: }
102:
103: public int available() {
104: return idle.size();
105: }
106:
107: /**
108: * Called by the worker thread to return itself to the
109: * idle pool.
110: */
111: public void returnWorker(AbstractRxTask worker) {
112: if (running) {
113: synchronized (mutex) {
114: used.remove(worker);
115: //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
116: if (idle.size() < maxTasks && !idle.contains(worker))
117: idle.add(worker); //let max be the upper limit
118: else {
119: worker.setDoRun(false);
120: synchronized (worker) {
121: worker.notify();
122: }
123: }
124: mutex.notify();
125: }
126: } else {
127: worker.setDoRun(false);
128: synchronized (worker) {
129: worker.notify();
130: }
131: }
132: }
133:
134: public int getMaxThreads() {
135: return maxTasks;
136: }
137:
138: public int getMinThreads() {
139: return minTasks;
140: }
141:
142: public void stop() {
143: running = false;
144: synchronized (mutex) {
145: Iterator i = idle.iterator();
146: while (i.hasNext()) {
147: AbstractRxTask worker = (AbstractRxTask) i.next();
148: returnWorker(worker);
149: i.remove();
150: }
151: }
152: }
153:
154: public void setMaxTasks(int maxThreads) {
155: this .maxTasks = maxThreads;
156: }
157:
158: public void setMinTasks(int minThreads) {
159: this .minTasks = minThreads;
160: }
161:
162: public TaskCreator getTaskCreator() {
163: return this .creator;
164: }
165:
166: public static interface TaskCreator {
167: public AbstractRxTask createRxTask();
168: }
169: }
|