001: package dalma.container;
002:
003: import dalma.Conversation;
004: import dalma.Description;
005: import dalma.Engine;
006: import dalma.EngineListener;
007: import dalma.Program;
008: import static dalma.container.WorkflowState.*;
009: import dalma.container.model.IllegalResourceException;
010: import dalma.container.model.InjectionException;
011: import dalma.container.model.Model;
012: import dalma.impl.EngineImpl;
013: import dalma.impl.Util;
014:
015: import javax.management.JMException;
016: import javax.management.ObjectName;
017: import java.io.BufferedInputStream;
018: import java.io.BufferedOutputStream;
019: import java.io.File;
020: import java.io.FileInputStream;
021: import java.io.FileOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025: import java.net.URL;
026: import java.text.ParseException;
027: import java.util.Enumeration;
028: import java.util.Map;
029: import java.util.Properties;
030: import java.util.Calendar;
031: import java.util.GregorianCalendar;
032: import java.util.jar.Manifest;
033: import java.util.logging.Level;
034: import java.util.logging.Logger;
035:
036: /**
037: * Wrapper around each workflow application.
038: *
039: * Each workflow application will have a separate {@link Engine}
040: * and {@link ClassLoader}.
041: *
042: * @author Kohsuke Kawaguchi
043: */
044: public final class WorkflowApplication implements
045: WorkflowApplicationMBean {
046: private static final Logger logger = Logger
047: .getLogger(WorkflowApplication.class.getName());
048:
049: private final String name;
050:
051: /**
052: * The engine that executes workflow.
053: *
054: * non-null in {@link WorkflowState#RUNNING} but null in other cases.
055: */
056: private Engine engine;
057:
058: /**
059: * The {@link ClassLoader} to load workflow applications.
060: *
061: * null if {@link WorkflowState#UNLOADED}.
062: */
063: private ClassLoaderImpl classLoader;
064:
065: /**
066: * The Main class in the {@link #classLoader}.
067: *
068: * The type isn't checked, hence the parameterization is '?'.
069: * null if {@link WorkflowState#UNLOADED}.
070: */
071: private Class<?> mainClass;
072:
073: /**
074: * The model of the {@link #mainClass}.
075: *
076: * Null when {@link #mainClass} is null.
077: */
078: private Model<?> model;
079:
080: /**
081: * The {@link Container} that owns this.
082: */
083: public final Container owner;
084:
085: /**
086: * Root of the working directory.
087: */
088: private final File workDir;
089:
090: /**
091: * Root of the class directory.
092: * <tt>DALMA_HOME/apps/<name></tt>
093: */
094: private final File appDir;
095:
096: /**
097: * Configuration property file.
098: */
099: private final File confFile;
100:
101: /**
102: * The workflow application. Null when not started.
103: */
104: private Program program;
105:
106: private ObjectName objectName;
107:
108: private WorkflowState state;
109:
110: private int logRotationDays = -1;
111: private final LogRotationPolicy logPolicy = new LogRotationPolicy() {
112: public boolean keep(Conversation conv) {
113: if (logRotationDays == -1)
114: return true; // no rotation
115:
116: Calendar cal = new GregorianCalendar();
117: cal.add(Calendar.DAY_OF_YEAR, -logRotationDays);
118:
119: return conv.getCompletionDate().getTime() > cal
120: .getTimeInMillis();
121: }
122: };
123:
124: private final CompletedConversationList ccList;
125:
126: public WorkflowApplication(Container owner, File appDir)
127: throws FailedOperationException {
128: this .owner = owner;
129: this .name = appDir.getName();
130: this .workDir = new File(new File(owner.getHomeDir(), "work"),
131: name);
132: this .confFile = new File(new File(new File(owner.getHomeDir(),
133: "conf"), "apps"), name + ".properties");
134: this .appDir = appDir;
135: this .state = UNLOADED;
136:
137: File clog = new File(workDir, "completed-logs");
138: clog.mkdirs();
139: this .ccList = new CompletedConversationList(clog);
140: this .ccList.setPolicy(logPolicy);
141:
142: try {
143: Properties props = loadConfigProperties();
144: this .logRotationDays = Integer.parseInt(props.getProperty(
145: LOG_ROTATION_KEY, "-1"));
146: } catch (IOException e) {
147: throw new FailedOperationException(
148: "Failed to load configuration " + confFile, e);
149: }
150:
151: try {
152: objectName = new ObjectName("dalma:container="
153: + ObjectName.quote(owner.getHomeDir().toString())
154: + ",name=" + name);
155: MBeanProxy.register(owner.mbeanServer, objectName,
156: WorkflowApplicationMBean.class, this );
157: } catch (JMException e) {
158: logger.log(Level.WARNING, "Failed to register to JMX", e);
159: }
160:
161: load();
162: }
163:
164: /**
165: * Sets the # of days the completed conversation logs are kept.
166: *
167: * @param d
168: * -1 to keep indefinitely.
169: */
170: public void setLogRotationDays(int d) throws IOException {
171: logRotationDays = d;
172: Properties props = loadConfigProperties();
173: props.setProperty(LOG_ROTATION_KEY, String.valueOf(d));
174: saveConfigProperties(props);
175: ccList.setPolicy(logPolicy);
176: }
177:
178: /**
179: * Gets the # of days the completed conversation logs are kept.
180: *
181: * Defaults to -1 (keep indefinitely)
182: */
183: public int getLogRotationDays() {
184: return logRotationDays;
185: }
186:
187: public boolean isConfigured() {
188: if (state == UNLOADED)
189: return false;
190: try {
191: return model.checkConfiguration(loadConfigProperties());
192: } catch (IOException e) {
193: log("Failed to check the configuration", e);
194: return false;
195: }
196: }
197:
198: public synchronized void load() throws FailedOperationException {
199: if (state != UNLOADED)
200: return; // nothing to do
201:
202: logger.info("Loading " + name);
203:
204: try {
205: classLoader = createClassLoader();
206: } catch (IOException e) {
207: throw new FailedOperationException(
208: "Failed to set up a ClassLoader", e);
209: }
210:
211: try {
212: mainClass = classLoader.loadClass(findMainClass());
213: } catch (ClassNotFoundException e) {
214: throw new FailedOperationException(
215: "Failed to load the main class from application", e);
216: } catch (IOException e) {
217: throw new FailedOperationException(
218: "Failed to load the main class from application", e);
219: } catch (LinkageError e) {
220: throw new FailedOperationException(
221: "Failed to load the main class from application", e);
222: }
223:
224: try {
225: model = new Model(mainClass);
226: } catch (IllegalResourceException e) {
227: throw new FailedOperationException(
228: "Failed to configure program", e);
229: } catch (LinkageError e) {
230: throw new FailedOperationException(
231: "Failed to configure program", e);
232: }
233:
234: state = STOPPED;
235:
236: logger.info("Loaded " + name);
237: }
238:
239: private String findMainClass() throws IOException {
240: // determine the Main class name
241: Enumeration<URL> res = classLoader
242: .getResources("META-INF/MANIFEST.MF");
243: while (res.hasMoreElements()) {
244: URL url = res.nextElement();
245: InputStream is = new BufferedInputStream(url.openStream());
246: try {
247: Manifest mf = new Manifest(is);
248: String value = mf.getMainAttributes().getValue(
249: "Dalma-Main-Class");
250: if (value != null) {
251: logger.info("Found Dalma-Main-Class=" + value
252: + " in " + url);
253: return value;
254: }
255: } finally {
256: is.close();
257: }
258: }
259:
260: // default location
261: return "Main";
262: }
263:
264: /**
265: * Starts executing the workflow application.
266: *
267: * <p>
268: * Moves the state to {@link WorkflowState#RUNNING}.
269: */
270: public synchronized void start() throws FailedOperationException {
271: load();
272: if (state == RUNNING)
273: return; // already started
274:
275: logger.info("Starting " + name);
276:
277: // set the context class loader when configuring the engine
278: // and calling into application classes
279: final ClassLoader old = Thread.currentThread()
280: .getContextClassLoader();
281: Thread.currentThread().setContextClassLoader(classLoader);
282:
283: try {
284: try {
285: engine = new EngineImpl(new File(workDir, "data"),
286: classLoader, owner.executor);
287: } catch (IOException e) {
288: throw new FailedOperationException(
289: "Failed to start engine", e);
290: }
291:
292: try {
293: Object main = mainClass.newInstance();
294: if (!(main instanceof Program)) {
295: logger.severe(mainClass.getName()
296: + " doesn't extend the Program class");
297: return;
298: }
299: program = (Program) main;
300: // TODO: replace with a real logger
301: program.setLogger(logger);
302: } catch (InstantiationException e) {
303: throw new FailedOperationException(
304: "Failed to load the main class from application",
305: e);
306: } catch (IllegalAccessException e) {
307: throw new FailedOperationException(
308: "Failed to load the main class from application",
309: e);
310: }
311:
312: // perform resource injection
313: try {
314: ((Model) model).inject(engine, program,
315: loadConfigProperties());
316: } catch (InjectionException e) {
317: throw new FailedOperationException(
318: "Failed to configure program", e);
319: } catch (ParseException e) {
320: throw new FailedOperationException(
321: "Failed to configure program", e);
322: } catch (IOException e) {
323: throw new FailedOperationException(
324: "Failed to configure program", e);
325: }
326:
327: try {
328: program.init(engine);
329: } catch (Throwable e) {
330: // faled
331: throw new FailedOperationException(mainClass.getName()
332: + ".init() method reported an exception", e);
333: }
334:
335: // hook things up so that completed conversations will be added to the record
336: engine.addListener(new EngineListener() {
337: public void onConversationCompleted(Conversation conv) {
338: ccList.add(conv);
339: }
340: });
341: engine.start();
342:
343: try {
344: program.main(engine);
345: } catch (Throwable e) {
346: // faled
347: throw new FailedOperationException(mainClass.getName()
348: + ".main() method reported an exception", e);
349: }
350:
351: state = RUNNING;
352:
353: logger.info("Started " + name);
354: } finally {
355: Thread.currentThread().setContextClassLoader(old);
356: if (state != RUNNING) {
357: // compensation
358: program = null;
359: if (engine.isStarted())
360: engine.stop();
361: engine = null;
362: }
363: }
364: }
365:
366: /**
367: * Loads {@link #confFile} into a {@link Properties}.
368: *
369: * @return always non-null.
370: */
371: public Properties loadConfigProperties() throws IOException {
372: Properties props = new Properties();
373: if (confFile.exists()) {
374: InputStream in = new BufferedInputStream(
375: new FileInputStream(confFile));
376: try {
377: props.load(in);
378: } finally {
379: in.close();
380: }
381: }
382: return props;
383: }
384:
385: /**
386: * Saves the given propertie sinto {@link #confFile}.
387: */
388: public void saveConfigProperties(Properties props)
389: throws IOException {
390: confFile.getParentFile().mkdirs();
391: OutputStream fos = new BufferedOutputStream(
392: new FileOutputStream(confFile));
393: try {
394: props.store(fos, null);
395: } finally {
396: fos.close();
397: }
398: }
399:
400: private static void log(String msg, Throwable t) {
401: logger.log(Level.SEVERE, msg, t);
402: }
403:
404: /**
405: * Creates a new {@link ClassLoader} that loads workflow application classes.
406: */
407: private ClassLoaderImpl createClassLoader() throws IOException {
408: ClassLoaderImpl cl = new ClassLoaderImpl(owner.appClassLoader);
409: cl.addJarFiles(appDir);
410: cl.addPathFile(appDir);
411: cl.makeContinuable();
412: return cl;
413: }
414:
415: /**
416: * Stops the execution of the workflow application.
417: *
418: * <p>
419: * Moves the state to {@link WorkflowState#STOPPED}.
420: */
421: public synchronized void stop() {
422: if (state != RUNNING)
423: return; // already stopped
424:
425: logger.info("Stopping " + name);
426:
427: if (program != null) {
428: try {
429: program.cleanup(engine);
430: } catch (Exception e) {
431: log(program.getClass().getName()
432: + ".cleanup() method reported an exception", e);
433: }
434: program = null;
435: }
436:
437: engine.stop();
438: engine = null;
439: state = STOPPED;
440:
441: logger.info("Stopped " + name);
442: }
443:
444: /**
445: * Unloads the workflow application from memory.
446: *
447: * <p>
448: * Moves the state to {@link WorkflowState#UNLOADED}.
449: */
450: public synchronized void unload() {
451: stop();
452: if (state == UNLOADED)
453: return; // nothing to do
454:
455: classLoader.cleanup();
456: classLoader = null;
457: mainClass = null;
458: model = null;
459: state = UNLOADED;
460:
461: logger.info("Unloaded " + name);
462: }
463:
464: /**
465: * The name of the workflow application
466: * that uniquely identifies a {@link WorkflowApplication}.
467: */
468: public String getName() {
469: return name;
470: }
471:
472: /**
473: * Gets the human-readable description of this web application.
474: */
475: public String getDescription() {
476: if (mainClass == null)
477: return "(not available for unloaded workflow)";
478:
479: Description d = mainClass.getAnnotation(Description.class);
480: if (d == null)
481: return "(no description available)";
482: return d.value();
483: }
484:
485: /**
486: * Returns true if this application is currently running.
487: */
488: public WorkflowState getState() {
489: return state;
490: }
491:
492: /**
493: * Gets the location of resource configuration file.
494: */
495: public File getConfigFile() {
496: return confFile;
497: }
498:
499: /**
500: * Gets the engine that's executing the workflow.
501: * <p>
502: * The engine is owned by this {@link WorkflowApplication}, so the caller shouldn't
503: * try to alter its state.
504: */
505: public Engine getEngine() {
506: return engine;
507: }
508:
509: /**
510: * Gets the model object that describes resources needed by this application.
511: */
512: public Model<?> getModel() {
513: return model;
514: }
515:
516: /**
517: * Gets records about completed conversations.
518: *
519: * @return
520: * always non-null. Map is keyed by ID.
521: */
522: public Map<Integer, Conversation> getCompletedConversations() {
523: return ccList.getList();
524: }
525:
526: /**
527: * Called when the directory is removed from the apps folder to remove
528: * this application from the container.
529: */
530: protected synchronized void remove() {
531: synchronized (undeployLock) {
532: owner.applications.remove(getName());
533: if (objectName != null) {
534: try {
535: owner.mbeanServer.unregisterMBean(objectName);
536: } catch (JMException e) {
537: logger.log(Level.WARNING, "Failed to unregister "
538: + objectName);
539: } finally {
540: objectName = null;
541: }
542: }
543: stop();
544: undeployed = true;
545: undeployLock.notifyAll();
546: }
547: }
548:
549: /**
550: * Completely removes this workflow application.
551: */
552: public void undeploy() throws FailedOperationException {
553: unload();
554: synchronized (undeployLock) {
555: if (!undeployed) {
556: File dar = new File(owner.appsDir, name + ".dar");
557: if (dar.exists() && !dar.delete()) {
558: throw new FailedOperationException(
559: "failed to delete " + appDir);
560: }
561:
562: try {
563: Util.deleteRecursive(appDir);
564: Util.deleteRecursive(workDir);
565: } catch (IOException e) {
566: throw new FailedOperationException(
567: "Unable to clean up the application directory "
568: + appDir, e);
569: }
570: try {
571: undeployLock.wait(15 * 1000);
572: } catch (InterruptedException e) {
573: Thread.currentThread().interrupt(); // process it later
574: }
575: if (!undeployed)
576: throw new FailedOperationException(
577: "Operation timed out");
578: }
579: }
580:
581: }
582:
583: private final Object undeployLock = new Object();
584: /**
585: * True when this workflow is already undeployed.
586: */
587: private boolean undeployed = false;
588:
589: private static final String LOG_ROTATION_KEY = "!log-rotation-days";
590: }
|