001: /* Copyright 2001 The JA-SIG Collaborative. All rights reserved.
002: * See license distributed with this file and
003: * available online at http://www.uportal.org/license.html
004: */
005:
006: package org.jasig.portal;
007:
008: import org.jasig.portal.channels.error.CError;
009: import org.jasig.portal.properties.PropertiesManager;
010: import org.jasig.portal.utils.threading.PriorityThreadFactory;
011: import org.apache.commons.logging.Log;
012: import org.apache.commons.logging.LogFactory;
013:
014: import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
015: import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
016: import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
017: import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
018: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
019: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
020:
021: /**
022: * <p>The <code>ChannelRendererFactoryImpl</code> creates
023: * <code>IChannelRenderer</code> objects which use a bounded thread pool.</p>
024: *
025: * @author <a href="mailto:jnielsen@sct.com">Jan Nielsen</a>
026: *
027: * @version $Revision: 42516 $
028: **/
029: public final class ChannelRendererFactoryImpl implements
030: IChannelRendererFactory {
031: /** <p> Class version identifier.</p> */
032: public final static String RCS_ID = "@(#) $Header$";
033:
034: private static final Log log = LogFactory
035: .getLog(ChannelRendererFactoryImpl.class);
036:
037: /** <p>Thread pool per factory.</p> */
038: private ThreadPoolExecutor mThreadPool = null;
039:
040: private static ThreadPoolExecutor cErrorThreadPool = null;
041:
042: /** <p>Shared thread pool for all factories.</p> */
043: private static ThreadPoolExecutor cSharedThreadPool = null;
044:
045: private class ChannelRenderThreadPoolExecutor extends
046: ThreadPoolExecutor {
047: final AtomicLong activeThreads;
048: final AtomicLong maxActiveThreads;
049:
050: public ChannelRenderThreadPoolExecutor(
051: final AtomicLong activeThreads,
052: final AtomicLong maxActiveThreads, int corePoolSize,
053: int maximumPoolSize, long keepAliveTime, TimeUnit unit,
054: BlockingQueue workQueue, ThreadFactory threadFactory) {
055: super (corePoolSize, maximumPoolSize, keepAliveTime, unit,
056: workQueue, threadFactory);
057:
058: this .activeThreads = activeThreads;
059: this .maxActiveThreads = maxActiveThreads;
060: }
061:
062: protected void beforeExecute(java.lang.Thread t,
063: java.lang.Runnable r) {
064: super .beforeExecute(t, r);
065: final long current = activeThreads.incrementAndGet();
066: if (current > maxActiveThreads.get()) {
067: maxActiveThreads.set(current);
068: }
069: }
070:
071: protected void afterExecute(java.lang.Runnable r,
072: java.lang.Throwable t) {
073: super .afterExecute(r, t);
074: activeThreads.decrementAndGet();
075: }
076: }
077:
078: /**
079: * <p>Creates a new instance of a bounded thread pool channel
080: * renderer factory object. The constructor should not be invoked
081: * directly; it should only be constructed by the
082: * <code>ChannelRendererFactory</code> object.</p>
083: *
084: * <p>This factory implooks for the properties:
085: *
086: * <pre><code>
087: * keyBase + ".threadPool_initialThreads"
088: * keyBase + ".threadPool_maxThreads"
089: * keyBase + ".threadPool_threadPriority"
090: * keyBase + ".threadPool_shared"
091: * </code></pre>
092: *
093: * in the configuration system and then reflectively constructs the
094: * factory class with the default (no-argument) constructor.</p>
095: *
096: * @param keyBase configuration base key
097: *
098: * or <code>null</code>
099: */
100: public ChannelRendererFactoryImpl(final String keyBase,
101: final AtomicLong activeThreads,
102: final AtomicLong maxActiveThreads) {
103: int initialThreads = 1;
104: int maxThreads = 20;
105: int threadPriority = 5;
106: boolean sharedPool = false;
107:
108: try {
109: initialThreads = PropertiesManager.getPropertyAsInt(keyBase
110: + ".threadPool_initialThreads");
111:
112: maxThreads = PropertiesManager.getPropertyAsInt(keyBase
113: + ".threadPool_maxThreads");
114:
115: threadPriority = PropertiesManager.getPropertyAsInt(keyBase
116: + ".threadPool_threadPriority");
117:
118: sharedPool = PropertiesManager.getPropertyAsBoolean(keyBase
119: + ".threadPool_shared");
120: } catch (Exception x) {
121: log
122: .error(
123: "ChannelRendererFactoryImpl("
124: + keyBase
125: + ") failed to find configuration parameters. Constructing with: "
126: + "threadPool_initialThreads = "
127: + initialThreads + " "
128: + "threadPool_maxThreads = "
129: + maxThreads + " "
130: + "threadPool_threadPriority = "
131: + threadPriority + " "
132: + "threadPool_shared = "
133: + sharedPool, x);
134: }
135:
136: cErrorThreadPool = new ThreadPoolExecutor(20, 20, 0L,
137: TimeUnit.MILLISECONDS, new LinkedBlockingQueue(),
138: new PriorityThreadFactory(threadPriority,
139: "ErrorRendering", PortalSessionManager
140: .getThreadGroup()));
141:
142: if (sharedPool) {
143: cSharedThreadPool = new ChannelRenderThreadPoolExecutor(
144: activeThreads, maxActiveThreads, initialThreads,
145: maxThreads, 0L, TimeUnit.MILLISECONDS,
146: new LinkedBlockingQueue(),
147: new PriorityThreadFactory(threadPriority, keyBase,
148: PortalSessionManager.getThreadGroup()));
149: } else {
150: this .mThreadPool = new ChannelRenderThreadPoolExecutor(
151: activeThreads, maxActiveThreads, initialThreads,
152: maxThreads, 0L, TimeUnit.MILLISECONDS,
153: new LinkedBlockingQueue(),
154: new PriorityThreadFactory(threadPriority, keyBase,
155: PortalSessionManager.getThreadGroup()));
156: }
157: }
158:
159: /**
160: * <p>Creates a new instance of a channel renderer object.</p>
161: *
162: * @param channel channel to render
163: *
164: * @param channelRuntimeData runtime data for the channel to render
165: *
166: * @return new instance of a channel renderer for the specified channel
167: **/
168: public IChannelRenderer newInstance(IChannel channel,
169: ChannelRuntimeData channelRuntimeData,
170: PortalControlStructures pcs) {
171:
172: ThreadPoolExecutor threadPoolExecutor = null;
173: // Use special thread pool for CError channel rendering
174: if (channel instanceof CError) {
175: threadPoolExecutor = cErrorThreadPool;
176: } else if (cSharedThreadPool != null) {
177: int activeCount = cSharedThreadPool.getActiveCount();
178: int queueSize = cSharedThreadPool.getQueue().size();
179:
180: if (queueSize > 50 || activeCount > 40) {
181: log.warn("queueSize: " + queueSize + " activeCount: "
182: + activeCount + " " + "largestPoolSize: "
183: + cSharedThreadPool.getLargestPoolSize());
184: }
185:
186: log.debug("stp-activeCount: "
187: + cSharedThreadPool.getActiveCount() + " "
188: + "stp-completedTaskCount: "
189: + cSharedThreadPool.getCompletedTaskCount() + " "
190: + "stp-corePoolSize: "
191: + cSharedThreadPool.getCorePoolSize() + " "
192: + "stp-queue-size: "
193: + cSharedThreadPool.getQueue().size() + " " + "");
194:
195: threadPoolExecutor = cSharedThreadPool;
196: } else {
197: threadPoolExecutor = this .mThreadPool;
198: }
199:
200: return new ChannelRenderer(channel, channelRuntimeData, pcs,
201: threadPoolExecutor);
202: }
203: }
|