001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jetspeed.aggregator.impl;
019:
020: import java.security.AccessControlContext;
021: import java.security.AccessController;
022: import java.util.Iterator;
023: import java.util.ArrayList;
024: import java.util.List;
025: import java.util.Stack;
026: import java.util.LinkedList;
027: import java.util.Collections;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031: import org.apache.jetspeed.aggregator.RenderingJob;
032: import org.apache.jetspeed.aggregator.Worker;
033: import org.apache.jetspeed.aggregator.WorkerMonitor;
034: import org.apache.jetspeed.aggregator.PortletContent;
035: import org.apache.jetspeed.util.Queue;
036: import org.apache.jetspeed.util.FIFOQueue;
037:
038: import org.apache.pluto.om.window.PortletWindow;
039: import org.apache.pluto.om.common.ObjectID;
040:
041: /**
042: * The WorkerMonitor is responsible for dispatching jobs to workers
043: * It uses an Apache HTTPd configuration style of min/max/spare workers
044: * threads to throttle the rendering work.
045: * If jobs come in faster that processing, they are stored in a queue
046: * which is flushed periodically by a QueueMonitor.
047: *
048: * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a>
049: * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
050: * @version $Id: WorkerMonitorImpl.java 591867 2007-11-05 02:20:06Z woonsan $
051: */
052: public class WorkerMonitorImpl implements WorkerMonitor {
053: public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class
054: .getName();
055:
056: public WorkerMonitorImpl(int minWorkers, int maxWorkers,
057: int spareWorkers, int maxJobsPerWorker) {
058: this .minWorkers = minWorkers;
059: this .maxWorkers = maxWorkers;
060: this .spareWorkers = spareWorkers;
061: this .maxJobsPerWorker = maxJobsPerWorker;
062: }
063:
064: /** Commons logging */
065: protected final static Log log = LogFactory
066: .getLog(WorkerMonitorImpl.class);
067:
068: /** Static counters for identifying workers */
069: protected static long sCount = 0;
070:
071: /** Count of running jobs **/
072: protected int runningJobs = 0;
073:
074: /** Minimum number of wokers to create */
075: protected int minWorkers = 5;
076:
077: /** Maximum number of workers */
078: protected int maxWorkers = 50;
079:
080: /** Minimum amount of spare workers */
081: protected int spareWorkers = 3;
082:
083: /** Maximum of job processed by a worker before being released */
084: protected int maxJobsPerWorker = 10;
085:
086: /** Stack containing currently idle workers */
087: protected Stack workers = new Stack();
088:
089: /** The thread group used to group all worker threads */
090: protected ThreadGroup tg = new ThreadGroup("Workers");
091:
092: /** Job queue */
093: protected Queue queue;
094:
095: /** Workers to be monitored for timeout checking */
096: protected List workersMonitored = Collections
097: .synchronizedList(new LinkedList());
098:
099: /** Renering Job Timeout monitor */
100: protected RenderingJobTimeoutMonitor jobMonitor = null;
101:
102: public void start() {
103: addWorkers(this .minWorkers);
104: this .queue = new FIFOQueue();
105:
106: jobMonitor = new RenderingJobTimeoutMonitor(1000);
107: jobMonitor.start();
108: }
109:
110: public void stop() {
111: if (jobMonitor != null)
112: jobMonitor.endThread();
113: jobMonitor = null;
114:
115: }
116:
117: /**
118: * Create the request number of workers and add them to
119: * list of available workers.
120: *
121: * @param wCount the number of workers to create
122: */
123: protected synchronized void addWorkers(int wCount) {
124: int wCurrent = this .tg.activeCount();
125:
126: if (wCurrent < maxWorkers) {
127: if (wCurrent + wCount > maxWorkers) {
128: wCount = maxWorkers - wCurrent;
129: }
130:
131: log.info("Creating " + wCount + " workers -> "
132: + (wCurrent + wCount));
133:
134: for (int i = 0; i < wCount; ++i) {
135: Worker worker = new WorkerImpl(this , this .tg, "WORKER_"
136: + (++sCount));
137: worker.start();
138: workers.push(worker);
139: }
140: }
141: }
142:
143: /**
144: * Retrieves an idle worker
145: *
146: * @return a Worker from the idle pool or null if non available
147: */
148: protected Worker getWorker() {
149: synchronized (this .workers) {
150: if (this .workers.size() < spareWorkers) {
151: addWorkers(spareWorkers);
152: }
153:
154: if (this .workers.size() == 0) {
155: return null;
156: }
157:
158: return (Worker) workers.pop();
159: }
160: }
161:
162: /**
163: * Assign a job to a worker and execute it or queue the job if no
164: * worker is available.
165: *
166: * @param job the Job to process
167: */
168: public void process(RenderingJob job) {
169: Worker worker = this .getWorker();
170:
171: AccessControlContext context = AccessController.getContext();
172: job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR,
173: context);
174:
175: if (worker == null) {
176: queue.push(job);
177: } else {
178: try {
179: synchronized (worker) {
180: worker.setJob(job, context);
181:
182: if (job.getTimeout() > 0) {
183: workersMonitored.add(worker);
184: }
185:
186: worker.notify();
187: runningJobs++;
188: }
189: } catch (Throwable t) {
190: log.error("Worker exception", t);
191: }
192: }
193: }
194:
195: /**
196: * Wait for all rendering jobs in the collection to finish successfully or otherwise.
197: * @param renderingJobs the Collection of rendering job objects to wait for.
198: */
199: public void waitForRenderingJobs(List renderingJobs) {
200: try {
201: for (Iterator iter = renderingJobs.iterator(); iter
202: .hasNext();) {
203: RenderingJob job = (RenderingJob) iter.next();
204: PortletContent portletContent = job.getPortletContent();
205:
206: synchronized (portletContent) {
207: if (!portletContent.isComplete()) {
208: portletContent.wait();
209: }
210: }
211: }
212: } catch (Exception e) {
213: log
214: .error(
215: "Exception during synchronizing all portlet rendering jobs.",
216: e);
217: }
218: }
219:
220: /**
221: * Put back the worker in the idle queue unless there are pending jobs and
222: * worker can still be committed to a new job before being released.
223: */
224: protected void release(Worker worker) {
225: // if worker can still proces some jobs assign the first
226: // backlog job to this worker, else reset job count and put
227: // it on the idle queue.
228:
229: long jobTimeout = 0;
230:
231: RenderingJob oldJob = (RenderingJob) worker.getJob();
232: if (oldJob != null) {
233: jobTimeout = oldJob.getTimeout();
234: }
235:
236: synchronized (worker) {
237: RenderingJob job = null;
238:
239: if (worker.getJobCount() < this .maxJobsPerWorker) {
240: job = (RenderingJob) queue.pop();
241:
242: if (job != null) {
243: AccessControlContext context = (AccessControlContext) job
244: .getWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR);
245: worker.setJob(job, context);
246: runningJobs--;
247: return;
248: }
249: }
250:
251: if (job == null) {
252: worker.setJob(null);
253: worker.resetJobCount();
254: runningJobs--;
255: }
256: }
257:
258: if (jobTimeout > 0) {
259: workersMonitored.remove(worker);
260: }
261:
262: synchronized (this .workers) {
263: this .workers.push(worker);
264: }
265: }
266:
267: public int getQueuedJobsCount() {
268: return queue.size();
269: }
270:
271: /**
272: * Returns a snapshot of the available jobs
273: * @return available jobs
274: */
275: public int getAvailableJobsCount() {
276: return workers.size();
277: }
278:
279: public int getRunningJobsCount() {
280: return this .tg.activeCount();
281: }
282:
283: class RenderingJobTimeoutMonitor extends Thread {
284:
285: long interval = 1000;
286: boolean shouldRun = true;
287:
288: RenderingJobTimeoutMonitor(long interval) {
289: super ("RenderingJobTimeoutMonitor");
290:
291: if (interval > 0) {
292: this .interval = interval;
293: }
294: }
295:
296: /**
297: * Thread.stop() is deprecated.
298: * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread,
299: * effectively causing the thread to shutdown correctly.
300: *
301: */
302: public void endThread() {
303: shouldRun = false;
304: this .interrupt();
305: }
306:
307: public void run() {
308: while (shouldRun) {
309: try {
310: // Because a timeout worker can be removed
311: // in the workersMonitored collection during iterating,
312: // copy timeout workers in the following collection to kill later.
313:
314: List timeoutWorkers = new ArrayList();
315:
316: synchronized (workersMonitored) {
317: for (Iterator it = workersMonitored.iterator(); it
318: .hasNext();) {
319: WorkerImpl worker = (WorkerImpl) it.next();
320: RenderingJob job = (RenderingJob) worker
321: .getJob();
322:
323: if ((null != job) && (job.isTimeout())) {
324: timeoutWorkers.add(worker);
325: }
326: }
327: }
328:
329: // Now, we can kill the timeout worker(s).
330: for (Iterator it = timeoutWorkers.iterator(); it
331: .hasNext();) {
332: WorkerImpl worker = (WorkerImpl) it.next();
333: RenderingJob job = (RenderingJob) worker
334: .getJob();
335:
336: // If the job is just completed, then do not kill the worker.
337: if ((null != job) && (job.isTimeout())) {
338: killJob(worker, job);
339: }
340: }
341: } catch (Exception e) {
342: log.error("Exception during job monitoring.", e);
343: }
344:
345: try {
346: synchronized (this ) {
347: wait(this .interval);
348: }
349: } catch (InterruptedException e) {
350: ;
351: }
352: }
353: }
354:
355: public void killJob(WorkerImpl worker, RenderingJob job) {
356: try {
357: if (log.isWarnEnabled()) {
358: PortletWindow window = job.getWindow();
359: ObjectID windowId = (null != window ? window
360: .getId() : null);
361: log
362: .warn("Portlet Rendering job to be interrupted by timeout ("
363: + job.getTimeout()
364: + "ms): "
365: + windowId);
366: }
367:
368: PortletContent content = job.getPortletContent();
369:
370: synchronized (content) {
371: if (!content.isComplete()) {
372: worker.interrupt();
373: content.wait();
374: }
375: }
376:
377: } catch (Exception e) {
378: log.error("Exceptiong during job killing.", e);
379: }
380: }
381:
382: }
383: }
|