001: /*--
002:
003: Copyright (C) 2002-2005 Adrian Price.
004: All rights reserved.
005:
006: Redistribution and use in source and binary forms, with or without
007: modification, are permitted provided that the following conditions
008: are met:
009:
010: 1. Redistributions of source code must retain the above copyright
011: notice, this list of conditions, and the following disclaimer.
012:
013: 2. Redistributions in binary form must reproduce the above copyright
014: notice, this list of conditions, and the disclaimer that follows
015: these conditions in the documentation and/or other materials
016: provided with the distribution.
017:
018: 3. The names "OBE" and "Open Business Engine" must not be used to
019: endorse or promote products derived from this software without prior
020: written permission. For written permission, please contact
021: adrianprice@sourceforge.net.
022:
023: 4. Products derived from this software may not be called "OBE" or
024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
025: appear in their name, without prior written permission from
026: Adrian Price (adrianprice@users.sourceforge.net).
027:
028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
038: POSSIBILITY OF SUCH DAMAGE.
039:
040: For more information on OBE, please see
041: <http://obe.sourceforge.net/>.
042:
043: */
044:
045: package org.obe.engine.async;
046:
047: import org.apache.commons.logging.Log;
048: import org.apache.commons.logging.LogFactory;
049: import org.apache.commons.pool.BasePoolableObjectFactory;
050: import org.apache.commons.pool.ObjectPool;
051: import org.apache.commons.pool.PoolableObjectFactory;
052: import org.apache.commons.pool.impl.GenericObjectPool;
053: import org.obe.spi.service.AsyncManager;
054: import org.obe.spi.service.ServerConfig;
055: import org.obe.spi.service.ServiceManager;
056: import org.obe.spi.util.AsyncStartActivity;
057: import org.obe.spi.util.AsyncStartProcess;
058:
059: /**
060: * A basic async execution service. This implementation does not provide
061: * persistence (nor does it support recovery).
062: *
063: * @author Adrian Price
064: */
065: public class BasicAsyncManager implements AsyncManager {
066: private static final Log _logger = LogFactory
067: .getLog(BasicAsyncManager.class);
068: private final ServiceManager _svcMgr;
069: private final PoolableObjectFactory _factory = new ThreadFactory();
070: private final ObjectPool _threadPool;
071:
072: private final class AsyncThread extends Thread {
073: private Runnable _target;
074: private volatile boolean _running;
075:
076: AsyncThread(ThreadGroup group, String name) {
077: super (group, name);
078: start();
079: }
080:
081: synchronized void runTarget(Runnable target) {
082: if (target == null)
083: throw new IllegalArgumentException();
084: _target = target;
085: notify();
086: }
087:
088: public synchronized void start() {
089: if (_running)
090: throw new IllegalStateException();
091: _running = true;
092: super .start();
093: }
094:
095: synchronized void terminate() {
096: _running = false;
097: notify();
098: }
099:
100: public void run() {
101: while (_running) {
102: Runnable target;
103: synchronized (this ) {
104: if (_target == null) {
105: try {
106: wait();
107: } catch (InterruptedException e) {
108: continue;
109: }
110: }
111: target = _target;
112: _target = null;
113: }
114: try {
115: target.run();
116: } catch (Exception e) {
117: _logger.error("Error executing async request", e);
118: } finally {
119: // When we're done, we must return the thread to the pool.
120: try {
121: _threadPool.returnObject(this );
122: } catch (Exception e) {
123: _logger.error(
124: "Error returning async thread to pool",
125: e);
126: }
127: }
128: }
129: }
130: }
131:
132: private class ThreadFactory extends BasePoolableObjectFactory {
133: private final ThreadGroup _threadGroup = new ThreadGroup(
134: "obe-async");
135: private int threadNum;
136:
137: public Object makeObject() throws Exception {
138: return new AsyncThread(_threadGroup, "thread-"
139: + threadNum++);
140: }
141: }
142:
143: public BasicAsyncManager(ServiceManager svcMgr) {
144: _svcMgr = svcMgr;
145: _threadPool = new GenericObjectPool(_factory, ServerConfig
146: .getAsyncThreadpoolSize(),
147: GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);
148: }
149:
150: public void asyncRequest(AsyncRequest request) {
151: try {
152: AsyncThread thread = (AsyncThread) _threadPool
153: .borrowObject();
154: if (_logger.isDebugEnabled()) {
155: _logger.debug("Enqueuing request " + request + " on "
156: + thread);
157: }
158: thread.runTarget(request);
159: } catch (Exception e) {
160: _logger.error(e);
161: }
162: }
163:
164: public void asyncStartActivity(String processInstanceId,
165: String activityInstanceId) {
166:
167: asyncRequest(new AsyncStartActivity(processInstanceId,
168: activityInstanceId));
169: }
170:
171: public void asyncStartProcess(String processInstanceId) {
172: asyncRequest(new AsyncStartProcess(processInstanceId));
173: }
174:
175: public ServiceManager getServiceManager() {
176: return _svcMgr;
177: }
178:
179: public String getServiceName() {
180: return SERVICE_NAME;
181: }
182:
183: public void init() {
184: }
185:
186: public void exit() {
187: }
188: }
|