001: /*--
002:
003: Copyright (C) 2002-2005 Adrian Price.
004: All rights reserved.
005:
006: Redistribution and use in source and binary forms, with or without
007: modification, are permitted provided that the following conditions
008: are met:
009:
010: 1. Redistributions of source code must retain the above copyright
011: notice, this list of conditions, and the following disclaimer.
012:
013: 2. Redistributions in binary form must reproduce the above copyright
014: notice, this list of conditions, and the disclaimer that follows
015: these conditions in the documentation and/or other materials
016: provided with the distribution.
017:
018: 3. The names "OBE" and "Open Business Engine" must not be used to
019: endorse or promote products derived from this software without prior
020: written permission. For written permission, please contact
021: adrianprice@sourceforge.net.
022:
023: 4. Products derived from this software may not be called "OBE" or
024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
025: appear in their name, without prior written permission from
026: Adrian Price (adrianprice@users.sourceforge.net).
027:
028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
038: POSSIBILITY OF SUCH DAMAGE.
039:
040: For more information on OBE, please see
041: <http://obe.sourceforge.net/>.
042:
043: */
044:
045: package org.obe.engine.persistence.memory;
046:
047: import org.apache.commons.logging.Log;
048: import org.apache.commons.logging.LogFactory;
049: import org.obe.client.api.repository.ObjectAlreadyExistsException;
050: import org.obe.client.api.repository.ObjectNotFoundException;
051: import org.obe.client.api.repository.RepositoryException;
052: import org.obe.engine.repository.AbstractRepository;
053: import org.obe.engine.util.AttributeFilter;
054: import org.obe.spi.service.ApplicationEventBroker;
055: import org.obe.spi.service.ProcessRepository;
056: import org.obe.spi.service.ServerConfig;
057: import org.obe.spi.service.ServiceManager;
058: import org.obe.util.CommonConfig;
059: import org.obe.xpdl.model.misc.PublicationStatus;
060: import org.obe.xpdl.model.misc.RedefinableHeader;
061: import org.obe.xpdl.model.pkg.XPDLPackage;
062: import org.obe.xpdl.model.workflow.WorkflowProcess;
063: import org.obe.client.api.xpdl.PackageValidator;
064: import org.obe.xpdl.parser.XPDLParser;
065: import org.obe.xpdl.parser.dom4j.Dom4JXPDLParser;
066: import org.obe.xpdl.serializer.XPDLSerializer;
067: import org.obe.xpdl.serializer.XPDLSerializerException;
068: import org.obe.xpdl.serializer.dom4j.Dom4JXPDLSerializer;
069: import org.wfmc.wapi.WMFilter;
070: import org.wfmc.wapi.WMProcessDefinitionState;
071: import org.wfmc.wapi.WMWorkflowException;
072:
073: import java.io.*;
074: import java.util.*;
075:
076: /**
077: * Provides file system-based storage for packages and process definitions.
078: *
079: * @author Anthony Eden
080: * @author Adrian Price
081: */
082: public final class BasicProcessRepository extends AbstractRepository
083: implements ProcessRepository {
084:
085: private static final Log _logger = LogFactory
086: .getLog(BasicProcessRepository.class);
087: private static final PackageValidator _validator = new PackageValidator();
088: private static final WMFilter[] EMPTY_FILTERS = {};
089: private static final String STATES_FILE = "states.properties";
090: private static final String DEFAULT_EXT = ".xpdl";
091: private static final String[] _extensions = { "xml", "xpdl" };
092: private static final long POLL_INTERVAL = ServerConfig
093: .getDirectoryPollInterval();
094: private final Thread _pollerThread = new Thread(
095: new DirectoryPoller());
096: private final Map _packages = new HashMap();
097: private final Map _timestamps = new HashMap();
098: private final Map _ignore = new HashMap();
099: private final Map _workflows = new HashMap();
100: private final Properties _workflowStates = new Properties();
101: private File _procDir;
102:
103: private class DirectoryPoller implements Runnable {
104: public void run() {
105: _logger.info("Polling process directory every "
106: + POLL_INTERVAL + " milliseconds");
107:
108: do {
109: try {
110: Thread.sleep(POLL_INTERVAL);
111: loadProcessDirectory();
112: } catch (InterruptedException e) {
113: // Just wake up and look for work.
114: } catch (Exception e) {
115: _logger
116: .error("Error scanning process directory",
117: e);
118: }
119: } while (isInitialized());
120:
121: _logger.info("Process directory polling stopped");
122: }
123: }
124:
125: public BasicProcessRepository(ServiceManager svcMgr) {
126: super (svcMgr, null);
127: _pollerThread.setDaemon(true);
128: _pollerThread.setName("ProcessDirectoryPoller");
129: }
130:
131: public synchronized void exit() {
132: super .exit();
133: _packages.clear();
134: _timestamps.clear();
135: _ignore.clear();
136: _workflows.clear();
137: _workflowStates.clear();
138: _procDir = null;
139: _pollerThread.interrupt();
140: }
141:
142: protected synchronized void load() throws IOException,
143: RepositoryException {
144: // Check/create config & processes directory.
145: File configDir = CommonConfig.getConfigDir();
146: if (!configDir.exists())
147: configDir.mkdirs();
148: File procDir = new File(configDir, "processes");
149: if (!procDir.exists()) {
150: procDir.mkdir();
151: } else if (!procDir.isDirectory()) {
152: throw new RepositoryException(procDir
153: + " is not a directory");
154: } else if (!procDir.canRead()) {
155: throw new RepositoryException("Cannot read from directory "
156: + procDir);
157: }
158: _procDir = procDir;
159:
160: // Read the workflow states from the properties file.
161: readWorkflowStates();
162:
163: // Load process definitions into memory.
164: loadProcessDirectory();
165:
166: if (POLL_INTERVAL > 0)
167: _pollerThread.start();
168: }
169:
170: public synchronized void purge() throws RepositoryException {
171: ApplicationEventBroker eventBroker = _svcMgr
172: .getApplicationEventBroker();
173: String[] keys = new String[1];
174: File[] files = scanProcessDirectory();
175: for (int i = 0; i < files.length; i++) {
176: File file = files[i];
177: String filename = file.getName();
178: int index = filename.lastIndexOf('.');
179: String extension = filename.substring(index);
180: if (extension.equalsIgnoreCase(".xpdl")) {
181: String pkgId = filename.substring(0, index);
182: XPDLPackage pkg = (XPDLPackage) _packages.get(pkgId);
183: if (pkg != null) {
184: WorkflowProcess[] workflows = pkg
185: .getWorkflowProcess();
186: for (int j = 0; j < workflows.length; j++) {
187: keys[0] = workflows[j].getProcessDefinitionId();
188: eventBroker.unsubscribe(keys, false);
189: }
190: }
191: _svcMgr.getApplicationEventBroker().unsubscribe(keys,
192: false);
193: if (!file.delete()) {
194: _logger.warn("Unable to delete file: " + file);
195: }
196: }
197: }
198: _packages.clear();
199: _timestamps.clear();
200: _ignore.clear();
201: _workflows.clear();
202: _workflowStates.clear();
203: writeWorkflowStates();
204: }
205:
206: /**
207: * Creates a package using the supplied content.
208: *
209: * @param pkg The pre-parsed package object.
210: * @throws RepositoryException
211: */
212: public synchronized void createPackage(XPDLPackage pkg)
213: throws RepositoryException {
214:
215: createPackage(pkg, true);
216: }
217:
218: private void createPackage(XPDLPackage pkg, boolean writeToDisk)
219: throws RepositoryException {
220:
221: // Register the Package, its WorkflowProcesses and their states.
222: String pkgId = pkg.getId();
223: if (_packages.containsKey(pkgId)) {
224: throw new ObjectAlreadyExistsException(pkgId);
225: }
226: _packages.put(pkgId, pkg);
227: WorkflowProcess[] workflows = pkg.getWorkflowProcess();
228: for (int i = 0, n = workflows.length; i < n; i++) {
229: WorkflowProcess workflow = workflows[i];
230: String procDefId = workflow.getId();
231: _workflows.put(procDefId, workflow);
232: String state = _workflowStates.getProperty(procDefId);
233: if (state == null) {
234: _workflowStates.setProperty(procDefId,
235: state = WMProcessDefinitionState.ENABLED_TAG);
236: }
237: workflow.setState(WMProcessDefinitionState.valueOf(state)
238: .value());
239: }
240:
241: try {
242: _svcMgr.getEngine().createTriggers(pkg);
243: } catch (WMWorkflowException e) {
244: throw new RepositoryException(e);
245: }
246:
247: // Write the XPDL file to disk, basing the filename on the package ID.
248: if (writeToDisk) {
249: writePackage(pkg);
250: writeWorkflowStates();
251: }
252: }
253:
254: /**
255: * Permanently deletes the specified package.
256: *
257: * @param packageId The package ID.
258: * @throws RepositoryException
259: */
260: public synchronized void deletePackage(String packageId)
261: throws RepositoryException {
262:
263: XPDLPackage pkg = (XPDLPackage) _packages.remove(packageId);
264: if (pkg == null) {
265: throw new ObjectNotFoundException(packageId);
266: }
267: _timestamps.remove(packageId);
268: WorkflowProcess[] workflows = pkg.getWorkflowProcess();
269: for (int i = 0, n = workflows.length; i < n; i++) {
270: WorkflowProcess workflow = workflows[i];
271: String id = workflow.getId();
272: _workflows.remove(id);
273: _workflowStates.remove(id);
274: }
275: writeWorkflowStates();
276:
277: try {
278: _svcMgr.getEngine().deleteTriggers(pkg, false);
279: } catch (WMWorkflowException e) {
280: throw new RepositoryException(e);
281: }
282:
283: // Erase the underlying XPDL file.
284: File file = new File(_procDir, packageId + '.' + DEFAULT_EXT);
285: if (file.exists() && file.isFile()) {
286: if (!file.delete()) {
287: _logger.warn("Unable to delete file: " + file);
288: }
289: }
290: }
291:
292: /**
293: * Retrieves the specified package.
294: *
295: * @param packageId The ID of the package to retrieve.
296: * @return The requested package.
297: */
298: public synchronized XPDLPackage findPackage(String packageId)
299: throws RepositoryException {
300:
301: XPDLPackage pkg = (XPDLPackage) _packages.get(packageId);
302: if (pkg == null) {
303: throw new ObjectNotFoundException(packageId);
304: }
305: return pkg;
306: }
307:
308: /**
309: * Retrieves a set of packages.
310: *
311: * @param filter A package filter specification.
312: * @param countFlag Flag to return just a count of the matching packages.
313: * @return An array of matching packages.
314: */
315: public synchronized XPDLPackage[] findPackages(WMFilter filter,
316: boolean countFlag) throws RepositoryException {
317:
318: return (XPDLPackage[]) AttributeFilter.findByFilter(
319: filter == null ? EMPTY_FILTERS
320: : new WMFilter[] { filter }, XPDLPackage.class,
321: _packages.values(), countFlag);
322: }
323:
324: /**
325: * Sets the content of the specified package.
326: *
327: * @param pkg The package to write.
328: * @throws RepositoryException
329: */
330: public synchronized void updatePackage(XPDLPackage pkg)
331: throws RepositoryException {
332:
333: updatePackage(pkg, true);
334: }
335:
336: private synchronized void updatePackage(XPDLPackage pkg,
337: boolean writeToDisk) throws RepositoryException {
338:
339: // Make sure the package already exists.
340: XPDLPackage oldPkg = findPackage(pkg.getId());
341:
342: // Remember the old workflow states so that we can reapply them.
343: WorkflowProcess[] workflows = oldPkg.getWorkflowProcess();
344: WorkflowProcess workflow;
345: String procDefId;
346: Properties oldStates = new Properties();
347: for (int i = 0, n = workflows.length; i < n; i++) {
348: workflow = workflows[i];
349: procDefId = workflow.getId();
350: _workflows.remove(procDefId);
351: oldStates.setProperty(procDefId, (String) _workflowStates
352: .remove(procDefId));
353: }
354:
355: // Register the package and its workflows, reapply workflow states.
356: _packages.put(pkg.getId(), pkg);
357: workflows = pkg.getWorkflowProcess();
358: for (int i = 0, n = workflows.length; i < n; i++) {
359: workflow = workflows[i];
360: procDefId = workflow.getId();
361: String state = oldStates.getProperty(procDefId,
362: WMProcessDefinitionState.ENABLED_TAG);
363: workflow.setState(WMProcessDefinitionState.valueOf(state)
364: .value());
365: _workflows.put(procDefId, workflow);
366: _workflowStates.setProperty(procDefId, state);
367: }
368:
369: try {
370: _svcMgr.getEngine().updateTriggers(pkg);
371: } catch (WMWorkflowException e) {
372: throw new RepositoryException(e);
373: }
374:
375: // Write the XPDL file to disk, basing the filename on the package ID.
376: if (writeToDisk) {
377: writePackage(pkg);
378: writeWorkflowStates();
379: }
380: }
381:
382: /**
383: * Retrieves a process definition in executable form.
384: *
385: * @param processDefinitionId The process definition ID.
386: * @return The requested process definition.
387: */
388: public synchronized WorkflowProcess findWorkflowProcess(
389: String processDefinitionId) throws RepositoryException {
390:
391: WorkflowProcess workflow = (WorkflowProcess) _workflows
392: .get(processDefinitionId);
393: if (workflow == null)
394: throw new ObjectNotFoundException(processDefinitionId);
395: return workflow;
396: }
397:
398: public WorkflowProcess[] findWorkflowProcesses(String name,
399: boolean validOnly) {
400:
401: Collection workflows = new ArrayList();
402: for (Iterator iter = _workflows.values().iterator(); iter
403: .hasNext();) {
404: WorkflowProcess workflow = (WorkflowProcess) iter.next();
405: if (workflow.getName().equals(name)
406: && !validOnly
407: || workflow.getState() == WMProcessDefinitionState.ENABLED_INT) {
408:
409: RedefinableHeader rhdr = workflow
410: .getRedefinableHeader();
411: if (!validOnly
412: || rhdr == null
413: || rhdr.getPublicationStatus() != PublicationStatus.UNDER_REVISION) {
414:
415: workflows.add(workflow);
416: }
417: }
418: }
419: return (WorkflowProcess[]) workflows
420: .toArray(new WorkflowProcess[workflows.size()]);
421: }
422:
423: /**
424: * Retrieves a list of process definitions.
425: *
426: * @param filter A process definition filter specification.
427: * @return An array of matching process definitions.
428: */
429: public synchronized WorkflowProcess[] findWorkflowProcesses(
430: WMFilter filter, boolean countFlag)
431: throws RepositoryException {
432:
433: return (WorkflowProcess[]) AttributeFilter.findByFilter(
434: filter == null ? EMPTY_FILTERS
435: : new WMFilter[] { filter },
436: WorkflowProcess.class, _workflows.values(), countFlag);
437: }
438:
439: public synchronized int findProcessDefinitionState(
440: String processDefinitionId) throws RepositoryException {
441:
442: return findWorkflowProcess(processDefinitionId).getState();
443: }
444:
445: /**
446: * Changes the process definition state.
447: *
448: * @param processDefinitionId The process definition id
449: * @param newState The new process definition state
450: * @throws RepositoryException Workflow client exception
451: */
452: public synchronized void updateProcessDefinitionState(
453: String processDefinitionId, int newState)
454: throws RepositoryException {
455:
456: WMProcessDefinitionState state = WMProcessDefinitionState
457: .valueOf(newState);
458: WorkflowProcess workflow = (WorkflowProcess) _workflows
459: .get(processDefinitionId);
460: workflow.setState(newState);
461: _workflowStates.setProperty(processDefinitionId, state
462: .toString());
463: writeWorkflowStates();
464: }
465:
466: protected Log getLogger() {
467: return _logger;
468: }
469:
470: private synchronized void loadProcessDirectory()
471: throws IOException, RepositoryException {
472:
473: // Track whether states.properties needs re-writing.
474: boolean statesModified = false;
475:
476: // Scan process directory and extract filename roots.
477: File[] files = scanProcessDirectory();
478: String[] roots = new String[files.length];
479: for (int i = 0; i < files.length; i++) {
480: File file = files[i];
481: String name = file.getName();
482: int index = name.lastIndexOf('.');
483: roots[i] = name.substring(0, index);
484: }
485:
486: // Remove deleted workflows and states.
487: for (Iterator iter = _packages.entrySet().iterator(); iter
488: .hasNext();) {
489: Map.Entry entry = (Map.Entry) iter.next();
490: String pkgId = (String) entry.getKey();
491: boolean onDisk = false;
492: for (int j = 0; j < roots.length; j++) {
493: if (roots[j].equalsIgnoreCase(pkgId)) {
494: onDisk = true;
495: break;
496: }
497: }
498: if (!onDisk) {
499: XPDLPackage pkg = (XPDLPackage) entry.getValue();
500: for (int j = 0, n = pkg.getWorkflowProcess().length; j < n; j++) {
501:
502: WorkflowProcess workflow = pkg
503: .getWorkflowProcess(j);
504: String procDefId = workflow.getId();
505: _workflows.remove(procDefId);
506: _timestamps.remove(procDefId);
507: _workflowStates.remove(procDefId);
508: statesModified = true;
509: }
510: try {
511: _svcMgr.getEngine().deleteTriggers(pkg, false);
512: } catch (WMWorkflowException e) {
513: throw new RepositoryException(e);
514: }
515:
516: iter.remove();
517: _logger.info("Removed deleted package: " + pkg.getId());
518: }
519: }
520:
521: // Load any new files.
522: XPDLParser parser = new Dom4JXPDLParser();
523: for (int i = 0; i < files.length; i++) {
524: File file = files[i];
525: String root = roots[i];
526:
527: // If we overwrote this entry when renaming another file, skip it.
528: if (root == null) {
529: _logger.info("Skipping overwritten file: " + files[i]);
530: continue;
531: }
532:
533: // Ignore existing file if it hasn't changed or has problems.
534: boolean pkgExists = _packages.containsKey(root);
535: Long lastModified = new Long(file.lastModified());
536: Object timestamp = _timestamps.get(root);
537: if (pkgExists
538: && (timestamp == null || timestamp
539: .equals(lastModified))
540: || _ignore.containsKey(root)
541: && _ignore.get(root).equals(lastModified)) {
542:
543: continue;
544: }
545:
546: FileInputStream in = null;
547: try {
548: // Parse the XPDL file and register the contents.
549: in = new FileInputStream(file);
550: XPDLPackage pkg = parser.parse(in);
551:
552: // Make sure the filename matches the package ID.
553: String pkgId = pkg.getId();
554: if (!pkgId.equals(root)) {
555: File correctFile = new File(_procDir, pkgId
556: + DEFAULT_EXT);
557: if (correctFile.exists()) {
558: // If another file with the correct name already exists
559: // but is the same or a later version, ignore the
560: // incoming file.
561: if (correctFile.lastModified() >= file
562: .lastModified()) {
563: _logger
564: .warn("Unable to rename file '"
565: + file
566: + "' to match XPDL package ID: "
567: + pkgId
568: + ", because the target file already exists and"
569: + " is more recent.");
570: _ignore.put(root, lastModified);
571: continue;
572: }
573: // Otherwise, replace the old version with the new one.
574: if (!correctFile.delete()) {
575: _logger
576: .warn("Unable to delete outdated file '"
577: + file + '\'');
578: _ignore.put(root, lastModified);
579: continue;
580: }
581: _logger.info("Replacing outdated file '" + file
582: + '\'');
583: }
584: if (!file.renameTo(correctFile)) {
585: _logger.warn("Unable to rename file '" + file
586: + "' to match XPDL package ID: "
587: + pkgId + ", ignoring.");
588: _ignore.put(root, lastModified);
589: continue;
590: }
591: pkgExists = _packages.containsKey(pkgId);
592: _logger.warn("Renamed file '" + file
593: + "' to match XPDL package ID: " + pkgId);
594:
595: // Flag over-written files, to avoid processing them twice.
596: for (int j = i + 1; j < roots.length; j++) {
597: String s = roots[j];
598: if (s != null && s.equals(pkgId)) {
599: roots[j] = null;
600: }
601: }
602: file = correctFile;
603: }
604: _validator.validate(pkg, true);
605: _timestamps.put(pkgId, lastModified);
606:
607: // Register the package contents.
608: if (pkgExists) {
609: updatePackage(pkg, false);
610: _logger.info("Updated package: " + pkgId);
611: } else {
612: createPackage(pkg, false);
613: _logger.info("Loaded package: " + pkgId);
614: }
615: statesModified = true;
616: } catch (Exception e) {
617: _logger.error("Error processing XPDL file " + file, e);
618: _ignore.put(root, lastModified);
619: } finally {
620: if (in != null) {
621: in.close();
622: }
623: }
624: }
625:
626: if (statesModified)
627: writeWorkflowStates();
628: }
629:
630: private File[] scanProcessDirectory() {
631: return _procDir.listFiles(new FileFilter() {
632: // Only process *.xml and *.xpdl files.
633: public boolean accept(File pathname) {
634: if (!pathname.isFile())
635: return false;
636: String name = pathname.getName();
637: int index = name.lastIndexOf('.');
638: String ext = index == -1 ? null : name
639: .substring(index + 1);
640: for (int i = 0; i < _extensions.length; i++) {
641: if (_extensions[i].equalsIgnoreCase(ext))
642: return true;
643: }
644: return false;
645: }
646: });
647: }
648:
649: private void writePackage(XPDLPackage pkg)
650: throws RepositoryException {
651: String pkgId = pkg.getId();
652: File file = new File(_procDir, pkgId + DEFAULT_EXT);
653: try {
654: XPDLSerializer serializer = new Dom4JXPDLSerializer();
655: OutputStream out = new FileOutputStream(file);
656: try {
657: serializer.serialize(pkg, out);
658: } finally {
659: out.close();
660: }
661: _timestamps.put(pkgId, new Long(file.lastModified()));
662: } catch (IOException e) {
663: throw new RepositoryException(e);
664: } catch (XPDLSerializerException e) {
665: throw new RepositoryException(e);
666: }
667: }
668:
669: private void readWorkflowStates() throws RepositoryException {
670: _workflowStates.clear();
671: try {
672: File file = new File(_procDir, STATES_FILE);
673: InputStream in = new FileInputStream(file);
674: try {
675: _workflowStates.load(in);
676: } finally {
677: in.close();
678: }
679: } catch (FileNotFoundException e) {
680: // A missing properties file is not an error.
681: } catch (IOException e) {
682: throw new RepositoryException(e);
683: }
684: }
685:
686: private void writeWorkflowStates() throws RepositoryException {
687: try {
688: File file = new File(_procDir, STATES_FILE);
689: OutputStream out = new FileOutputStream(file);
690: try {
691: _workflowStates.store(out, "WorkflowProcess States");
692: } finally {
693: out.close();
694: }
695: } catch (IOException e) {
696: throw new RepositoryException(e);
697: }
698: }
699:
700: public String getServiceName() {
701: return SERVICE_NAME;
702: }
703: }
|