001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify
008: * it under the terms of the GNU General Public License as published by
009: * the Free Software Foundation; either version 2 of the License, or
010: * (at your option) any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * General Public License for more details.
016: *
017: * You should have received a copy of the GNU General Public License
018: * along with this program; if not, write to the Free Software
019: * Foundation, Inc.,59 Temple Place, Suite 330, Boston, MA 02111-1307
020: * USA
021: *
022: *
023: * $Id: ExternalInput.java 9658 2007-10-17 17:01:53Z mpreston $
024: */
025: package com.bostechcorp.cbesb.runtime.ccsl.lib;
026:
027: import java.io.BufferedInputStream;
028: import java.io.BufferedReader;
029: import java.io.ByteArrayInputStream;
030: import java.io.ByteArrayOutputStream;
031: import java.io.IOException;
032: import java.io.InputStream;
033: import java.io.InputStreamReader;
034: import java.io.Reader;
035: import java.io.StringReader;
036: import java.io.StringWriter;
037:
038: import javax.jbi.messaging.MessagingException;
039: import javax.jbi.messaging.NormalizedMessage;
040: import javax.xml.transform.Source;
041: import javax.xml.transform.stream.StreamSource;
042:
043: import org.apache.commons.logging.Log;
044: import org.apache.commons.logging.LogFactory;
045:
046: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceDescriptionHandler;
047: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.ByteArraySource;
048: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.NormalizedMessageHandler;
049: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.StringSource;
050:
051: /**
052: * Read external input from a Reader or InputStream and transfer to normalized messages based on the
053: * read settings.
054: */
055: public class ExternalInput {
056:
057: public static final Log log = LogFactory
058: .getLog(ExternalInput.class);
059:
060: private final static String[] READ_STYLES = { "Read Style", "raw",
061: "newline" };
062: private final static int READ_STYLE_RAW = 0;
063: private final static int READ_STYLE_NEWLINE = 1;
064:
065: private final static String[] REC_TYPES = { "Record Type", "xml",
066: "string", "binary" };
067: private final static int REC_TYPE_XML = 0;
068: private final static int REC_TYPE_STRING = 1;
069: private final static int REC_TYPE_BINARY = 2;
070:
071: private final static String IDENTITY_CHARSET = "ISO8859-1";
072:
073: private BufferedReader reader;
074: private BufferedInputStream inputStream;
075: private String charset;
076: private int readStyle;
077: private int recordType;
078: private int recordsPerMessage;
079: private boolean mayHaveMoreData = true;
080: private String nextString;
081:
082: /*
083: * Construct the external input source based on a Reader or an InputStream
084: */
085: public ExternalInput(Reader rdr, String cs, String rs, String rt,
086: int rpm) throws Exception {
087: setupParameters(cs, rs, rt, rpm);
088: if (recordType != REC_TYPE_BINARY
089: || readStyle == READ_STYLE_NEWLINE) {
090: // If we have a reader and want binary then set up a stream using the character set
091: reader = new BufferedReader(rdr);
092: } else
093: inputStream = new BufferedInputStream(
094: new ReaderToInputStream(rdr, charset));
095: }
096:
097: public ExternalInput(InputStream instr, String cs, String rs,
098: String rt, int rpm) throws Exception {
099: setupParameters(cs, rs, rt, rpm);
100: if (recordType == REC_TYPE_STRING
101: || readStyle == READ_STYLE_NEWLINE) {
102: // If we want binary and newline then use an identity charset to avoid changing the
103: // data when converting to a reader and back.
104: if (recordType == REC_TYPE_BINARY)
105: charset = IDENTITY_CHARSET;
106: // If we have an input stream but want characters then set up a reader
107: reader = new BufferedReader(new InputStreamReader(instr,
108: charset));
109: } else
110: inputStream = new BufferedInputStream(instr);
111: }
112:
113: public boolean hasMoreData() {
114: boolean result = false;
115: if (mayHaveMoreData) {
116: if (readStyle == READ_STYLE_RAW)
117: result = true;
118: else {
119: if (nextString != null)
120: result = true;
121: else {
122: try {
123: nextString = reader.readLine();
124: } catch (IOException e) {
125: log.error("Error reading input:: "
126: + e.getMessage());
127: if (log.isDebugEnabled()) {
128: log.debug("Error reading input::", e);
129: }
130: }
131: if (nextString != null)
132: result = true;
133: else {
134: mayHaveMoreData = false;
135: }
136: }
137: }
138: }
139: return result;
140: }
141:
142: /*
143: * Use input data to populate a normalized message. Return the number of records added.
144: */
145: public int populateMessage(NormalizedMessage nm)
146: throws MessagingException {
147: NormalizedMessageHandler nmh = new NormalizedMessageHandler(nm);
148: if (readStyle == READ_STYLE_NEWLINE)
149: populateNewline(nmh);
150: else
151: populateRaw(nmh);
152: nmh.generateMessageContent();
153: return nmh.getRecordCount();
154: }
155:
156: public int populateMessage(NormalizedMessage nm,
157: ServiceDescriptionHandler svcDescHandler)
158: throws MessagingException {
159:
160: NormalizedMessageHandler nmh = new NormalizedMessageHandler(nm,
161: svcDescHandler);
162: if (readStyle == READ_STYLE_NEWLINE)
163: populateNewline(nmh);
164: else
165: populateRaw(nmh);
166: nmh.generateMessageContent();
167: return nmh.getRecordCount();
168: }
169:
170: /*
171: * newline style always has a reader and uses readLine. Read up to recordsPerMessage
172: * new records.
173: */
174: private void populateNewline(NormalizedMessageHandler nmh) {
175: int newRecords = 0;
176: try {
177: for (;;) {
178: String this String;
179: Source src = null;
180: if (nextString != null) {
181: this String = nextString;
182: nextString = null;
183: } else {
184: this String = reader.readLine();
185: if (this String == null) {
186: mayHaveMoreData = false;
187: break;
188: }
189: }
190: if (recordType == REC_TYPE_XML) {
191: src = new StreamSource(new StringReader(this String));
192: } else if (recordType == REC_TYPE_BINARY) {
193: src = new ByteArraySource(this String
194: .getBytes(charset));
195: } else {
196: src = new StringSource(this String);
197: }
198: nmh.addRecord(src);
199: if (recordsPerMessage > 0
200: && (++newRecords == recordsPerMessage))
201: break;
202: }
203: } catch (IOException e) {
204: ioErrorWarn(e);
205: }
206: }
207:
208: /*
209: * raw style reads the entire stream at once
210: */
211: private void populateRaw(NormalizedMessageHandler nmh) {
212: mayHaveMoreData = false;
213: Source src = null;
214: if (recordType == REC_TYPE_XML) {
215: if (reader != null)
216: src = new StreamSource(reader);
217: else
218: src = new StreamSource(inputStream);
219: } else if (recordType == REC_TYPE_BINARY) {
220: ByteArrayOutputStream baos = new ByteArrayOutputStream();
221: byte[] buffer = new byte[2048];
222: int len;
223: try {
224: while ((len = inputStream.read(buffer)) > 0)
225: baos.write(buffer, 0, len);
226: } catch (IOException e) {
227: ioErrorWarn(e);
228: }
229: src = new ByteArraySource(baos.toByteArray());
230: } else {
231: StringWriter sw = new StringWriter();
232: char[] buffer = new char[2048];
233: int len;
234: try {
235: while ((len = reader.read(buffer)) > 0)
236: sw.write(buffer, 0, len);
237: } catch (IOException e) {
238: ioErrorWarn(e);
239: }
240: src = new StringSource(sw.toString());
241: }
242: nmh.addRecord(src);
243: }
244:
245: private void setupParameters(String cs, String rs, String rt,
246: int rpm) throws Exception {
247: charset = cleanupCharset(cs);
248: readStyle = findInArray(rs, READ_STYLES);
249: recordType = findInArray(rt, REC_TYPES);
250: recordsPerMessage = rpm;
251: }
252:
253: private static final String defaultCharset = (new InputStreamReader(
254: new ByteArrayInputStream(new byte[0]))).getEncoding();
255:
256: private String cleanupCharset(String cs) {
257: String result = null;
258: if (cs == null || cs.length() < 1)
259: result = defaultCharset;
260: else
261: result = cs;
262: return result;
263: }
264:
265: private int findInArray(String s, String arr[]) throws Exception {
266: for (int i = 1; i < arr.length; i++)
267: if (s.equals(arr[i]))
268: return i - 1;
269: throw new Exception("invalid " + arr[0] + " \"" + s + "\"");
270: }
271:
272: private void ioErrorWarn(Exception e) {
273: log.error("Error reading external input: " + e.getMessage());
274: if (log.isDebugEnabled()) {
275: log.debug("Error reading external input:", e);
276: }
277: }
278: }
|