Source Code Cross Referenced for CommonjWorkerMonitorImpl.java in  » Portal » jetspeed-2.1.3 » org » apache » jetspeed » aggregator » impl » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Portal » jetspeed 2.1.3 » org.apache.jetspeed.aggregator.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         * 
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         * 
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:
018:        package org.apache.jetspeed.aggregator.impl;
019:
020:        import java.security.AccessControlContext;
021:        import java.security.AccessController;
022:        import java.util.List;
023:        import java.util.ArrayList;
024:        import java.util.Iterator;
025:        import java.util.Collection;
026:        import java.util.Collections;
027:        import java.util.Map;
028:        import java.util.HashMap;
029:        import java.util.Arrays;
030:
031:        import org.apache.commons.logging.Log;
032:        import org.apache.commons.logging.LogFactory;
033:        import org.apache.jetspeed.aggregator.RenderingJob;
034:        import org.apache.jetspeed.aggregator.Worker;
035:        import org.apache.jetspeed.aggregator.WorkerMonitor;
036:        import org.apache.jetspeed.aggregator.PortletContent;
037:
038:        import org.apache.pluto.om.window.PortletWindow;
039:        import org.apache.pluto.om.common.ObjectID;
040:
041:        import commonj.work.WorkManager;
042:        import commonj.work.Work;
043:        import commonj.work.WorkItem;
044:        import commonj.work.WorkListener;
045:        import commonj.work.WorkEvent;
046:
047:        /**
048:         * The CommonjWorkerMonitorImpl is responsible for dispatching jobs to workers
049:         * It wraps CommonJ WorkManager supported by IBM WebSphere and BEA WebLogic sever.
050:         *
051:         * @author <a href="mailto:woon_san@apache.org">Woonsan Ko</a>
052:         * @version $Id: CommonjWorkerMonitorImpl.java 568339 2007-08-22 00:14:51Z ate $
053:         */
054:        public class CommonjWorkerMonitorImpl implements  WorkerMonitor,
055:                WorkListener {
056:
057:            public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class
058:                    .getName();
059:            public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class
060:                    .getName();
061:            public static final String WORKER_THREAD_ATTR = Worker.class
062:                    .getName();
063:
064:            /** CommonJ Work Manamger provided by JavaEE container */
065:            protected WorkManager workManager;
066:
067:            /** If true, invoke interrupt() on the worker thread when the job is timeout. */
068:            protected boolean interruptOnTimeout = true;
069:
070:            /** Enable rendering job works monitor thread for timeout checking */
071:            protected boolean jobWorksMonitorEnabled = true;
072:
073:            /** Rendering job works to be monitored for timeout checking */
074:            protected Map jobWorksMonitored = Collections
075:                    .synchronizedMap(new HashMap());
076:
077:            public CommonjWorkerMonitorImpl(WorkManager workManager) {
078:                this (workManager, true);
079:            }
080:
081:            public CommonjWorkerMonitorImpl(WorkManager workManager,
082:                    boolean jobWorksMonitorEnabled) {
083:                this (workManager, jobWorksMonitorEnabled, true);
084:            }
085:
086:            public CommonjWorkerMonitorImpl(WorkManager workManager,
087:                    boolean jobWorksMonitorEnabled, boolean interruptOnTimeout) {
088:                this .workManager = workManager;
089:                this .jobWorksMonitorEnabled = jobWorksMonitorEnabled;
090:                this .interruptOnTimeout = interruptOnTimeout;
091:            }
092:
093:            /** Commons logging */
094:            protected final static Log log = LogFactory
095:                    .getLog(CommonjWorkerMonitorImpl.class);
096:
097:            /** Renering Job Timeout monitor */
098:            protected CommonjWorkerRenderingJobTimeoutMonitor jobMonitor = null;
099:
100:            public void start() {
101:                if (this .jobWorksMonitorEnabled) {
102:                    jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(
103:                            1000);
104:                    jobMonitor.start();
105:                }
106:            }
107:
108:            public void stop() {
109:                if (jobMonitor != null) {
110:                    jobMonitor.endThread();
111:                }
112:
113:                jobMonitor = null;
114:            }
115:
116:            /**
117:             * Assign a job to a worker and execute it or queue the job if no
118:             * worker is available.
119:             *
120:             * @param job the Job to process
121:             */
122:            public void process(RenderingJob job) {
123:                AccessControlContext context = AccessController.getContext();
124:                job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR,
125:                        context);
126:
127:                try {
128:                    RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(
129:                            job);
130:                    WorkItem workItem = this .workManager
131:                            .schedule(jobWork, this );
132:                    job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
133:
134:                    if (this .jobWorksMonitorEnabled) {
135:                        this .jobWorksMonitored.put(workItem, jobWork);
136:                    }
137:                } catch (Throwable t) {
138:                    log.error("Worker exception", t);
139:                }
140:            }
141:
142:            public int getQueuedJobsCount() {
143:                return 0;
144:            }
145:
146:            /**
147:             * Wait for all rendering jobs in the collection to finish successfully or otherwise. 
148:             * @param renderingJobs the Collection of rendering job objects to wait for.
149:             */
150:            public void waitForRenderingJobs(List renderingJobs) {
151:                if (this .jobWorksMonitorEnabled) {
152:                    try {
153:                        for (Iterator iter = renderingJobs.iterator(); iter
154:                                .hasNext();) {
155:                            RenderingJob job = (RenderingJob) iter.next();
156:                            PortletContent portletContent = job
157:                                    .getPortletContent();
158:
159:                            synchronized (portletContent) {
160:                                if (!portletContent.isComplete()) {
161:                                    portletContent.wait();
162:                                }
163:                            }
164:                        }
165:                    } catch (Exception e) {
166:                        log
167:                                .error(
168:                                        "Exception during synchronizing all portlet rendering jobs.",
169:                                        e);
170:                    }
171:                } else {
172:                    // We cannot use WorkingManager#waitForAll(workitems, timeout_ms) for timeout.
173:                    // The second argument could be either WorkManager.IMMEDIATE or WorkManager.INDEFINITE.
174:
175:                    try {
176:                        if (!renderingJobs.isEmpty()) {
177:                            Object lock = new Object();
178:                            MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(
179:                                    lock, renderingJobs);
180:
181:                            synchronized (lock) {
182:                                WorkItem monitorWorkItem = this .workManager
183:                                        .schedule(monitoringWork, this );
184:                                lock.wait();
185:                            }
186:                        }
187:                    } catch (Exception e) {
188:                        log
189:                                .error(
190:                                        "Exception during synchronizing all portlet rendering jobs.",
191:                                        e);
192:                    }
193:                }
194:            }
195:
196:            /**
197:             * Returns a snapshot of the available jobs
198:             * @return available jobs
199:             */
200:            public int getAvailableJobsCount() {
201:                return 0;
202:            }
203:
204:            public int getRunningJobsCount() {
205:                return 0;
206:            }
207:
208:            // commonj.work.WorkListener implementations
209:
210:            public void workAccepted(WorkEvent we) {
211:                WorkItem workItem = we.getWorkItem();
212:                if (log.isDebugEnabled())
213:                    log.debug("[CommonjWorkMonitorImpl] workAccepted: "
214:                            + workItem);
215:            }
216:
217:            public void workRejected(WorkEvent we) {
218:                WorkItem workItem = we.getWorkItem();
219:                if (log.isDebugEnabled())
220:                    log.debug("[CommonjWorkMonitorImpl] workRejected: "
221:                            + workItem);
222:
223:                if (this .jobWorksMonitorEnabled) {
224:                    removeMonitoredJobWork(workItem);
225:                }
226:            }
227:
228:            public void workStarted(WorkEvent we) {
229:                WorkItem workItem = we.getWorkItem();
230:                if (log.isDebugEnabled())
231:                    log.debug("[CommonjWorkMonitorImpl] workStarted: "
232:                            + workItem);
233:            }
234:
235:            public void workCompleted(WorkEvent we) {
236:                WorkItem workItem = we.getWorkItem();
237:                if (log.isDebugEnabled())
238:                    log.debug("[CommonjWorkMonitorImpl] workCompleted: "
239:                            + workItem);
240:
241:                if (this .jobWorksMonitorEnabled) {
242:                    removeMonitoredJobWork(workItem);
243:                }
244:            }
245:
246:            protected Object removeMonitoredJobWork(WorkItem workItem) {
247:                return this .jobWorksMonitored.remove(workItem);
248:            }
249:
250:            class RenderingJobCommonjWork implements  Work {
251:
252:                protected RenderingJob job;
253:
254:                public RenderingJobCommonjWork(RenderingJob job) {
255:                    this .job = job;
256:                }
257:
258:                public boolean isDaemon() {
259:                    return false;
260:                }
261:
262:                public void run() {
263:                    if (jobWorksMonitorEnabled || interruptOnTimeout) {
264:                        this .job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread
265:                                .currentThread());
266:                    }
267:
268:                    this .job.run();
269:                }
270:
271:                public void release() {
272:                }
273:
274:                public RenderingJob getRenderingJob() {
275:                    return this .job;
276:                }
277:            }
278:
279:            class MonitoringJobCommonjWork implements  Work {
280:
281:                protected Object lock;
282:                protected List renderingJobs;
283:
284:                public MonitoringJobCommonjWork(Object lock, List jobs) {
285:                    this .lock = lock;
286:                    this .renderingJobs = new ArrayList(jobs);
287:                }
288:
289:                public boolean isDaemon() {
290:                    return false;
291:                }
292:
293:                public void run() {
294:                    try {
295:                        while (!this .renderingJobs.isEmpty()) {
296:                            for (Iterator it = this .renderingJobs.iterator(); it
297:                                    .hasNext();) {
298:                                RenderingJob job = (RenderingJob) it.next();
299:                                WorkItem workItem = (WorkItem) job
300:                                        .getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
301:                                int status = WorkEvent.WORK_ACCEPTED;
302:
303:                                if (workItem != null) {
304:                                    status = workItem.getStatus();
305:                                }
306:
307:                                boolean isTimeout = job.isTimeout();
308:
309:                                if (isTimeout) {
310:                                    PortletContent content = job
311:                                            .getPortletContent();
312:
313:                                    if (interruptOnTimeout) {
314:                                        Thread worker = (Thread) job
315:                                                .getWorkerAttribute(WORKER_THREAD_ATTR);
316:
317:                                        if (worker != null) {
318:                                            synchronized (content) {
319:                                                if (!content.isComplete()) {
320:                                                    worker.interrupt();
321:                                                    content.wait();
322:                                                }
323:                                            }
324:                                        }
325:                                    } else {
326:                                        synchronized (content) {
327:                                            content.completeWithError();
328:                                        }
329:                                    }
330:                                }
331:
332:                                if (status == WorkEvent.WORK_COMPLETED
333:                                        || status == WorkEvent.WORK_REJECTED
334:                                        || isTimeout) {
335:                                    it.remove();
336:                                }
337:                            }
338:
339:                            if (!this .renderingJobs.isEmpty()) {
340:                                synchronized (this ) {
341:                                    wait(100);
342:                                }
343:                            }
344:                        }
345:
346:                        synchronized (this .lock) {
347:                            this .lock.notify();
348:                        }
349:                    } catch (Exception e) {
350:                        log.error("Exceptiong during job timeout monitoring.",
351:                                e);
352:                    }
353:                }
354:
355:                public void release() {
356:                }
357:
358:            }
359:
360:            class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
361:
362:                long interval = 1000;
363:                boolean shouldRun = true;
364:
365:                CommonjWorkerRenderingJobTimeoutMonitor(long interval) {
366:                    super ("CommonjWorkerRenderingJobTimeoutMonitor");
367:
368:                    if (interval > 0) {
369:                        this .interval = interval;
370:                    }
371:                }
372:
373:                /**
374:                 * Thread.stop() is deprecated.
375:                 * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread, 
376:                 * effectively causing the thread to shutdown correctly.
377:                 *
378:                 */
379:                public void endThread() {
380:                    shouldRun = false;
381:                    this .interrupt();
382:                }
383:
384:                public void run() {
385:                    while (shouldRun) {
386:                        try {
387:                            List timeoutJobWorks = new ArrayList();
388:                            Collection jobWorks = Arrays
389:                                    .asList(jobWorksMonitored.values()
390:                                            .toArray());
391:
392:                            for (Iterator it = jobWorks.iterator(); it
393:                                    .hasNext();) {
394:                                RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it
395:                                        .next();
396:                                RenderingJob job = jobWork.getRenderingJob();
397:
398:                                if (job.isTimeout()) {
399:                                    timeoutJobWorks.add(jobWork);
400:                                }
401:                            }
402:
403:                            // Now, we can kill the timeout worker(s).
404:                            for (Iterator it = timeoutJobWorks.iterator(); it
405:                                    .hasNext();) {
406:                                RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it
407:                                        .next();
408:                                RenderingJob job = jobWork.getRenderingJob();
409:
410:                                // If the job is just completed, then do not kill the worker.
411:                                if (job.isTimeout()) {
412:                                    killJobWork(jobWork);
413:                                }
414:                            }
415:                        } catch (Exception e) {
416:                            log.error("Exception during job monitoring.", e);
417:                        }
418:
419:                        try {
420:                            synchronized (this ) {
421:                                wait(this .interval);
422:                            }
423:                        } catch (InterruptedException e) {
424:                            ;
425:                        }
426:                    }
427:                }
428:
429:                public void killJobWork(RenderingJobCommonjWork jobWork) {
430:                    RenderingJob job = jobWork.getRenderingJob();
431:
432:                    try {
433:                        if (log.isWarnEnabled()) {
434:                            PortletWindow window = job.getWindow();
435:                            ObjectID windowId = (null != window ? window
436:                                    .getId() : null);
437:                            log
438:                                    .warn("Portlet Rendering job to be interrupted by timeout ("
439:                                            + job.getTimeout()
440:                                            + "ms): "
441:                                            + windowId);
442:                        }
443:
444:                        PortletContent content = job.getPortletContent();
445:                        Thread worker = (Thread) job
446:                                .getWorkerAttribute(WORKER_THREAD_ATTR);
447:
448:                        if (worker != null) {
449:                            synchronized (content) {
450:                                if (!content.isComplete()) {
451:                                    worker.interrupt();
452:                                    content.wait();
453:                                }
454:                            }
455:                        }
456:                    } catch (Exception e) {
457:                        log.error("Exceptiong during job killing.", e);
458:                    } finally {
459:                        WorkItem workItem = (WorkItem) job
460:                                .getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
461:
462:                        if (workItem != null) {
463:                            removeMonitoredJobWork(workItem);
464:                        }
465:                    }
466:                }
467:
468:            }
469:
470:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.