001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jetspeed.aggregator.impl;
019:
020: import java.security.AccessControlContext;
021: import java.security.AccessController;
022: import java.util.List;
023: import java.util.ArrayList;
024: import java.util.Iterator;
025: import java.util.Collection;
026: import java.util.Collections;
027: import java.util.Map;
028: import java.util.HashMap;
029: import java.util.Arrays;
030:
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033: import org.apache.jetspeed.aggregator.RenderingJob;
034: import org.apache.jetspeed.aggregator.Worker;
035: import org.apache.jetspeed.aggregator.WorkerMonitor;
036: import org.apache.jetspeed.aggregator.PortletContent;
037:
038: import org.apache.pluto.om.window.PortletWindow;
039: import org.apache.pluto.om.common.ObjectID;
040:
041: import commonj.work.WorkManager;
042: import commonj.work.Work;
043: import commonj.work.WorkItem;
044: import commonj.work.WorkListener;
045: import commonj.work.WorkEvent;
046:
047: /**
048: * The CommonjWorkerMonitorImpl is responsible for dispatching jobs to workers
049: * It wraps CommonJ WorkManager supported by IBM WebSphere and BEA WebLogic sever.
050: *
051: * @author <a href="mailto:woon_san@apache.org">Woonsan Ko</a>
052: * @version $Id: CommonjWorkerMonitorImpl.java 568339 2007-08-22 00:14:51Z ate $
053: */
054: public class CommonjWorkerMonitorImpl implements WorkerMonitor,
055: WorkListener {
056:
057: public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class
058: .getName();
059: public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class
060: .getName();
061: public static final String WORKER_THREAD_ATTR = Worker.class
062: .getName();
063:
064: /** CommonJ Work Manamger provided by JavaEE container */
065: protected WorkManager workManager;
066:
067: /** If true, invoke interrupt() on the worker thread when the job is timeout. */
068: protected boolean interruptOnTimeout = true;
069:
070: /** Enable rendering job works monitor thread for timeout checking */
071: protected boolean jobWorksMonitorEnabled = true;
072:
073: /** Rendering job works to be monitored for timeout checking */
074: protected Map jobWorksMonitored = Collections
075: .synchronizedMap(new HashMap());
076:
077: public CommonjWorkerMonitorImpl(WorkManager workManager) {
078: this (workManager, true);
079: }
080:
081: public CommonjWorkerMonitorImpl(WorkManager workManager,
082: boolean jobWorksMonitorEnabled) {
083: this (workManager, jobWorksMonitorEnabled, true);
084: }
085:
086: public CommonjWorkerMonitorImpl(WorkManager workManager,
087: boolean jobWorksMonitorEnabled, boolean interruptOnTimeout) {
088: this .workManager = workManager;
089: this .jobWorksMonitorEnabled = jobWorksMonitorEnabled;
090: this .interruptOnTimeout = interruptOnTimeout;
091: }
092:
093: /** Commons logging */
094: protected final static Log log = LogFactory
095: .getLog(CommonjWorkerMonitorImpl.class);
096:
097: /** Renering Job Timeout monitor */
098: protected CommonjWorkerRenderingJobTimeoutMonitor jobMonitor = null;
099:
100: public void start() {
101: if (this .jobWorksMonitorEnabled) {
102: jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(
103: 1000);
104: jobMonitor.start();
105: }
106: }
107:
108: public void stop() {
109: if (jobMonitor != null) {
110: jobMonitor.endThread();
111: }
112:
113: jobMonitor = null;
114: }
115:
116: /**
117: * Assign a job to a worker and execute it or queue the job if no
118: * worker is available.
119: *
120: * @param job the Job to process
121: */
122: public void process(RenderingJob job) {
123: AccessControlContext context = AccessController.getContext();
124: job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR,
125: context);
126:
127: try {
128: RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(
129: job);
130: WorkItem workItem = this .workManager
131: .schedule(jobWork, this );
132: job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
133:
134: if (this .jobWorksMonitorEnabled) {
135: this .jobWorksMonitored.put(workItem, jobWork);
136: }
137: } catch (Throwable t) {
138: log.error("Worker exception", t);
139: }
140: }
141:
142: public int getQueuedJobsCount() {
143: return 0;
144: }
145:
146: /**
147: * Wait for all rendering jobs in the collection to finish successfully or otherwise.
148: * @param renderingJobs the Collection of rendering job objects to wait for.
149: */
150: public void waitForRenderingJobs(List renderingJobs) {
151: if (this .jobWorksMonitorEnabled) {
152: try {
153: for (Iterator iter = renderingJobs.iterator(); iter
154: .hasNext();) {
155: RenderingJob job = (RenderingJob) iter.next();
156: PortletContent portletContent = job
157: .getPortletContent();
158:
159: synchronized (portletContent) {
160: if (!portletContent.isComplete()) {
161: portletContent.wait();
162: }
163: }
164: }
165: } catch (Exception e) {
166: log
167: .error(
168: "Exception during synchronizing all portlet rendering jobs.",
169: e);
170: }
171: } else {
172: // We cannot use WorkingManager#waitForAll(workitems, timeout_ms) for timeout.
173: // The second argument could be either WorkManager.IMMEDIATE or WorkManager.INDEFINITE.
174:
175: try {
176: if (!renderingJobs.isEmpty()) {
177: Object lock = new Object();
178: MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(
179: lock, renderingJobs);
180:
181: synchronized (lock) {
182: WorkItem monitorWorkItem = this .workManager
183: .schedule(monitoringWork, this );
184: lock.wait();
185: }
186: }
187: } catch (Exception e) {
188: log
189: .error(
190: "Exception during synchronizing all portlet rendering jobs.",
191: e);
192: }
193: }
194: }
195:
196: /**
197: * Returns a snapshot of the available jobs
198: * @return available jobs
199: */
200: public int getAvailableJobsCount() {
201: return 0;
202: }
203:
204: public int getRunningJobsCount() {
205: return 0;
206: }
207:
208: // commonj.work.WorkListener implementations
209:
210: public void workAccepted(WorkEvent we) {
211: WorkItem workItem = we.getWorkItem();
212: if (log.isDebugEnabled())
213: log.debug("[CommonjWorkMonitorImpl] workAccepted: "
214: + workItem);
215: }
216:
217: public void workRejected(WorkEvent we) {
218: WorkItem workItem = we.getWorkItem();
219: if (log.isDebugEnabled())
220: log.debug("[CommonjWorkMonitorImpl] workRejected: "
221: + workItem);
222:
223: if (this .jobWorksMonitorEnabled) {
224: removeMonitoredJobWork(workItem);
225: }
226: }
227:
228: public void workStarted(WorkEvent we) {
229: WorkItem workItem = we.getWorkItem();
230: if (log.isDebugEnabled())
231: log.debug("[CommonjWorkMonitorImpl] workStarted: "
232: + workItem);
233: }
234:
235: public void workCompleted(WorkEvent we) {
236: WorkItem workItem = we.getWorkItem();
237: if (log.isDebugEnabled())
238: log.debug("[CommonjWorkMonitorImpl] workCompleted: "
239: + workItem);
240:
241: if (this .jobWorksMonitorEnabled) {
242: removeMonitoredJobWork(workItem);
243: }
244: }
245:
246: protected Object removeMonitoredJobWork(WorkItem workItem) {
247: return this .jobWorksMonitored.remove(workItem);
248: }
249:
250: class RenderingJobCommonjWork implements Work {
251:
252: protected RenderingJob job;
253:
254: public RenderingJobCommonjWork(RenderingJob job) {
255: this .job = job;
256: }
257:
258: public boolean isDaemon() {
259: return false;
260: }
261:
262: public void run() {
263: if (jobWorksMonitorEnabled || interruptOnTimeout) {
264: this .job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread
265: .currentThread());
266: }
267:
268: this .job.run();
269: }
270:
271: public void release() {
272: }
273:
274: public RenderingJob getRenderingJob() {
275: return this .job;
276: }
277: }
278:
279: class MonitoringJobCommonjWork implements Work {
280:
281: protected Object lock;
282: protected List renderingJobs;
283:
284: public MonitoringJobCommonjWork(Object lock, List jobs) {
285: this .lock = lock;
286: this .renderingJobs = new ArrayList(jobs);
287: }
288:
289: public boolean isDaemon() {
290: return false;
291: }
292:
293: public void run() {
294: try {
295: while (!this .renderingJobs.isEmpty()) {
296: for (Iterator it = this .renderingJobs.iterator(); it
297: .hasNext();) {
298: RenderingJob job = (RenderingJob) it.next();
299: WorkItem workItem = (WorkItem) job
300: .getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
301: int status = WorkEvent.WORK_ACCEPTED;
302:
303: if (workItem != null) {
304: status = workItem.getStatus();
305: }
306:
307: boolean isTimeout = job.isTimeout();
308:
309: if (isTimeout) {
310: PortletContent content = job
311: .getPortletContent();
312:
313: if (interruptOnTimeout) {
314: Thread worker = (Thread) job
315: .getWorkerAttribute(WORKER_THREAD_ATTR);
316:
317: if (worker != null) {
318: synchronized (content) {
319: if (!content.isComplete()) {
320: worker.interrupt();
321: content.wait();
322: }
323: }
324: }
325: } else {
326: synchronized (content) {
327: content.completeWithError();
328: }
329: }
330: }
331:
332: if (status == WorkEvent.WORK_COMPLETED
333: || status == WorkEvent.WORK_REJECTED
334: || isTimeout) {
335: it.remove();
336: }
337: }
338:
339: if (!this .renderingJobs.isEmpty()) {
340: synchronized (this ) {
341: wait(100);
342: }
343: }
344: }
345:
346: synchronized (this .lock) {
347: this .lock.notify();
348: }
349: } catch (Exception e) {
350: log.error("Exceptiong during job timeout monitoring.",
351: e);
352: }
353: }
354:
355: public void release() {
356: }
357:
358: }
359:
360: class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
361:
362: long interval = 1000;
363: boolean shouldRun = true;
364:
365: CommonjWorkerRenderingJobTimeoutMonitor(long interval) {
366: super ("CommonjWorkerRenderingJobTimeoutMonitor");
367:
368: if (interval > 0) {
369: this .interval = interval;
370: }
371: }
372:
373: /**
374: * Thread.stop() is deprecated.
375: * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread,
376: * effectively causing the thread to shutdown correctly.
377: *
378: */
379: public void endThread() {
380: shouldRun = false;
381: this .interrupt();
382: }
383:
384: public void run() {
385: while (shouldRun) {
386: try {
387: List timeoutJobWorks = new ArrayList();
388: Collection jobWorks = Arrays
389: .asList(jobWorksMonitored.values()
390: .toArray());
391:
392: for (Iterator it = jobWorks.iterator(); it
393: .hasNext();) {
394: RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it
395: .next();
396: RenderingJob job = jobWork.getRenderingJob();
397:
398: if (job.isTimeout()) {
399: timeoutJobWorks.add(jobWork);
400: }
401: }
402:
403: // Now, we can kill the timeout worker(s).
404: for (Iterator it = timeoutJobWorks.iterator(); it
405: .hasNext();) {
406: RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it
407: .next();
408: RenderingJob job = jobWork.getRenderingJob();
409:
410: // If the job is just completed, then do not kill the worker.
411: if (job.isTimeout()) {
412: killJobWork(jobWork);
413: }
414: }
415: } catch (Exception e) {
416: log.error("Exception during job monitoring.", e);
417: }
418:
419: try {
420: synchronized (this ) {
421: wait(this .interval);
422: }
423: } catch (InterruptedException e) {
424: ;
425: }
426: }
427: }
428:
429: public void killJobWork(RenderingJobCommonjWork jobWork) {
430: RenderingJob job = jobWork.getRenderingJob();
431:
432: try {
433: if (log.isWarnEnabled()) {
434: PortletWindow window = job.getWindow();
435: ObjectID windowId = (null != window ? window
436: .getId() : null);
437: log
438: .warn("Portlet Rendering job to be interrupted by timeout ("
439: + job.getTimeout()
440: + "ms): "
441: + windowId);
442: }
443:
444: PortletContent content = job.getPortletContent();
445: Thread worker = (Thread) job
446: .getWorkerAttribute(WORKER_THREAD_ATTR);
447:
448: if (worker != null) {
449: synchronized (content) {
450: if (!content.isComplete()) {
451: worker.interrupt();
452: content.wait();
453: }
454: }
455: }
456: } catch (Exception e) {
457: log.error("Exceptiong during job killing.", e);
458: } finally {
459: WorkItem workItem = (WorkItem) job
460: .getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
461:
462: if (workItem != null) {
463: removeMonitoredJobWork(workItem);
464: }
465: }
466: }
467:
468: }
469:
470: }
|