001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript;
018:
019: import java.lang.reflect.InvocationTargetException;
020: import java.sql.Timestamp;
021: import java.util.ArrayList;
022: import java.util.Collections;
023: import java.util.Comparator;
024: import java.util.Iterator;
025: import java.util.LinkedList;
026: import java.util.List;
027: import java.util.Map;
028: import java.util.Queue;
029: import java.util.UUID;
030: import java.util.concurrent.ConcurrentHashMap;
031:
032: import org.apache.commons.beanutils.BeanUtils;
033:
034: public class ProcessManager implements IProcessManager,
035: IProcessInstanceManager {
036:
037: protected Map<String, Process> processesById = new ConcurrentHashMap<String, Process>();
038: protected Map<String, Process> processesByName = new ConcurrentHashMap<String, Process>();
039: protected Map<String, ProcessInstance> processInstances = new ConcurrentHashMap<String, ProcessInstance>();
040: protected Queue<String> completedQueue = new LinkedList<String>();
041: protected Map<String, List<IProcessInstance>> processInstanceChildren = new ConcurrentHashMap<String, List<IProcessInstance>>();
042: protected Map<String, ProcessSource> mainSources = new ConcurrentHashMap<String, ProcessSource>();
043: protected Map<String, ProcessSource> processSourcesById = new ConcurrentHashMap<String, ProcessSource>();
044: protected Map<String, List<ProcessSource>> processSources = new ConcurrentHashMap<String, List<ProcessSource>>();
045:
046: protected int maxInstances = Integer.MAX_VALUE;
047: protected int maxCompleted = 1000;
048:
049: public String createProcessInstance(String parentProcessInstanceId,
050: String processName, String operation)
051: throws BpmScriptException {
052: if (processInstances.size() > maxInstances) {
053: throw new BpmScriptException(
054: "Can't create any more process instances as maximum has been reached "
055: + maxInstances);
056: }
057: Process process = processesByName.get(processName);
058: ProcessInstance processInstance = new ProcessInstance(UUID
059: .randomUUID().toString(), parentProcessInstanceId,
060: operation, process);
061: processInstance.setProcessState(ProcessState.CREATED);
062: processInstances.put(processInstance.getId(), processInstance);
063: if (parentProcessInstanceId != null) {
064: List<IProcessInstance> children = processInstanceChildren
065: .get(parentProcessInstanceId);
066: if (children == null) {
067: children = new ArrayList<IProcessInstance>();
068: processInstanceChildren.put(parentProcessInstanceId,
069: children);
070: }
071: children.add(processInstance);
072: }
073: return processInstance.getId();
074: }
075:
076: public ExecutorResult doWithProcessInstance(
077: String processInstanceId, IProcessInstanceCallback callback)
078: throws Throwable {
079: IProcessInstance instance = checkoutProcessInstance(processInstanceId);
080: synchronized (instance) {
081: ExecutorResult executorResult = callback.execute(instance);
082: if (executorResult.getProcessState() == ProcessState.PAUSED) {
083: PausedResult pausedResult = ((PausedResult) executorResult);
084: checkinInstance(processInstanceId, pausedResult
085: .getContinuation(), pausedResult
086: .getStackTraceElements());
087: } else if (executorResult.getProcessState() == ProcessState.COMPLETED) {
088: complete(processInstanceId);
089: } else if (executorResult.getProcessState() == ProcessState.FAILED) {
090: FailedResult failedResult = ((FailedResult) executorResult);
091: Throwable throwable = failedResult.getThrowable();
092: failed(processInstanceId, throwable, failedResult
093: .getStackTraceElements());
094: throw throwable;
095: } else {
096: // noop
097: }
098: return executorResult;
099: }
100: }
101:
102: public IProcessInstance checkoutProcessInstance(
103: String processInstanceId) {
104: ProcessInstance instance = processInstances
105: .get(processInstanceId);
106: // instance.setProcessState(ProcessState.RUNNING);
107: instance.setLastModified(new Timestamp(System
108: .currentTimeMillis()));
109: return instance;
110: }
111:
112: public void checkinInstance(String processInstanceId,
113: byte[] continuation, StackTraceElement[] stackTraceElements) {
114: ProcessInstance instance = processInstances
115: .get(processInstanceId);
116: instance.setContinuation(continuation);
117: instance.setLastModified(new Timestamp(System
118: .currentTimeMillis()));
119: instance.setStackTraceElements(stackTraceElements);
120: instance.setProcessState(ProcessState.PAUSED);
121: }
122:
123: public void complete(String processInstanceId) {
124: ProcessInstance instance = processInstances
125: .get(processInstanceId);
126: instance.setLastModified(new Timestamp(System
127: .currentTimeMillis()));
128: instance.setContinuation(null);
129: instance.setStackTraceElements(null);
130: instance.setProcessState(ProcessState.COMPLETED);
131: synchronized (completedQueue) {
132: completedQueue.add(processInstanceId);
133: if (completedQueue.size() >= maxCompleted) {
134: String oldId = completedQueue.poll();
135: processInstances.remove(oldId);
136: processInstanceChildren.remove(oldId);
137: }
138: }
139: }
140:
141: public void failed(String processInstanceId, Throwable ex,
142: StackTraceElement[] stackTraceElements) {
143: ProcessInstance instance = processInstances
144: .get(processInstanceId);
145: instance.setLastModified(new Timestamp(System
146: .currentTimeMillis()));
147: instance.setStackTraceElements(stackTraceElements);
148: instance.setThrowable(ex);
149: instance.setProcessState(ProcessState.FAILED);
150: }
151:
152: public IProcessSource getMainSource(String processId) {
153: return mainSources.get(processId);
154: }
155:
156: Object getProperty(Object object, String property)
157: throws BpmScriptException {
158: try {
159: return BeanUtils.getProperty(object, property);
160: } catch (IllegalAccessException e) {
161: throw new BpmScriptException(e);
162: } catch (InvocationTargetException e) {
163: throw new BpmScriptException(e);
164: } catch (NoSuchMethodException e) {
165: throw new BpmScriptException(e);
166: }
167: }
168:
169: public IPagedResult<IProcessInstance> getProcessInstances(
170: IQuery query) throws BpmScriptException {
171:
172: List<IProcessInstance> results = new ArrayList<IProcessInstance>(
173: processInstances.values());
174:
175: String filter = query.getFilter();
176: if (filter != null) {
177: filter = filter.trim().toUpperCase();
178: if (filter.length() > 0) {
179: for (Iterator resultIterator = results.iterator(); resultIterator
180: .hasNext();) {
181: IProcessInstance result = (IProcessInstance) resultIterator
182: .next();
183: if (!result.getProcess().getName().toUpperCase()
184: .contains(filter)) {
185: resultIterator.remove();
186: }
187: }
188: }
189: }
190:
191: final List<IOrderBy> orderBys = query.getOrderBys();
192: if (orderBys != null && orderBys.size() > 0) {
193:
194: try {
195: Collections.sort(results, new Comparator<Object>() {
196:
197: @SuppressWarnings("unchecked")
198: public int compare(Object o1, Object o2) {
199: for (IOrderBy orderBy : orderBys) {
200: try {
201: Comparable c1 = (Comparable) getProperty(
202: o1, orderBy.getField());
203: Comparable c2 = (Comparable) getProperty(
204: o2, orderBy.getField());
205: int compareTo = c1.compareTo(c2);
206: if (compareTo != 0) {
207: return orderBy.isAsc() ? compareTo
208: : -compareTo;
209: }
210: } catch (BpmScriptException e) {
211: throw new RuntimeException(e);
212: }
213: }
214: return 0;
215: }
216:
217: });
218: } catch (RuntimeException e) {
219: Throwable cause = e.getCause();
220: if (cause != null
221: && cause instanceof BpmScriptException) {
222: throw (BpmScriptException) cause;
223: } else {
224: throw e;
225: }
226: }
227: }
228: List<IProcessInstance> trimmedResults = null;
229: int lastResult = query.getFirstResult() + query.getMaxResults();
230:
231: if (query.getFirstResult() >= 0 && query.getMaxResults() >= 0) {
232: trimmedResults = new ArrayList<IProcessInstance>(results
233: .subList(query.getFirstResult(),
234: lastResult < results.size() ? lastResult
235: : results.size()));
236: } else {
237: trimmedResults = results;
238: }
239:
240: return new PagedResult<IProcessInstance>(trimmedResults,
241: lastResult > 0 && lastResult < results.size(), results
242: .size());
243: }
244:
245: public List<IProcess> getPrimaryProcesses() {
246: return new ArrayList<IProcess>(processesByName.values());
247: }
248:
249: public IProcess getProcess(String processId) {
250: return processesById.get(processId);
251: }
252:
253: public List<IProcessSource> getProcessSources(String processId) {
254: return new ArrayList<IProcessSource>(processSources
255: .get(processId));
256: }
257:
258: public void setProcessAsPrimary(String processId) {
259: Process process = processesById.get(processId);
260: Process oldProcess = processesByName.get(process.getName());
261: if (oldProcess != null) {
262: oldProcess.setPrimaryProcess(false);
263: oldProcess.setLastModified(new Timestamp(System
264: .currentTimeMillis()));
265: }
266: process.setLastModified(new Timestamp(System
267: .currentTimeMillis()));
268: process.setPrimaryProcess(true);
269: processesByName.put(process.getName(), process);
270: }
271:
272: public String createProcess(String source, String name,
273: boolean pinned) {
274: Process process = new Process();
275: process.setId(UUID.randomUUID().toString());
276: process.setName(name);
277: process.setPinned(pinned);
278: processesById.put(process.getId(), process);
279: return process.getId();
280: }
281:
282: public void addSourceToProcess(String processId, String name,
283: String source, boolean main) {
284: ProcessSource processSource = new ProcessSource();
285: processSource.setId(UUID.randomUUID().toString());
286: processSource.setMain(main);
287: processSource.setName(name);
288: processSource.setSource(source);
289: List<ProcessSource> sources = processSources.get(processId);
290: if (sources == null) {
291: sources = new ArrayList<ProcessSource>();
292: processSources.put(processId, sources);
293: }
294: sources.add(processSource);
295: processSourcesById.put(processSource.getId(), processSource);
296: if (main) {
297: mainSources.put(processId, processSource);
298: }
299: }
300:
301: public IProcessInstance getProcessInstance(String processInstanceId)
302: throws BpmScriptException {
303: return processInstances.get(processInstanceId);
304: }
305:
306: public IProcessSource getProcessSource(String processSourceId)
307: throws BpmScriptException {
308: return processSourcesById.get(processSourceId);
309: }
310:
311: public List<IProcessInstance> getChildProcessInstances(
312: String processInstanceId) throws BpmScriptException {
313: return processInstanceChildren.get(processInstanceId);
314: }
315:
316: public IProcess getPrimaryProcess(String name)
317: throws BpmScriptException {
318: return processesByName.get(name);
319: }
320:
321: public void setMaxCompleted(int maxCompleted) {
322: this .maxCompleted = maxCompleted;
323: }
324:
325: public void setMaxInstances(int maxInstances) {
326: this .maxInstances = maxInstances;
327: }
328:
329: public Timestamp getProcessInstanceVersion(String processInstanceId)
330: throws BpmScriptException {
331: return getProcessInstance(processInstanceId).getLastModified();
332: }
333:
334: /** this is slow but it's not really used **/
335: public List<IProcess> getProcessVersions(String name)
336: throws BpmScriptException {
337:
338: List<IProcess> result = new ArrayList<IProcess>();
339:
340: for (IProcess process : processesById.values()) {
341: if (process.getName().equals(name)) {
342: result.add(process);
343: }
344: }
345:
346: return result;
347: }
348:
349: }
|