001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jca.work;
018:
019: import javax.resource.spi.work.ExecutionContext;
020: import javax.resource.spi.work.Work;
021: import javax.resource.spi.work.WorkAdapter;
022: import javax.resource.spi.work.WorkCompletedException;
023: import javax.resource.spi.work.WorkEvent;
024: import javax.resource.spi.work.WorkException;
025: import javax.resource.spi.work.WorkListener;
026: import javax.resource.spi.work.WorkManager;
027: import javax.resource.spi.work.WorkRejectedException;
028:
029: import org.springframework.core.task.AsyncTaskExecutor;
030: import org.springframework.core.task.SimpleAsyncTaskExecutor;
031: import org.springframework.core.task.SyncTaskExecutor;
032: import org.springframework.core.task.TaskExecutor;
033: import org.springframework.core.task.TaskRejectedException;
034: import org.springframework.core.task.TaskTimeoutException;
035: import org.springframework.util.Assert;
036:
037: /**
038: * Simple JCA 1.5 {@link javax.resource.spi.work.WorkManager} implementation that
039: * delegates to a Spring {@link org.springframework.core.task.TaskExecutor}.
040: * Provides simple task execution including start timeouts, but without support
041: * for a JCA ExecutionContext (i.e. without support for imported transactions).
042: *
043: * <p>Uses a {@link org.springframework.core.task.SyncTaskExecutor} for {@link #doWork}
044: * calls and a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
045: * for {@link #startWork} and {@link #scheduleWork} calls, by default.
046: * These default task executors can be overridden through configuration.
047: *
048: * <p><b>NOTE: This WorkManager does not provide thread pooling by default!</b>
049: * Specify a {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor}
050: * (or any other thread-pooling TaskExecutor) as "asyncTaskExecutor" in order to
051: * achieve actual thread pooling.
052: *
053: * <p>This WorkManager automatically detects a specified
054: * {@link org.springframework.core.task.AsyncTaskExecutor} implementation
055: * and uses its extended timeout functionality where appropriate.
056: * JCA WorkListeners are fully supported in any case.
057: *
058: * @author Juergen Hoeller
059: * @since 2.0.3
060: * @see #setSyncTaskExecutor
061: * @see #setAsyncTaskExecutor
062: */
063: public class SimpleTaskWorkManager implements WorkManager {
064:
065: private TaskExecutor syncTaskExecutor = new SyncTaskExecutor();
066:
067: private TaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
068:
069: /**
070: * Specify the TaskExecutor to use for <i>synchronous</i> work execution
071: * (i.e. {@link #doWork} calls).
072: * <p>Default is a {@link org.springframework.core.task.SyncTaskExecutor}.
073: */
074: public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) {
075: this .syncTaskExecutor = syncTaskExecutor;
076: }
077:
078: /**
079: * Specify the TaskExecutor to use for <i>asynchronous</i> work execution
080: * (i.e. {@link #startWork} and {@link #scheduleWork} calls).
081: * <p>This will typically (but not necessarily) be an
082: * {@link org.springframework.core.task.AsyncTaskExecutor} implementation.
083: * Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}.
084: */
085: public void setAsyncTaskExecutor(TaskExecutor asyncTaskExecutor) {
086: this .asyncTaskExecutor = asyncTaskExecutor;
087: }
088:
089: public void doWork(Work work) throws WorkException {
090: doWork(work, WorkManager.INDEFINITE, null, null);
091: }
092:
093: public void doWork(Work work, long startTimeout,
094: ExecutionContext executionContext, WorkListener workListener)
095: throws WorkException {
096:
097: Assert.state(this .syncTaskExecutor != null,
098: "No 'syncTaskExecutor' set");
099: executeWork(this .syncTaskExecutor, work, startTimeout, false,
100: executionContext, workListener);
101: }
102:
103: public long startWork(Work work) throws WorkException {
104: return startWork(work, WorkManager.INDEFINITE, null, null);
105: }
106:
107: public long startWork(Work work, long startTimeout,
108: ExecutionContext executionContext, WorkListener workListener)
109: throws WorkException {
110:
111: Assert.state(this .asyncTaskExecutor != null,
112: "No 'asyncTaskExecutor' set");
113: return executeWork(this .asyncTaskExecutor, work, startTimeout,
114: true, executionContext, workListener);
115: }
116:
117: public void scheduleWork(Work work) throws WorkException {
118: scheduleWork(work, WorkManager.INDEFINITE, null, null);
119: }
120:
121: public void scheduleWork(Work work, long startTimeout,
122: ExecutionContext executionContext, WorkListener workListener)
123: throws WorkException {
124:
125: Assert.state(this .asyncTaskExecutor != null,
126: "No 'asyncTaskExecutor' set");
127: executeWork(this .asyncTaskExecutor, work, startTimeout, false,
128: executionContext, workListener);
129: }
130:
131: /**
132: * Execute the given Work on the specified TaskExecutor.
133: * @param taskExecutor the TaskExecutor to use
134: * @param work the Work to execute
135: * @param startTimeout the time duration within which the Work is supposed to start
136: * @param blockUntilStarted whether to block until the Work has started
137: * @param executionContext the JCA ExecutionContext for the given Work
138: * @param workListener the WorkListener to clal for the given Work
139: * @return the time elapsed from Work acceptance until start of execution
140: * (or -1 if not applicable or not known)
141: * @throws WorkException if the TaskExecutor did not accept the Work
142: */
143: protected long executeWork(TaskExecutor taskExecutor, Work work,
144: long startTimeout, boolean blockUntilStarted,
145: ExecutionContext executionContext, WorkListener workListener)
146: throws WorkException {
147:
148: if (executionContext != null
149: && executionContext.getXid() != null) {
150: throw new WorkException(
151: "SimpleTaskWorkManager does not supported imported XIDs: "
152: + executionContext.getXid());
153: }
154: WorkListener workListenerToUse = workListener;
155: if (workListenerToUse == null) {
156: workListenerToUse = new WorkAdapter();
157: }
158:
159: boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor);
160: DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(
161: work, workListenerToUse, !isAsync);
162: try {
163: if (isAsync) {
164: ((AsyncTaskExecutor) taskExecutor).execute(workHandle,
165: startTimeout);
166: } else {
167: taskExecutor.execute(workHandle);
168: }
169: } catch (TaskTimeoutException ex) {
170: WorkException wex = new WorkRejectedException(
171: "TaskExecutor rejected Work because of timeout: "
172: + work, ex);
173: wex.setErrorCode(WorkException.START_TIMED_OUT);
174: workListenerToUse.workRejected(new WorkEvent(this ,
175: WorkEvent.WORK_REJECTED, work, wex));
176: throw wex;
177: } catch (TaskRejectedException ex) {
178: WorkException wex = new WorkRejectedException(
179: "TaskExecutor rejected Work: " + work, ex);
180: wex.setErrorCode(WorkException.INTERNAL);
181: workListenerToUse.workRejected(new WorkEvent(this ,
182: WorkEvent.WORK_REJECTED, work, wex));
183: throw wex;
184: } catch (Throwable ex) {
185: WorkException wex = new WorkException(
186: "TaskExecutor failed to execute Work: " + work, ex);
187: wex.setErrorCode(WorkException.INTERNAL);
188: throw wex;
189: }
190: if (isAsync) {
191: workListenerToUse.workAccepted(new WorkEvent(this ,
192: WorkEvent.WORK_ACCEPTED, work, null));
193: }
194:
195: if (blockUntilStarted) {
196: long acceptanceTime = System.currentTimeMillis();
197: synchronized (workHandle.monitor) {
198: try {
199: while (!workHandle.started) {
200: workHandle.monitor.wait();
201: }
202: } catch (InterruptedException ex) {
203: Thread.currentThread().interrupt();
204: }
205: }
206: return (System.currentTimeMillis() - acceptanceTime);
207: } else {
208: return WorkManager.UNKNOWN;
209: }
210: }
211:
212: /**
213: * Work adapter that supports start timeouts and WorkListener callbacks
214: * for a given Work that it delegates to.
215: */
216: private static class DelegatingWorkAdapter implements Work {
217:
218: private final Work work;
219:
220: private final WorkListener workListener;
221:
222: private final boolean acceptOnExecution;
223:
224: public final Object monitor = new Object();
225:
226: public boolean started = false;
227:
228: public DelegatingWorkAdapter(Work work,
229: WorkListener workListener, boolean acceptOnExecution) {
230: this .work = work;
231: this .workListener = workListener;
232: this .acceptOnExecution = acceptOnExecution;
233: }
234:
235: public void run() {
236: if (this .acceptOnExecution) {
237: this .workListener.workAccepted(new WorkEvent(this ,
238: WorkEvent.WORK_ACCEPTED, work, null));
239: }
240: synchronized (this .monitor) {
241: this .started = true;
242: this .monitor.notify();
243: }
244: this .workListener.workStarted(new WorkEvent(this ,
245: WorkEvent.WORK_STARTED, this .work, null));
246: try {
247: this .work.run();
248: } catch (RuntimeException ex) {
249: this .workListener.workCompleted(new WorkEvent(this ,
250: WorkEvent.WORK_COMPLETED, this .work,
251: new WorkCompletedException(ex)));
252: throw ex;
253: } catch (Error err) {
254: this .workListener.workCompleted(new WorkEvent(this ,
255: WorkEvent.WORK_COMPLETED, this .work,
256: new WorkCompletedException(err)));
257: throw err;
258: }
259: this .workListener.workCompleted(new WorkEvent(this ,
260: WorkEvent.WORK_COMPLETED, this .work, null));
261: }
262:
263: public void release() {
264: this.work.release();
265: }
266: }
267:
268: }
|