001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: FileConsumerHandler.java 12161 2008-02-29 19:57:33Z mpreston $
023: */
024: package com.bostechcorp.cbesb.runtime.component.file.processors;
025:
026: import java.io.File;
027: import java.io.FileInputStream;
028: import java.io.IOException;
029: import java.io.InputStream;
030:
031: import javax.jbi.JBIException;
032: import javax.jbi.messaging.MessageExchange;
033: import javax.jbi.messaging.NormalizedMessage;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037:
038: import com.bostechcorp.cbesb.common.constant.MetadataConstants;
039: import com.bostechcorp.cbesb.common.runtime.CbesbException;
040: import com.bostechcorp.cbesb.common.util.ErrorUtil;
041: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.IConsumerHandlerContext;
042: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ScheduledProcessHandler;
043: import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
044: import com.bostechcorp.cbesb.runtime.component.file.FileEndpoint;
045: import com.bostechcorp.cbesb.runtime.file.DirectoryScanner;
046: import com.bostechcorp.cbesb.runtime.file.FileCounter;
047: import com.bostechcorp.cbesb.runtime.file.FileOperator;
048:
049: public class FileConsumerHandler extends ScheduledProcessHandler {
050:
051: protected static final String FILE_COMPLETE_ACTION_DELETE = "delete";
052: protected static final String FILE_COMPLETE_ACTION_ARCHIVE = "archive";
053:
054: protected static final String FILE_PATTERN_ASTERISK = "*";
055:
056: protected static final String REPLY_WRITE_STYLE_RAW = "raw";
057: protected static final String REPLY_WRITE_STYLE_NEWLINE = "newline";
058:
059: protected final transient Log logger = LogFactory
060: .getLog(getClass());
061:
062: protected FileEndpoint endpoint;
063: protected DirectoryScanner scanner;
064: protected FileWriterUtil fileWriterUtil;
065: private File archCounterFile;
066:
067: public FileConsumerHandler(FileEndpoint endpoint) {
068: super (endpoint);
069: this .endpoint = endpoint;
070:
071: }
072:
073: /* (non-Javadoc)
074: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ScheduledProcessHandler#doStart()
075: */
076: @Override
077: protected void doStart() throws JBIException {
078: scanner = new DirectoryScanner();
079: File srcDir = new File(endpoint.getSourceDir());
080: if (!srcDir.exists()) {
081: srcDir.mkdirs();
082: }
083: scanner.setDir(srcDir);
084: scanner.setMatchMode(endpoint.getMatchMode());
085: scanner.setPattern(endpoint.getFilePattern());
086: scanner.setTwoPass(endpoint.isTwoPass());
087: scanner.setTwoPassInterval(endpoint.getTwoPassInterval());
088:
089: File rootPath = new File(endpoint.getServiceUnit()
090: .getRootPath());
091: archCounterFile = new File(rootPath, endpoint.getEndpoint()
092: + ".cnt");
093:
094: fileWriterUtil = new FileWriterUtil();
095: fileWriterUtil.setCounterFile(new File(rootPath, endpoint
096: .getEndpoint()
097: + "_reply.cnt"));
098: fileWriterUtil.setStageDir(null);
099: fileWriterUtil.setDefaultDestDir(endpoint.getReplyDir());
100: fileWriterUtil.setDefaultFilePattern(endpoint
101: .getReplyFilePattern());
102: fileWriterUtil.setDefaultWriteStyle(endpoint
103: .getReplyWriteStyle());
104: fileWriterUtil.setDefaultCharset(endpoint.getReplyCharset());
105:
106: scanStageDir();
107: }
108:
109: /* (non-Javadoc)
110: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ScheduledProcessHandler#doProcessFault(javax.jbi.messaging.NormalizedMessage, java.lang.String)
111: */
112: @Override
113: protected void doProcessFault(NormalizedMessage nm, String fault)
114: throws Exception {
115: super .doProcessFault(nm, fault);
116:
117: }
118:
119: /* (non-Javadoc)
120: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ScheduledProcessHandler#doProcessOut(javax.jbi.messaging.NormalizedMessage, java.lang.String, javax.jbi.messaging.MessageExchange)
121: */
122: @Override
123: protected void doProcessOut(NormalizedMessage nm, String outStr,
124: MessageExchange exchange) throws JBIException,
125: CbesbException {
126: fileWriterUtil.writeMessageToFile(nm, exchange);
127: }
128:
129: /* (non-Javadoc)
130: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ScheduledProcessHandler#doTrigger()
131: */
132: @Override
133: protected boolean doTrigger() {
134: boolean filesProcessed = false;
135: try {
136: String[] fileList = scanner.doScan();
137:
138: for (int i = 0; i < fileList.length; i++) {
139: // move the file from SourceDir to StageDir
140: FileOperator.moveFile(new File(endpoint.getSourceDir(),
141: fileList[i]), endpoint.getStageDir());
142:
143: File processFile = new File(endpoint.getStageDir(),
144: fileList[i]);
145:
146: // read the file into normalized messages.
147: FileInputStream inputStream = new FileInputStream(
148: processFile);
149: try {
150: int recordsPerMessage = endpoint
151: .getRecordsPerMessage();
152: String readStyle = endpoint.getReadStyle();
153: String recordType = endpoint.getRecordType();
154: String fileInputCharset = endpoint.getCharset();
155:
156: FileConsumerHandlerContext context = new FileConsumerHandlerContext();
157: context.setInputCharset(fileInputCharset);
158: context.setInputFilename(fileList[i]);
159: context.setInputReadStyle(readStyle);
160: // most processing is now encapsulated in the ExternalInput object
161: ExternalInput ext = new ExternalInput(inputStream,
162: fileInputCharset, readStyle, recordType,
163: recordsPerMessage);
164: while (ext.hasMoreData())
165: process(ext, context);
166: filesProcessed = true;
167: } finally {
168: inputStream.close();
169: }
170:
171: // post-read processing
172: if (FILE_COMPLETE_ACTION_ARCHIVE.equals(endpoint
173: .getFileCompleteAction())
174: && endpoint.getArchiveDir() != null) {
175: int cnt = 0;
176: if (endpoint.getArchiveFilePattern().contains(
177: "{COUNT}")) {
178: FileCounter fc = new FileCounter(
179: archCounterFile);
180: cnt = fc.getNextValue();
181: }
182: FileOperator.archiveFile(processFile, endpoint
183: .getArchiveDir(), endpoint
184: .getArchiveFilePattern(), cnt);
185: FileOperator.deleteFile(processFile);
186: } else if (FILE_COMPLETE_ACTION_DELETE.equals(endpoint
187: .getFileCompleteAction())) {
188: FileOperator.deleteFile(processFile);
189: }
190: }
191:
192: } catch (Exception e) {
193: ErrorUtil.printWarn(
194: "General Exception in processing source directory '"
195: + endpoint.getSourceDir() + "''. - "
196: + e.getMessage(), e);
197: // logger.error("Exception in scanning source directory '" + endpoint.getSourceDir() + "': " + e.getMessage() );
198: // if (logger.isErrorEnabled())
199: // logger.debug(e.getMessage(), e);
200:
201: return false;
202: }
203:
204: return filesProcessed;
205: }
206:
207: /**
208: * Scan the StageDir to chick if any files are present in it, any file that exist was
209: * not completed when the component was stopped last time.
210: */
211: private void scanStageDir() {
212: String stageDir = endpoint.getStageDir();
213: String holdDir = endpoint.getHoldDir();
214: String sourceDir = endpoint.getSourceDir();
215: boolean isHold = endpoint.isHold();
216: FileOperator.scanStageDir(stageDir, holdDir, sourceDir, isHold);
217: }
218:
219: protected void transform(Object data, MessageExchange me,
220: IConsumerHandlerContext context) throws Exception {
221:
222: super .transform(data, me, context);
223:
224: FileConsumerHandlerContext ctx = (FileConsumerHandlerContext) context;
225: me.setProperty(MetadataConstants.FILE_INPUT_FILENAME, ctx
226: .getInputFilename());
227: me.setProperty(MetadataConstants.FILE_INPUT_CHARSET, ctx
228: .getInputCharset());
229: me.setProperty(MetadataConstants.FILE_INPUT_READSTYLE, ctx
230: .getInputReadStyle());
231:
232: }
233:
234: }
|