001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jetspeed.aggregator.impl;
019:
020: import java.security.AccessControlContext;
021: import java.security.PrivilegedAction;
022:
023: import javax.security.auth.Subject;
024:
025: import org.apache.commons.logging.Log;
026: import org.apache.commons.logging.LogFactory;
027: import org.apache.jetspeed.aggregator.RenderingJob;
028: import org.apache.jetspeed.aggregator.Worker;
029: import org.apache.jetspeed.aggregator.WorkerMonitor;
030: import org.apache.jetspeed.security.JSSubject;
031:
032: /**
033: * Worker thread processes jobs and notify its WorkerMonitor when completed.
034: * When no work is available, the worker simply sets itself in a waiting mode
035: * pending reactivation by the WorkerMonitor
036: *
037: * @author <a href="mailto:raphael@apache.org">Raphael Luta</a>
038: * @author <a>Woonsan Ko</a>
039: * @version $Id: WorkerImpl.java 587064 2007-10-22 11:54:11Z woonsan $
040: */
041: public class WorkerImpl extends Thread implements Worker {
042: /** Commons logging */
043: protected final static Log log = LogFactory
044: .getLog(WorkerImpl.class);
045:
046: /** Running status of this worker */
047: private boolean running = true;
048:
049: /** Counter of consecutive jobs that can be processed before the
050: worker being actually put back on the idle queue */
051: private int jobCount = 0;
052:
053: /** Job to process */
054: private Runnable job = null;
055:
056: /** Context to process job within */
057: private AccessControlContext context = null;
058:
059: /** Monitor for this Worker */
060: private WorkerMonitor monitor = null;
061:
062: public WorkerImpl(WorkerMonitor monitor) {
063: super ();
064: this .setMonitor(monitor);
065: this .setDaemon(true);
066: }
067:
068: public WorkerImpl(WorkerMonitor monitor, ThreadGroup tg, String name) {
069: super (tg, name);
070: this .setMonitor(monitor);
071: this .setDaemon(true);
072: }
073:
074: /**
075: * Return the number of jobs processed by this worker since the last time it
076: * has been on the idle queue
077: */
078: public int getJobCount() {
079: return this .jobCount;
080: }
081:
082: /**
083: * Reset the processed job counter
084: */
085: public void resetJobCount() {
086: this .jobCount = 0;
087: }
088:
089: /**
090: * Sets the running status of this Worker. If set to false, the Worker will
091: * stop after processing its current job.
092: */
093: public void setRunning(boolean status) {
094: this .running = status;
095: }
096:
097: /**
098: * Sets the moitor of this worker
099: */
100: public void setMonitor(WorkerMonitor monitor) {
101: this .monitor = monitor;
102: }
103:
104: /**
105: * Sets the job to execute in security context
106: */
107: public void setJob(Runnable job, AccessControlContext context) {
108: this .job = job;
109: this .context = context;
110: }
111:
112: /**
113: * Sets the job to execute
114: */
115: public void setJob(Runnable job) {
116: this .job = job;
117: this .context = null;
118: }
119:
120: /**
121: * Retrieves the job to execute
122: */
123: public Runnable getJob() {
124: return this .job;
125: }
126:
127: /**
128: * Process the job assigned, then notify Monitor. If no job available,
129: * go into sleep mode
130: */
131: public void run() {
132: while (running) {
133: // wait for a job to come
134: synchronized (this ) {
135: if (this .job == null) {
136: try {
137: this .wait();
138: } catch (InterruptedException e) {
139: // nothing done
140: }
141: }
142: }
143:
144: // process it
145: if (this .job != null) {
146: log.debug("Processing job for window :"
147: + ((RenderingJob) job).getWindow().getId());
148: Subject subject = null;
149: if (this .context != null) {
150: subject = JSSubject.getSubject(this .context);
151: }
152: if (subject != null) {
153: JSSubject.doAsPrivileged(subject,
154: new PrivilegedAction() {
155: public Object run() {
156: try {
157: WorkerImpl.this .job.run();
158: } catch (Throwable t) {
159: log.error("Thread error", t);
160: }
161: return null;
162: }
163: }, this .context);
164: } else {
165: try {
166: this .job.run();
167: } catch (Throwable t) {
168: log.error("Thread error", t);
169: }
170: }
171: }
172:
173: this .jobCount++;
174:
175: // release the worker
176: ((WorkerMonitorImpl) monitor).release(this);
177: }
178: }
179:
180: }
|