001: package org.jacorb.poa;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import org.apache.avalon.framework.configuration.*;
024: import org.apache.avalon.framework.logger.Logger;
025:
026: import org.jacorb.poa.except.POAInternalError;
027: import java.util.*;
028:
029: /**
030: * This class provides and manages a pool of ready started threads for
031: * request processing.
032: *
033: * @author Gerald Brose
034: * @author Reimo Tiedemann
035: * @version $Id: RPPoolManager.java,v 1.22 2006/07/07 12:09:50 alphonse.bendt Exp $
036: * @see org.jacorb.poa.RequestProcessor
037: */
038:
039: public abstract class RPPoolManager {
040: private RPPoolManagerListener pmListener;
041:
042: // the current for (un)registering the invocation contexts
043: private final Current current;
044: /**
045: * <code>pool</code> is the set of currently available (inactive) request processors
046: */
047: private Vector pool;
048: /**
049: * <code>activeProcessors</code> is the set of currently active processors
050: */
051: private Vector activeProcessors;
052: /**
053: * <code>unused_size</code> represents the current number of unused request processors
054: * in the pool.
055: */
056: private int unused_size;
057: /**
058: * <code>max_pool_size</code> is the maximum size of the pool.
059: */
060: private final int max_pool_size;
061: /**
062: * <code>min_pool_size</code> is the minimum number of request processors.
063: */
064: private final int min_pool_size;
065: // a flag for delay the pool initialization
066: private boolean inUse = false;
067:
068: private final Configuration configuration;
069: private final Logger logger;
070:
071: protected RPPoolManager(Current _current, int min, int max,
072: Logger _logger, Configuration _configuration) {
073: current = _current;
074: max_pool_size = max;
075: min_pool_size = min;
076: logger = _logger;
077: configuration = _configuration;
078: }
079:
080: private void addProcessor() {
081: RequestProcessor rp = new RequestProcessor(this );
082: try {
083: rp.configure(this .configuration);
084: } catch (ConfigurationException ex) {
085: throw new RuntimeException(ex.toString());
086: }
087: current._addContext(rp, rp);
088: rp.setDaemon(true);
089: pool.addElement(rp);
090: unused_size++;
091: rp.start();
092: }
093:
094: protected synchronized void addRPPoolManagerListener(
095: RPPoolManagerListener listener) {
096: pmListener = EventMulticaster.add(pmListener, listener);
097: }
098:
099: /**
100: * invoked by clients to indicate that they won't use this poolManager anymore.
101: */
102: abstract void destroy();
103:
104: /**
105: * shutdown this poolManager. clients should invoke {@link #destroy()} instead.
106: */
107: protected synchronized void destroy(boolean really) {
108: if (pool == null || inUse == false) {
109: return;
110: }
111:
112: // wait until all active processors complete
113: while (!activeProcessors.isEmpty()) {
114: try {
115: wait();
116: } catch (InterruptedException ex) {
117: // ignore
118: }
119: }
120:
121: RequestProcessor[] rps = new RequestProcessor[pool.size()];
122: pool.copyInto(rps);
123: for (int i = 0; i < rps.length; i++) {
124: if (rps[i].isActive()) {
125: throw new POAInternalError(
126: "error: request processor is active (RequestProcessorPM.destroy)");
127: }
128:
129: pool.removeElement(rps[i]);
130: unused_size--;
131: current._removeContext(rps[i]);
132: rps[i].end();
133: }
134: inUse = false;
135: }
136:
137: /**
138: * returns the number of unused processors contained in the pool
139: */
140:
141: protected int getPoolCount() {
142: return (pool == null) ? 0 : pool.size();
143: }
144:
145: /**
146: * returns the size of the processor pool (used and unused processors)
147: */
148:
149: protected synchronized int getPoolSize() {
150: return unused_size;
151: }
152:
153: /**
154: * returns a processor from pool, the first call causes
155: * the initialization of the processor pool,
156: * if no processor available the number of processors
157: * will increased until the max_pool_size is reached,
158: * this method blocks if no processor available and the
159: * max_pool_size is reached until a processor will released
160: */
161:
162: protected synchronized RequestProcessor getProcessor() {
163: if (!inUse) {
164: init();
165: inUse = true;
166: }
167:
168: if (pool.isEmpty() && unused_size < max_pool_size) {
169: addProcessor();
170: }
171:
172: while (pool.isEmpty()) {
173: warnPoolIsEmpty();
174:
175: try {
176: wait();
177: } catch (InterruptedException e) {
178: }
179: }
180: RequestProcessor requestProcessor = (RequestProcessor) pool
181: .remove(pool.size() - 1);
182: activeProcessors.add(requestProcessor);
183:
184: // notify a pool manager listener
185: if (pmListener != null) {
186: pmListener.processorRemovedFromPool(requestProcessor, pool
187: .size(), unused_size);
188: }
189:
190: return requestProcessor;
191: }
192:
193: protected void warnPoolIsEmpty() {
194: if (logger.isWarnEnabled()) {
195: logger.warn("Thread pool exhausted, consider increasing "
196: + "jacorb.poa.thread_pool_max (currently: "
197: + max_pool_size + ")");
198: }
199: }
200:
201: private void init() {
202: pool = new Vector(max_pool_size);
203: activeProcessors = new Vector(max_pool_size);
204: for (int i = 0; i < min_pool_size; i++) {
205: addProcessor();
206: }
207: }
208:
209: /**
210: * gives a processor back into the pool if the number of
211: * available processors is smaller than min_pool_size,
212: * otherwise the processor will terminate
213: */
214:
215: protected synchronized void releaseProcessor(RequestProcessor rp) {
216: activeProcessors.remove(rp);
217:
218: if (pool.size() < min_pool_size) {
219: pool.addElement(rp);
220: } else {
221: unused_size--;
222: current._removeContext(rp);
223: rp.end();
224: }
225: // notify a pool manager listener
226: if (pmListener != null)
227: pmListener.processorAddedToPool(rp, pool.size(),
228: unused_size);
229:
230: // notify whoever is waiting for the release of active processors
231: notifyAll();
232: }
233:
234: protected synchronized void removeRPPoolManagerListener(
235: RPPoolManagerListener listener) {
236: pmListener = EventMulticaster.remove(pmListener, listener);
237: }
238: }
|