001: /*
002: * Copyright 2005-2006 The Kuali Foundation.
003: *
004: *
005: * Licensed under the Educational Community License, Version 1.0 (the "License");
006: * you may not use this file except in compliance with the License.
007: * You may obtain a copy of the License at
008: *
009: * http://www.opensource.org/licenses/ecl1.php
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package edu.iu.uis.eden.batch;
018:
019: import java.io.BufferedReader;
020: import java.io.File;
021: import java.io.FileReader;
022: import java.io.FileWriter;
023: import java.io.IOException;
024: import java.text.Format;
025: import java.text.SimpleDateFormat;
026: import java.util.ArrayList;
027: import java.util.Calendar;
028: import java.util.Collection;
029: import java.util.Date;
030: import java.util.Iterator;
031: import java.util.List;
032:
033: import edu.iu.uis.eden.KEWServiceLocator;
034:
035: /**
036: * Utility class responsible for polling and ingesting XML data files
037: * containing various forms of workflow engine data (e.g. document types
038: * and rules).
039: * Loaded files and problem files are placed into a subdirectory of a
040: * configured 'loaded' and 'problem' directory, respectively.
041: * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
042: * in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an
043: * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
044: * to the 'problem' directory.
045: * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
046: * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
047: * A different mechanism can be developed if this proves to be a problem, but for now
048: * it is simple enough for the <code>XmlIngesterService</code> to determine this.
049: * @see edu.iu.uis.eden.batch.XmlPollerService
050: * @see edu.iu.uis.eden.batch.XmlIngesterServiceImpl
051: * @author Aaron Hamid (arh14 at cornell dot edu)
052: */
053: public class XmlPollerServiceImpl implements XmlPollerService {
054:
055: private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
056: .getLogger(XmlPollerServiceImpl.class);
057: private static final Format DIR_FORMAT = new SimpleDateFormat(
058: "yyyy-MM-dd_HH-mm-ss-SSS");
059:
060: /**
061: * Specifies the polling interval that should be used with this task.
062: */
063: private int pollIntervalSecs = 5 * 60; // default to 5 minutes
064: /**
065: * Specifies the initial delay the poller should wait before starting to poll
066: */
067: private int initialDelaySecs = 30; // default to 30 seconds
068: /**
069: * Location in which to find XML files to load.
070: */
071: private String xmlPendingLocation;
072: /**
073: * Location in which to place successfully loaded XML files.
074: */
075: private String xmlCompletedLocation;
076: /**
077: * Location in which to place XML files which have failed to load.
078: */
079: private String xmlProblemLocation;
080:
081: private String xmlParentDirectory;
082: private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
083: private static final String NEW_LINE = "\n";
084:
085: public void run() {
086: // if(!directoriesWritable()){
087: // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
088: // this.cancel();
089: // }
090: LOG.debug("checking for xml data files...");
091: File[] files = getXmlPendingDir().listFiles();
092: if (files == null || files.length == 0) {
093: return;
094: }
095: LOG.info("Found " + files.length + " files to ingest.");
096: List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
097: for (int i = 0; i < files.length; i++) {
098: if (files[i].isDirectory()) {
099: collections
100: .add(new DirectoryXmlDocCollection(files[i]));
101: } else if (files[i].getName().equals(
102: PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
103: // the movesfailed file...ignore this
104: continue;
105: } else if (files[i].getName().toLowerCase()
106: .endsWith(".zip")) {
107: try {
108: collections.add(new ZipXmlDocCollection(files[i]));
109: } catch (IOException ioe) {
110: LOG.error("Unable to load file: " + files[i]);
111: }
112: } else if (files[i].getName().endsWith(".xml")) {
113: collections.add(new FileXmlDocCollection(files[i]));
114: } else {
115: LOG
116: .warn("Ignoring extraneous file in xml pending directory: "
117: + files[i]);
118: }
119: }
120:
121: // Cull any resources which were already processed and whose moves failed
122: Iterator collectionsIt = collections.iterator();
123: Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
124: while (collectionsIt.hasNext()) {
125: XmlDocCollection container = (XmlDocCollection) collectionsIt
126: .next();
127: // if a move has already failed for this archive, ignore it
128: if (inPendingMoveFailedArchive(container.getFile())) {
129: LOG.info("Ignoring previously processed resource: "
130: + container);
131: culled.add(container);
132: }
133: }
134: collections.removeAll(culled);
135:
136: if (collections.size() == 0) {
137: LOG.debug("No valid new resources found to ingest");
138: return;
139: }
140:
141: Date LOAD_TIME = Calendar.getInstance().getTime();
142: // synchronization around date format should not be an issue as this code is single-threaded
143: File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT
144: .format(LOAD_TIME));
145: File failedDir = new File(getXmlProblemDir(), DIR_FORMAT
146: .format(LOAD_TIME));
147:
148: // now ingest the containers
149: Collection failed = null;
150: try {
151: failed = KEWServiceLocator.getXmlIngesterService().ingest(
152: collections);
153: } catch (Exception e) {
154: LOG.error("Error ingesting data", e);
155: //throw new RuntimeException(e);
156: }
157:
158: // now iterate through all containers again, and move containers to approprate dir
159: LOG.info("Moving files...");
160: collectionsIt = collections.iterator();
161: while (collectionsIt.hasNext()) {
162: XmlDocCollection container = (XmlDocCollection) collectionsIt
163: .next();
164: LOG.debug("container: " + container);
165: try {
166: // "close" the container
167: // this only matters for ZipFiles for now
168: container.close();
169: } catch (IOException ioe) {
170: LOG.warn("Error closing " + container, ioe);
171: }
172: if (failed.contains(container)) {
173: // some docs must have failed, move the whole
174: // container to the failed dir
175: LOG.error("Moving " + container.getFile()
176: + " to problem dir.");
177: if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178: || !moveFile(failedDir, container.getFile())) {
179: LOG.error("Could not move: " + container.getFile());
180: recordUnmovablePendingFile(container.getFile(),
181: LOAD_TIME);
182: }
183: } else {
184: LOG.info("Moving " + container.getFile()
185: + " to loaded dir.");
186: if ((!completeDir.isDirectory() && !completeDir
187: .mkdirs())
188: || !moveFile(completeDir, container.getFile())) {
189: LOG.error("Could not move: " + container.getFile());
190: recordUnmovablePendingFile(container.getFile(),
191: LOAD_TIME);
192: }
193: }
194: }
195: }
196:
197: private boolean inPendingMoveFailedArchive(File xmlDataFile) {
198: BufferedReader inFile = null;
199: File movesFailedFile = new File(getXmlPendingDir(),
200: PENDING_MOVE_FAILED_ARCHIVE_FILE);
201: if (!movesFailedFile.isFile())
202: return false;
203: try {
204: inFile = new BufferedReader(new FileReader(movesFailedFile));
205: String line;
206:
207: while ((line = inFile.readLine()) != null) {
208: String trimmedLine = line.trim();
209: if (trimmedLine.equals(xmlDataFile.getName())
210: || trimmedLine.startsWith(xmlDataFile.getName()
211: + "=")) {
212: return true;
213: }
214: }
215: } catch (IOException e) {
216: LOG.warn("Error reading file " + movesFailedFile);
217: //TODO try reading the pending file or stop?
218: } finally {
219: if (inFile != null)
220: try {
221: inFile.close();
222: } catch (Exception e) {
223: LOG.warn("Error closing buffered reader for "
224: + movesFailedFile);
225: }
226: }
227:
228: return false;
229: }
230:
231: private boolean recordUnmovablePendingFile(
232: File unMovablePendingFile, Date dateLoaded) {
233: boolean recorded = false;
234: FileWriter archiveFile = null;
235: try {
236: archiveFile = new FileWriter(new File(getXmlPendingDir(),
237: PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
238: archiveFile.write(unMovablePendingFile.getName() + "="
239: + dateLoaded.getTime() + NEW_LINE);
240: recorded = true;
241: } catch (IOException e) {
242: LOG.error("Unable to record unmovable pending file "
243: + unMovablePendingFile.getName()
244: + "in the archive file "
245: + PENDING_MOVE_FAILED_ARCHIVE_FILE);
246: } finally {
247: if (archiveFile != null) {
248: try {
249: archiveFile.close();
250: } catch (IOException ioe) {
251: LOG.error("Error closing unmovable pending file",
252: ioe);
253: }
254: }
255: }
256: return recorded;
257: }
258:
259: private boolean moveFile(File toDirectory, File fileToMove) {
260: boolean moved = true;
261: if (!fileToMove.renameTo(new File(toDirectory.getPath(),
262: fileToMove.getName()))) {
263: LOG.error("Unable to move file " + fileToMove.getName()
264: + " to directory " + toDirectory.getPath());
265: moved = false;
266: }
267: return moved;
268: }
269:
270: private File getXmlPendingDir() {
271: return new File(getXmlPendingLocation());
272: }
273:
274: private File getXmlCompleteDir() {
275: return new File(getXmlCompletedLocation());
276: }
277:
278: private File getXmlProblemDir() {
279: return new File(getXmlProblemLocation());
280: }
281:
282: public String getXmlCompletedLocation() {
283: return xmlCompletedLocation;
284: }
285:
286: public void setXmlCompletedLocation(String xmlCompletedLocation) {
287: this .xmlCompletedLocation = xmlCompletedLocation;
288: }
289:
290: public String getXmlPendingLocation() {
291: return xmlPendingLocation;
292: }
293:
294: /*public boolean validate(File uploadedFile) {
295: XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
296: return filter.accept(uploadedFile);
297: }*/
298:
299: public void setXmlPendingLocation(String xmlPendingLocation) {
300: this .xmlPendingLocation = xmlPendingLocation;
301: }
302:
303: public String getXmlProblemLocation() {
304: return xmlProblemLocation;
305: }
306:
307: public void setXmlProblemLocation(String xmlProblemLocation) {
308: this .xmlProblemLocation = xmlProblemLocation;
309: }
310:
311: public String getXmlParentDirectory() {
312: return xmlParentDirectory;
313: }
314:
315: public void setXmlParentDirectory(String xmlDataParentDirectory) {
316: this .xmlParentDirectory = xmlDataParentDirectory;
317: }
318:
319: /**
320: * Sets the polling interval time in seconds
321: * @param seconds the polling interval time in seconds
322: */
323: public void setPollIntervalSecs(int seconds) {
324: this .pollIntervalSecs = seconds;
325: }
326:
327: /**
328: * Gets the polling interval time in seconds
329: * @return the polling interval time in seconds
330: */
331: public int getPollIntervalSecs() {
332: return this .pollIntervalSecs;
333: }
334:
335: /**
336: * Sets the initial delay time in seconds
337: * @param seconds the initial delay time in seconds
338: */
339: public void setInitialDelaySecs(int seconds) {
340: this .initialDelaySecs = seconds;
341: }
342:
343: /**
344: * Gets the initial delay time in seconds
345: * @return the initial delay time in seconds
346: */
347: public int getInitialDelaySecs() {
348: return this.initialDelaySecs;
349: }
350: }
|