001: package dalma.impl;
002:
003: import dalma.Conversation;
004: import dalma.ConversationDeath;
005: import dalma.EndPoint;
006: import dalma.Engine;
007: import dalma.ErrorHandler;
008: import dalma.Executor;
009: import dalma.Workflow;
010: import dalma.EngineListener;
011: import dalma.endpoints.timer.TimerEndPoint;
012: import dalma.spi.EngineSPI;
013: import org.apache.bsf.BSFManager;
014: import org.apache.commons.io.IOUtils;
015:
016: import java.io.File;
017: import java.io.FileFilter;
018: import java.io.FileReader;
019: import java.io.IOException;
020: import java.io.Reader;
021: import java.io.Serializable;
022: import java.text.ParseException;
023: import java.util.ArrayList;
024: import java.util.Collection;
025: import java.util.Collections;
026: import java.util.Hashtable;
027: import java.util.Map;
028: import java.util.Properties;
029: import java.util.TreeMap;
030: import java.util.Date;
031: import java.util.Comparator;
032: import java.util.logging.Logger;
033: import java.util.logging.Level;
034:
035: /**
036: * {@link Engine} implementation.
037: *
038: * <h3>Persistence of Engine</h3>
039: * <p>
040: * Engine object is serialized to persist the state of the engine itself
041: * across JVM sessions.
042: *
043: * @author Kohsuke Kawaguchi
044: */
045: public final class EngineImpl extends EngineSPI implements Serializable {
046:
047: /**
048: * Logger that logs events.
049: */
050: private transient Logger logger = Logger.getLogger(EngineImpl.class
051: .getName());
052:
053: /**
054: * Executes conversations that can be run.
055: */
056: private transient final Executor executor;
057:
058: /**
059: * Root directory of the system.
060: */
061: private transient final File rootDir;
062:
063: /**
064: * Generates the unique ID.
065: */
066: private final SequenceGenerator idGen = new SequenceGenerator();
067:
068: /**
069: * ClassLoader used to restore conversations.
070: *
071: * TODO: allow each conversation to have its own class loader,
072: * but this has an issue in the restoration.
073: *
074: * Transient because ClassLoaders can't be serialized in general.
075: */
076: /*package*/final transient ClassLoader classLoader;
077:
078: /**
079: * All conversations that belong to this engine.
080: * access need to be synchronized.
081: * Keyed by their {@link ConversationImpl#id}.
082: */
083: transient final Map<Integer, ConversationImpl> conversations = new Hashtable<Integer, ConversationImpl>();
084:
085: /**
086: * All {@link EndPoint}s that bleong to this engine.
087: * access need to be synchronized.
088: */
089: transient final Map<String, EndPointImpl> endPoints = new Hashtable<String, EndPointImpl>();
090:
091: /**
092: * Signals when all the conversation completes.
093: */
094: transient final Object completionLock = new Object();
095:
096: /**
097: * This lock is used to control "dalma.xml" access.
098: */
099: transient final Object saveLoadLock = new Object();
100:
101: transient final EngineListenerSet listeners = new EngineListenerSet();
102:
103: /**
104: * True once the engine is started.
105: */
106: transient private boolean started;
107:
108: /**
109: * Possibly null {@link ErrorHandler}.
110: */
111: transient private ErrorHandler errorHandler;
112:
113: /**
114: * See {@link #getLastActiveTime()}
115: */
116: private long lastActiveTime = 0;
117:
118: public EngineImpl(File rootDir, ClassLoader classLoader,
119: Executor executor) throws IOException {
120: this .rootDir = rootDir;
121: this .executor = executor;
122: this .classLoader = classLoader;
123: setLogger(Logger.getLogger(getClass().getName()));
124: load();
125:
126: addEndPoint(new TimerEndPoint());
127: }
128:
129: /**
130: * Loads the configuration from disk.
131: */
132: private void load() throws IOException {
133: synchronized (saveLoadLock) {
134: XmlFile df = getDataFile();
135: if (df.exists()) {
136: // load data into this object
137: df.unmarshal(this );
138: }
139: }
140: }
141:
142: /**
143: * Loads persisted conversations from disk.
144: */
145: private void loadConversations() {
146: // restore conversations
147: File convDir = getConversationsDir();
148: convDir.mkdirs();
149: File[] subdirs = convDir.listFiles(new FileFilter() {
150: public boolean accept(File child) {
151: return child.isDirectory();
152: }
153: });
154:
155: for (File subdir : subdirs) {
156: try {
157: ConversationImpl conv = ConversationImpl.load(this ,
158: subdir);
159: conversations.put(conv.id, conv);
160: } catch (IOException e) {
161: logger.log(Level.WARNING,
162: "Failed to load conversation " + subdir, e);
163: }
164: }
165: }
166:
167: /**
168: * Directory to store conversations.
169: */
170: File getConversationsDir() {
171: return new File(rootDir, "conversations");
172: }
173:
174: /**
175: * Persists ths state of this object (but not conversations)
176: * into the data file.
177: */
178: private void save() throws IOException {
179: synchronized (saveLoadLock) {
180: try {
181: SerializationContext.set(this ,
182: SerializationContext.Mode.ENGINE);
183: getDataFile().write(this );
184: } finally {
185: SerializationContext.remove();
186: }
187: }
188: }
189:
190: /**
191: * Name of the data file to persist this object.
192: */
193: private XmlFile getDataFile() {
194: return new XmlFile(new File(rootDir, "dalma.xml"));
195: }
196:
197: /**
198: * Generates unique IDs for {@link ConversationImpl}.
199: */
200: int generateUniqueId() throws IOException {
201: int r = idGen.next();
202: save();
203: return r;
204: }
205:
206: /**
207: * Queues a conversation that became newly runnable.
208: */
209: void queue(final FiberImpl f) {
210: executor.execute(new Runnable() {
211: public void run() {
212: try {
213: lastActiveTime = System.currentTimeMillis();
214: f.run();
215: } catch (FiberDeath t) {
216: // this fiber is dead!
217: } catch (ConversationDeath t) {
218: // some fatal error caused the conversation to die.
219: // report the error first before removing the conversation,
220: // which might cause the engine to signal "we are done!" event.
221: addToErrorQueue(t.getCause());
222: f.owner.remove();
223: } catch (Throwable t) {
224: // even if the error recovery process fails,
225: // don't let the worker thread die.
226: addToErrorQueue(t);
227: } finally {
228: lastActiveTime = System.currentTimeMillis();
229: }
230: }
231: });
232: }
233:
234: protected void addToErrorQueue(Throwable t) {
235: if (errorHandler == null)
236: ErrorHandler.DEFAULT.onError(t);
237: else
238: errorHandler.onError(t);
239: }
240:
241: public ErrorHandler getErrorHandler() {
242: return errorHandler;
243: }
244:
245: public void setErrorHandler(ErrorHandler errorHandler) {
246: this .errorHandler = errorHandler;
247: }
248:
249: public void addListener(EngineListener listener) {
250: listeners.add(listener);
251: }
252:
253: public void removeListener(EngineListener listener) {
254: listeners.remove(listener);
255: }
256:
257: public synchronized Collection<Conversation> getConversations() {
258: makeSureStarted();
259: ArrayList<Conversation> convs = new ArrayList<Conversation>(
260: conversations.values());
261: Collections.sort(convs, ID_COMPARATOR);
262: return convs;
263: }
264:
265: public int getConversationsSize() {
266: return conversations.size();
267: }
268:
269: public Date getLastActiveTime() {
270: return new Date(lastActiveTime);
271: }
272:
273: public Map<String, EndPoint> getEndPoints() {
274: synchronized (endPoints) {
275: return Collections
276: .<String, EndPoint> unmodifiableMap(endPoints);
277: }
278: }
279:
280: public EndPoint getEndPoint(String name) {
281: return endPoints.get(name);
282: }
283:
284: public void addEndPoint(EndPoint ep) {
285: makeSureNotStarted();
286: synchronized (endPoints) {
287: if (endPoints.containsKey(ep.getName()))
288: throw new IllegalArgumentException(
289: "There's already an EndPoint of the name "
290: + ep.getName());
291: if (!(ep instanceof EndPointImpl))
292: throw new IllegalArgumentException(ep.getClass()
293: .getName()
294: + " doesn't derive from EndPointImpl");
295: endPoints.put(ep.getName(), (EndPointImpl) ep);
296: }
297: }
298:
299: public synchronized EndPoint addEndPoint(String name,
300: String endpointURL) throws ParseException {
301: EndPoint ep = EndPoint.create(name, endpointURL);
302: addEndPoint(ep);
303: return ep;
304: }
305:
306: public Map<String, EndPoint> addEndPoints(Properties endpointURLs)
307: throws ParseException {
308: Map<String, EndPoint> r = new TreeMap<String, EndPoint>();
309:
310: for (Map.Entry e : endpointURLs.entrySet()) {
311: EndPoint ep = addEndPoint(e.getKey().toString(), e
312: .getValue().toString());
313: r.put(ep.getName(), ep);
314: }
315:
316: return r;
317: }
318:
319: public void configureWithBSF(File scriptFile) throws IOException {
320: Reader f = new FileReader(scriptFile);
321: try {
322: try {
323: BSFManager bsfm = new BSFManager();
324: bsfm.declareBean("engine", this , Engine.class);
325: String language = BSFManager
326: .getLangFromFilename(scriptFile.getPath());
327: bsfm.exec(language, scriptFile.getPath(), 1, 1, IOUtils
328: .toString(f));
329: } catch (RuntimeException e) {
330: throw e;
331: } catch (Exception e) {
332: // we'd really like to just catch BSFException, but if I do that,
333: // we'll need BSF just to load this class
334: IOException x = new IOException(e.getMessage());
335: x.initCause(e);
336: throw x;
337: }
338: } finally {
339: f.close();
340: }
341: }
342:
343: public void start() {
344: makeSureNotStarted();
345: started = true;
346: synchronized (endPoints) {
347: for (EndPointImpl ep : endPoints.values())
348: ep.start();
349: }
350: loadConversations();
351: }
352:
353: public boolean isStarted() {
354: return started;
355: }
356:
357: private void makeSureStarted() {
358: if (!started)
359: throw new IllegalStateException("engine is not started");
360: }
361:
362: private void makeSureNotStarted() {
363: if (started)
364: throw new IllegalStateException("engine is already started");
365: }
366:
367: public void stop() {
368: makeSureStarted();
369:
370: // clone first to avoid concurrent modification
371: Collection<EndPointImpl> eps;
372: synchronized (endPoints) {
373: eps = new ArrayList<EndPointImpl>(endPoints.values());
374: }
375:
376: for (EndPointImpl ep : eps)
377: ep.stop();
378:
379: // write any pending changes
380: try {
381: save();
382: } catch (IOException e) {
383: logger.log(Level.WARNING, "Failed to save state", e);
384: }
385: }
386:
387: public void setLogger(Logger logger) {
388: if (logger == null) {
389: // use unconnected anonymous logger to ignore log
390: logger = Logger.getAnonymousLogger();
391: logger.setUseParentHandlers(false);
392: }
393: this .logger = logger;
394: }
395:
396: public void waitForCompletion() throws InterruptedException {
397: makeSureStarted();
398: synchronized (completionLock) {
399: while (!conversations.isEmpty())
400: completionLock.wait();
401: }
402: }
403:
404: public ConversationImpl getConversation(int id) {
405: makeSureStarted();
406: return conversations.get(id);
407: }
408:
409: public ConversationImpl createConversation(Runnable target)
410: throws IOException {
411: return createConversation(new RunnableWorkflowImpl(target));
412: }
413:
414: public ConversationImpl createConversation(Workflow workflow)
415: throws IOException {
416: makeSureStarted();
417: return new ConversationImpl(this , workflow);
418: }
419:
420: private Object writeReplace() {
421: if (SerializationContext.get().mode != SerializationContext.Mode.ENGINE)
422: // if the engine is written as a part of dehydration,
423: // return a moniker to avoid the whole engine to be serialized.
424: return MONIKER;
425: else
426: // otherwise we are serializing the whole engine.
427: return this ;
428: }
429:
430: private static final long serialVersionUID = 1L;
431:
432: private static final class EngineMoniker implements Serializable {
433: private static final long serialVersionUID = 1L;
434:
435: private Object readResolve() {
436: return SerializationContext.get().engine;
437: }
438: }
439:
440: private static final EngineMoniker MONIKER = new EngineMoniker();
441:
442: private static final Comparator<Conversation> ID_COMPARATOR = new Comparator<Conversation>() {
443: public int compare(Conversation lhs, Conversation rhs) {
444: return lhs.getId() - rhs.getId();
445: }
446: };
447: }
|