001: package dalma.container;
002:
003: import dalma.Conversation;
004:
005: import java.io.File;
006: import java.io.FileFilter;
007: import java.io.IOException;
008: import java.util.Collections;
009: import java.util.HashMap;
010: import java.util.Iterator;
011: import java.util.Map;
012: import java.util.Observable;
013: import java.util.TreeMap;
014: import java.util.concurrent.Executor;
015: import java.util.concurrent.Executors;
016: import java.util.concurrent.ThreadFactory;
017: import java.util.logging.Level;
018: import java.util.logging.Logger;
019:
020: /**
021: * List of {@link CompletedConversation}s.
022: *
023: * <p>
024: * This class handles persistence of {@link CompletedConversation}s.
025: *
026: * <p>
027: * To keep the data in sync with the disk, we occasionally rescan
028: * the disk.
029: *
030: * @author Kohsuke Kawaguchi
031: */
032: final class CompletedConversationList extends Observable {
033: /**
034: * Directory to store conversations.
035: */
036: private final File dir;
037:
038: /**
039: * We occasionally update the list from a file system.
040: * The next scheduled update time.
041: */
042: private transient long nextUpdate = 0;
043:
044: /**
045: * If the reloading of runs are in progress (in another thread,
046: * set to true.)
047: */
048: private transient boolean reloadingInProgress;
049:
050: /**
051: * Keyed by ID.
052: */
053: private Map<Integer, CompletedConversation> convs;
054:
055: /**
056: * Determines when to discard a log record.
057: * Transient because it's an application object that may not be
058: * persistable.
059: */
060: private transient LogRotationPolicy policy = LogRotationPolicy.NEVER;
061:
062: public CompletedConversationList(File dir) {
063: this .dir = dir;
064: scheduleReload();
065: }
066:
067: /**
068: * Gets the data file for the given conversation.
069: */
070: private File getDataFile(Conversation conv) {
071: return new File(dir, String.format("%06d.dat", conv.getId()));
072: }
073:
074: /**
075: * Sets the log rotation policy.
076: */
077: public void setPolicy(LogRotationPolicy policy) {
078: this .policy = policy;
079: scheduleReload();
080: }
081:
082: public void add(Conversation _conv) {
083: loadSync();
084: CompletedConversation conv = new CompletedConversation(_conv);
085:
086: synchronized (convs) {
087: convs.put(conv.getId(), conv);
088: applyLogRotation(convs);
089: }
090:
091: File dt = getDataFile(conv);
092: try {
093: conv.save(dt);
094: } catch (IOException e) {
095: logger.log(Level.WARNING, "Unable to save " + dt, e);
096: }
097: }
098:
099: public void remove(Conversation conv) {
100: loadSync();
101: if (convs.remove(conv.getId()) == null)
102: throw new IllegalArgumentException();
103: // delete from disk, too
104: getDataFile(conv).delete();
105: }
106:
107: /**
108: * Gets a snapshot view of all the {@link CompletedConversation}s.
109: *
110: * @return
111: * always non-null, possibly empty. Map is keyed by ID.
112: */
113: public Map<Integer, Conversation> getList() {
114: loadSync();
115: if (nextUpdate < System.currentTimeMillis()
116: && !reloadingInProgress)
117: scheduleReload();
118: // return a new copy to avoid synchronization issue
119: return new HashMap<Integer, Conversation>(convs);
120: }
121:
122: private synchronized void scheduleReload() {
123: if (!reloadingInProgress) {
124: // avoid scheduling the task twice
125: reloadingInProgress = true;
126: // schedule a new reloading operation.
127: // we don't want to block the current thread,
128: // so reloading is done asynchronously.
129: reloader.execute(reloadTask);
130: }
131: }
132:
133: private void loadSync() {
134: if (convs == null)
135: reloadTask.run();
136: }
137:
138: private final Runnable reloadTask = new Runnable() {
139: public void run() {
140: Map<Integer, CompletedConversation> convs = new TreeMap<Integer, CompletedConversation>();
141: convs = Collections.synchronizedMap(convs);
142:
143: File[] data = dir.listFiles(new FileFilter() {
144: public boolean accept(File file) {
145: return file.getPath().endsWith(".dat");
146: }
147: });
148: if (data != null) {
149: for (File dt : data) {
150: try {
151: CompletedConversation conv = CompletedConversation
152: .load(dt);
153: convs.put(conv.getId(), conv);
154: } catch (IOException e) {
155: logger.log(Level.WARNING, "Unable to load "
156: + dt, e);
157: dt.delete(); // discard this entry to avoid repeating this error in the future
158: }
159: }
160: }
161:
162: applyLogRotation(convs);
163:
164: CompletedConversationList.this .convs = convs;
165: reloadingInProgress = false;
166: nextUpdate = System.currentTimeMillis() + 5000;
167: }
168: };
169:
170: private void applyLogRotation(
171: Map<Integer, CompletedConversation> convs) {
172: // apply log policy and trim the entries
173: for (Iterator<Map.Entry<Integer, CompletedConversation>> itr = convs
174: .entrySet().iterator(); itr.hasNext();) {
175: CompletedConversation c = itr.next().getValue();
176: if (!policy.keep(c))
177: itr.remove();
178: }
179: }
180:
181: private static final Executor reloader = Executors
182: .newSingleThreadExecutor(new ThreadFactory() {
183: public Thread newThread(Runnable r) {
184: Thread t = new Thread(r);
185: t.setDaemon(true);
186: return t;
187: }
188: });
189:
190: private static final Logger logger = Logger
191: .getLogger(CompletedConversationList.class.getName());
192: }
|