001: package dalma.container;
002:
003: import dalma.Executor;
004: import dalma.impl.Util;
005: import dalma.helpers.Java5Executor;
006:
007: import javax.management.JMException;
008: import javax.management.ObjectName;
009: import javax.management.MBeanServer;
010: import javax.management.remote.JMXServiceURL;
011: import javax.management.remote.JMXConnectorServer;
012: import javax.management.remote.JMXConnectorServerFactory;
013: import java.io.File;
014: import java.io.FileFilter;
015: import java.io.IOException;
016: import java.io.FileOutputStream;
017: import java.io.OutputStream;
018: import java.io.FileInputStream;
019: import java.io.InputStream;
020: import java.lang.management.ManagementFactory;
021: import java.util.Collection;
022: import java.util.Collections;
023: import java.util.Hashtable;
024: import java.util.Map;
025: import java.util.Properties;
026: import java.util.Enumeration;
027: import java.util.List;
028: import java.util.ArrayList;
029: import java.util.jar.JarFile;
030: import java.util.jar.JarEntry;
031: import java.util.concurrent.Executors;
032: import java.util.concurrent.Future;
033: import java.util.concurrent.TimeUnit;
034: import java.util.concurrent.ExecutionException;
035: import java.util.concurrent.TimeoutException;
036: import java.util.logging.Level;
037: import java.util.logging.Logger;
038: import java.net.URL;
039:
040: /**
041: * Root of Dalmacon.
042: *
043: * @author Kohsuke Kawaguchi
044: */
045: public final class Container implements ContainerMBean {
046: private static final Logger logger = Logger
047: .getLogger(Container.class.getName());
048:
049: /**
050: * The root directory of the dalma installation. The value of DALMA_HOME.
051: * We want the absolute version since we send this across JMX.
052: */
053: final File homeDir;
054:
055: /**
056: * DALMA_HOME/apps
057: */
058: final File appsDir;
059:
060: /**
061: * {@link Executor} that is shared by all {@link WorkflowApplication}s.
062: */
063: protected final Executor executor;
064:
065: final Map<String, WorkflowApplication> applications;
066:
067: /**
068: * All installed {@link Module}s.
069: */
070: final List<Module> modules;
071:
072: private final Redeployer redeployer;
073:
074: protected final MBeanServer mbeanServer;
075:
076: /**
077: * {@link ClassLoader} that can load lib/*.jar
078: */
079: final ClassLoader appClassLoader;
080:
081: /**
082: * Creates a new container.
083: *
084: * @param root
085: * Home directory to store data. Must exist.
086: * @param executor
087: * used to execute workflow applications.
088: */
089: public Container(File root, Executor executor) throws IOException {
090: this .homeDir = root.getAbsoluteFile();
091: this .modules = findModules();
092: this .appsDir = new File(homeDir, "apps");
093: this .executor = executor;
094: this .appClassLoader = createClassLoader();
095: this .mbeanServer = ManagementFactory.getPlatformMBeanServer();
096: this .applications = findApps();
097:
098: for (WorkflowApplication app : applications.values()) {
099: try {
100: if (app.isConfigured())
101: app.start();
102: } catch (FailedOperationException e) {
103: logger.log(Level.WARNING, "Failed to start "
104: + app.getName(), e);
105: }
106: }
107:
108: try {
109: MBeanProxy.register(mbeanServer,
110: new ObjectName("dalma:dir="
111: + ObjectName.quote(homeDir.toString())),
112: ContainerMBean.class, this );
113: } catch (JMException e) {
114: logger.log(Level.WARNING, "Failed to register to JMX", e);
115: }
116:
117: redeployer = new Redeployer(this );
118: logger.info("Auto-redeployment activated");
119: }
120:
121: //private void disableAutoRedeploy() {
122: // if(redeployer!=null) {
123: // redeployer.cancel();
124: // redeployer = null;
125: // logger.info("Auto-redeployment deactivated");
126: // }
127: //}
128:
129: /**
130: * Creates a {@link ClassLoader} that loads all lib/*.jar.
131: */
132: private ClassLoader createClassLoader() throws IOException {
133: ClassLoaderImpl cl = new ClassLoaderImpl(getClass()
134: .getClassLoader());
135: // lib/*.jar
136: cl.addJarFiles(new File(homeDir, "lib"));
137:
138: // modules/*/*.jar
139: for (Module mod : modules)
140: cl.addJarFiles(mod.dir);
141:
142: return cl;
143: }
144:
145: public void stop() {
146: for (WorkflowApplication app : applications.values())
147: app.stop();
148: }
149:
150: public synchronized WorkflowApplication deploy(String name,
151: byte[] data) throws FailedOperationException,
152: InterruptedException {
153: logger.info("Accepting application '" + name + "'");
154: // use a temp file first to hide from auto redeployer
155: File tmpFile = new File(appsDir, name + ".tmp");
156: try {
157: OutputStream os = new FileOutputStream(tmpFile);
158: try {
159: os.write(data);
160: } finally {
161: os.close();
162: }
163: } catch (IOException e) {
164: throw new FailedOperationException(
165: "Failed to write to a file", e);
166: }
167:
168: Future<WorkflowApplication> ft = redeployer.getFuture(new File(
169: appsDir, name));
170:
171: File darFile = new File(appsDir, name + ".dar");
172: if (darFile.exists())
173: darFile.delete();
174: tmpFile.renameTo(darFile);
175: // the rest is up to redeployer to pick up
176:
177: try {
178: return ft.get(15, TimeUnit.SECONDS);
179: } catch (ExecutionException e) {
180: throw new FailedOperationException("Deployment failed", e
181: .getCause());
182: } catch (TimeoutException e) {
183: throw new FailedOperationException("Operation timed out", e);
184: }
185: }
186:
187: /**
188: * Called when a new directory is created in the 'apps' folder,
189: * to create a {@link WorkflowApplication} over it.
190: */
191: protected WorkflowApplication deploy(File appsubdir)
192: throws FailedOperationException {
193: WorkflowApplication wa = new WorkflowApplication(this ,
194: appsubdir);
195: applications.put(wa.getName(), wa);
196: if (wa.isConfigured())
197: wa.start(); // looks like it's configured enough to start
198: // otherwise leave it in the loaded state
199: return wa;
200: }
201:
202: /**
203: * Returns the read-only list of all {@link WorkflowApplication}s
204: * in this container.
205: */
206: public Collection<WorkflowApplication> getApplications() {
207: return Collections
208: .unmodifiableCollection(applications.values());
209: }
210:
211: public WorkflowApplication getApplication(String name) {
212: return applications.get(name);
213: }
214:
215: private List<Module> findModules() {
216: List<Module> r = new ArrayList<Module>();
217:
218: File[] modules = new File(homeDir, "modules")
219: .listFiles(new FileFilter() {
220: public boolean accept(File path) {
221: return path.isDirectory();
222: }
223: });
224:
225: if (modules != null)
226: for (File mod : modules)
227: r.add(new Module(this , mod));
228:
229: return Collections.unmodifiableList(r);
230: }
231:
232: /**
233: * Finds all the workflow applications.
234: */
235: private Map<String, WorkflowApplication> findApps() {
236: if (!appsDir.exists()) {
237: logger
238: .severe("Workflow application directory doesn't exist: "
239: + appsDir);
240: return Collections.emptyMap(); // no apps
241: }
242:
243: // first extract all dars (unless they are up-to-date)
244: File[] dars = appsDir.listFiles(new FileFilter() {
245: public boolean accept(File path) {
246: return path.getPath().endsWith(".dar");
247: }
248: });
249: for (File dar : dars)
250: explode(dar);
251:
252: File[] subdirs = appsDir.listFiles(new FileFilter() {
253: public boolean accept(File path) {
254: return path.isDirectory();
255: }
256: });
257:
258: Map<String, WorkflowApplication> apps = new Hashtable<String, WorkflowApplication>();
259:
260: for (File subdir : subdirs)
261: try {
262: apps.put(subdir.getName(), new WorkflowApplication(
263: this , subdir));
264: } catch (FailedOperationException e) {
265: logger.log(Level.WARNING, "Failed to load from "
266: + subdir, e);
267: }
268:
269: return apps;
270: }
271:
272: /**
273: * Gets the name of the DALMA_HOME directory.
274: */
275: public File getHomeDir() {
276: return homeDir;
277: }
278:
279: /**
280: * Gets the name of the container configuration file.
281: */
282: public File getConfigFile() {
283: return new File(new File(homeDir, "conf"), "dalma.properties");
284: }
285:
286: /**
287: * Creates a configured {@link Container} from HOME/conf/dalma.properties
288: */
289: public static Container create(File home) throws IOException {
290: Properties conf = loadProperties(home);
291:
292: Container container = new Container(home, new Java5Executor(
293: Executors.newFixedThreadPool(readProperty(conf,
294: "thread.count", 5))));
295:
296: int jmxPort = readProperty(conf, "jmx.port", -1);
297: if (jmxPort >= 0) {
298: logger.info("Initializing JMXMP connector at port "
299: + jmxPort);
300: JMXServiceURL url = new JMXServiceURL("jmxmp", null,
301: jmxPort);
302: JMXConnectorServer cs = JMXConnectorServerFactory
303: .newJMXConnectorServer(url, null, ManagementFactory
304: .getPlatformMBeanServer());
305:
306: cs.start();
307: logger.info("Started JMXMP connector");
308: }
309:
310: return container;
311: }
312:
313: private static int readProperty(Properties props, String key,
314: int defaultValue) {
315: String value = props.getProperty(key);
316: if (value == null)
317: return defaultValue;
318:
319: try {
320: return Integer.parseInt(value);
321: } catch (NumberFormatException e) {
322: logger.severe("Configuration value for " + key
323: + " must be int, but found \"" + value + "\"");
324: return defaultValue;
325: }
326: }
327:
328: private static File getConfigFile(File home, String name) {
329: return new File(new File(home, "conf"), name);
330: }
331:
332: private static Properties loadProperties(File home) {
333: Properties props = new Properties();
334: File config = getConfigFile(home, "dalma.properties");
335: if (config.exists()) {
336: try {
337: FileInputStream in = new FileInputStream(config);
338: try {
339: props.load(in);
340: } finally {
341: in.close();
342: }
343: } catch (IOException e) {
344: logger.log(Level.SEVERE, "Failed to read " + config, e);
345: }
346: }
347: return props;
348: }
349:
350: /**
351: * Extracts the given dar file into the same directory.
352: */
353: static void explode(File dar) {
354: try {
355: String name = dar.getName();
356: File exploded = new File(dar.getParentFile(), name
357: .substring(0, name.length() - 4));
358: if (exploded.exists()) {
359: if (exploded.lastModified() > dar.lastModified()) {
360: return;
361: }
362: Util.deleteRecursive(exploded);
363: }
364:
365: logger.info("Extracting " + dar);
366:
367: byte[] buf = new byte[1024]; // buffer
368:
369: JarFile archive = new JarFile(dar);
370: Enumeration<JarEntry> e = archive.entries();
371: while (e.hasMoreElements()) {
372: JarEntry j = e.nextElement();
373: File dst = new File(exploded, j.getName());
374:
375: if (j.isDirectory()) {
376: dst.mkdirs();
377: continue;
378: }
379:
380: dst.getParentFile().mkdirs();
381:
382: InputStream in = archive.getInputStream(j);
383: FileOutputStream out = new FileOutputStream(dst);
384: try {
385: while (true) {
386: int sz = in.read(buf);
387: if (sz < 0)
388: break;
389: out.write(buf, 0, sz);
390: }
391: } finally {
392: in.close();
393: out.close();
394: }
395: }
396:
397: archive.close();
398: } catch (IOException x) {
399: logger.log(Level.SEVERE, "Unable to extract " + dar, x);
400: // leave the engine stopped,
401: // so that if the user updates the file again, it will restart the engine
402: }
403: }
404:
405: static {
406: try {
407: // avoid cache that causes jar leaks
408: new URL("http://dummy/").openConnection()
409: .setDefaultUseCaches(false);
410: } catch (IOException e) {
411: throw new Error(e);
412: }
413: }
414: }
|