001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.work;
023:
024: import javax.management.ObjectName;
025: import javax.resource.spi.work.ExecutionContext;
026: import javax.resource.spi.work.Work;
027: import javax.resource.spi.work.WorkException;
028: import javax.resource.spi.work.WorkListener;
029: import javax.resource.spi.work.WorkManager;
030: import javax.transaction.xa.Xid;
031:
032: import org.jboss.system.ServiceMBeanSupport;
033: import org.jboss.tm.JBossXATerminator;
034: import org.jboss.util.threadpool.Task;
035: import org.jboss.util.threadpool.ThreadPool;
036:
037: /**
038: * The work manager implementation
039: *
040: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
041: * @version $Revision: 59886 $
042: */
043: public class JBossWorkManager extends ServiceMBeanSupport implements
044: WorkManager, JBossWorkManagerMBean {
045: /** Whether trace is enabled */
046: private boolean trace = log.isTraceEnabled();
047:
048: /** The thread pool */
049: private ThreadPool threadPool;
050:
051: /** The thread pool name */
052: private ObjectName threadPoolName;
053:
054: /** The xa terminator */
055: private JBossXATerminator xaTerminator;
056:
057: /** The xa terminator name */
058: private ObjectName xaTerminatorName;
059:
060: /**
061: * Retrieve the thread pool
062: *
063: * @return the thread pool
064: */
065: public ThreadPool getThreadPool() {
066: return threadPool;
067: }
068:
069: /**
070: * Set the thread pool
071: *
072: * @param threadPool the thread pool
073: */
074: public void setThreadPool(ThreadPool threadPool) {
075: this .threadPool = threadPool;
076: }
077:
078: public ObjectName getThreadPoolName() {
079: return threadPoolName;
080: }
081:
082: public void setThreadPoolName(ObjectName threadPoolName) {
083: this .threadPoolName = threadPoolName;
084: }
085:
086: public ObjectName getXATerminatorName() {
087: return xaTerminatorName;
088: }
089:
090: public void setXATerminatorName(ObjectName xaTerminatorName) {
091: this .xaTerminatorName = xaTerminatorName;
092: }
093:
094: public WorkManager getInstance() {
095: return this ;
096: }
097:
098: public void doWork(Work work, long startTimeout,
099: ExecutionContext ctx, WorkListener listener)
100: throws WorkException {
101: if (ctx == null)
102: ctx = new ExecutionContext();
103: WorkWrapper wrapper = new WorkWrapper(this , work,
104: Task.WAIT_FOR_COMPLETE, startTimeout, ctx, listener);
105: importWork(wrapper);
106: executeWork(wrapper);
107: if (wrapper.getWorkException() != null)
108: throw wrapper.getWorkException();
109: }
110:
111: public void doWork(Work work) throws WorkException {
112: doWork(work, WorkManager.INDEFINITE, null, null);
113: }
114:
115: public long startWork(Work work, long startTimeout,
116: ExecutionContext ctx, WorkListener listener)
117: throws WorkException {
118: if (ctx == null)
119: ctx = new ExecutionContext();
120: WorkWrapper wrapper = new WorkWrapper(this , work,
121: Task.WAIT_FOR_START, startTimeout, ctx, listener);
122: importWork(wrapper);
123: executeWork(wrapper);
124: if (wrapper.getWorkException() != null)
125: throw wrapper.getWorkException();
126: return wrapper.getBlockedElapsed();
127: }
128:
129: public long startWork(Work work) throws WorkException {
130: return startWork(work, WorkManager.INDEFINITE, null, null);
131: }
132:
133: public void scheduleWork(Work work, long startTimeout,
134: ExecutionContext ctx, WorkListener listener)
135: throws WorkException {
136: if (ctx == null)
137: ctx = new ExecutionContext();
138: WorkWrapper wrapper = new WorkWrapper(this , work,
139: Task.WAIT_NONE, startTimeout, ctx, listener);
140: importWork(wrapper);
141: executeWork(wrapper);
142: if (wrapper.getWorkException() != null)
143: throw wrapper.getWorkException();
144: }
145:
146: public void scheduleWork(Work work) throws WorkException {
147: scheduleWork(work, WorkManager.INDEFINITE, null, null);
148: }
149:
150: protected void startService() throws Exception {
151: if (threadPoolName == null)
152: throw new IllegalStateException("No thread pool name");
153:
154: threadPool = (ThreadPool) server.getAttribute(threadPoolName,
155: "Instance");
156:
157: if (xaTerminatorName == null)
158: throw new IllegalStateException("No xa terminator name");
159:
160: xaTerminator = (JBossXATerminator) server.getAttribute(
161: xaTerminatorName, "XATerminator");
162: }
163:
164: /**
165: * Import any work
166: *
167: * @param wrapper the work wrapper
168: * @throws WorkException for any error
169: */
170: protected void importWork(WorkWrapper wrapper) throws WorkException {
171: trace = log.isTraceEnabled();
172: if (trace)
173: log.trace("Importing work " + wrapper);
174:
175: ExecutionContext ctx = wrapper.getExecutionContext();
176: if (ctx != null) {
177: Xid xid = ctx.getXid();
178: if (xid != null) {
179: //JBAS-4002 base value is in seconds as per the API, here we convert to millis
180: long timeout = (ctx.getTransactionTimeout() * 1000);
181: xaTerminator.registerWork(wrapper.getWork(), xid,
182: timeout);
183: }
184: }
185: if (trace)
186: log.trace("Imported work " + wrapper);
187: }
188:
189: /**
190: * Execute the work
191: *
192: * @param wrapper the work wrapper
193: * @throws WorkException for any error
194: */
195: protected void executeWork(WorkWrapper wrapper)
196: throws WorkException {
197: if (trace)
198: log.trace("Submitting work to thread pool " + wrapper);
199:
200: threadPool.runTaskWrapper(wrapper);
201:
202: if (trace)
203: log.trace("Submitted work to thread pool " + wrapper);
204: }
205:
206: /**
207: * Start work
208: *
209: * @param wrapper the work wrapper
210: * @throws WorkException for any error
211: */
212: protected void startWork(WorkWrapper wrapper) throws WorkException {
213: if (trace)
214: log.trace("Starting work " + wrapper);
215:
216: ExecutionContext ctx = wrapper.getExecutionContext();
217: if (ctx != null) {
218: Xid xid = ctx.getXid();
219: if (xid != null) {
220: xaTerminator.startWork(wrapper.getWork(), xid);
221: }
222: }
223: if (trace)
224: log.trace("Started work " + wrapper);
225: }
226:
227: /**
228: * End work
229: *
230: * @param wrapper the work wrapper
231: * @throws WorkException for any error
232: */
233: protected void endWork(WorkWrapper wrapper) {
234: if (trace)
235: log.trace("Ending work " + wrapper);
236:
237: ExecutionContext ctx = wrapper.getExecutionContext();
238: if (ctx != null) {
239: Xid xid = ctx.getXid();
240: if (xid != null) {
241: xaTerminator.endWork(wrapper.getWork(), xid);
242: }
243: }
244: if (trace)
245: log.trace("Ended work " + wrapper);
246: }
247:
248: /**
249: * Cancel work
250: *
251: * @param wrapper the work wrapper
252: * @throws WorkException for any error
253: */
254: protected void cancelWork(WorkWrapper wrapper) {
255: if (trace)
256: log.trace("Cancel work " + wrapper);
257:
258: ExecutionContext ctx = wrapper.getExecutionContext();
259: if (ctx != null) {
260: Xid xid = ctx.getXid();
261: if (xid != null) {
262: xaTerminator.cancelWork(wrapper.getWork(), xid);
263: }
264: }
265: if (trace)
266: log.trace("Canceled work " + wrapper);
267: }
268: }
|