001: /*
002: * $Id: DelimitedFlatfileTable.java,v 1.18 2006/01/10 21:02:36 ahimanikya Exp $
003: * =======================================================================
004: * Copyright (c) 2002-2004 Axion Development Team. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above
011: * copyright notice, this list of conditions and the following
012: * disclaimer.
013: *
014: * 2. Redistributions in binary form must reproduce the above copyright
015: * notice, this list of conditions and the following disclaimer in
016: * the documentation and/or other materials provided with the
017: * distribution.
018: *
019: * 3. The names "Tigris", "Axion", nor the names of its contributors may
020: * not be used to endorse or promote products derived from this
021: * software without specific prior written permission.
022: *
023: * 4. Products derived from this software may not be called "Axion", nor
024: * may "Tigris" or "Axion" appear in their names without specific prior
025: * written permission.
026: *
027: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
028: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
029: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
030: * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
031: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
032: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
033: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
034: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
035: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
036: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
037: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
038: * =======================================================================
039: */
040:
041: package org.axiondb.engine.tables;
042:
043: import java.io.CharArrayWriter;
044: import java.io.EOFException;
045: import java.io.IOException;
046: import java.io.ObjectInputStream;
047: import java.io.ObjectOutputStream;
048: import java.sql.Types;
049: import java.util.ArrayList;
050: import java.util.Arrays;
051: import java.util.HashSet;
052: import java.util.Properties;
053: import java.util.Set;
054: import java.util.StringTokenizer;
055: import java.util.regex.Pattern;
056:
057: import org.axiondb.AxionException;
058: import org.axiondb.DataType;
059: import org.axiondb.Database;
060: import org.axiondb.ExternalTable;
061: import org.axiondb.Row;
062: import org.axiondb.engine.rows.SimpleRow;
063: import org.axiondb.io.BufferedDataInputStream;
064: import org.axiondb.io.BufferedDataOutputStream;
065:
066: /**
067: * A disk-resident Delimited Flatfile {@link org.axiondb.Table}.<br>
068: *
069: * TODO: Support for multiple delimiter for field and record
070: * TODO: Support for treating consecutive delimiter as one
071: *
072: * @version $Revision: 1.18 $ $Date: 2006/01/10 21:02:36 $
073: * @author Ahimanikya Satapathy
074: * @author Jonathan Giron
075: */
076: public class DelimitedFlatfileTable extends BaseFlatfileTable {
077:
/** Shared empty string; stands in for an absent qualifier and for empty quoted tokens. */
private static final String EMPTY_STRING = "";

/** Field delimiter applied when the FIELDDELIMITER property is not supplied. */
private static final String COMMA = ",";

/** Sentinel stored in the line buffer by readLine to mark the end of a record. */
private static final char NL = Character.MAX_VALUE;

/** Organization property key naming the field delimiter. */
public static final String PROP_FIELDDELIMITER = "FIELDDELIMITER"; // NOI18N

/** Organization property key naming the text qualifier. */
public static final String PROP_QUALIFIER = "QUALIFIER"; // NOI18N

// Byte forms of the qualifier, record separator and field separator.
// writeRow re-derives the three non-initialised arrays on every call.
byte[] QUALIFIER_BYTES;
byte[] LINESEP_BYTES;
byte[] FIELDSEP_BYTES;

/** Zero-length array written where nothing should be emitted. */
byte[] EMPTY_STRING_BYTES = new byte[0];

/** Property keys this table type adds on top of the base flatfile keys. */
private static final Set PROPERTY_KEYS = new HashSet(2);

/** Set of required keys for organization properties; nothing is registered here. */
private static final Set REQUIRED_KEYS = new HashSet(1);

static {
    PROPERTY_KEYS.addAll(Arrays.asList(new String[] {
        PROP_FIELDDELIMITER, PROP_QUALIFIER }));
}
098:
/**
 * Creates a delimited flatfile table backed by a
 * {@link DelimitedFlatfileTableLoader} and marks it with the
 * delimited external-table type.
 *
 * @param name table name, passed through to the base class
 * @param db owning database, passed through to the base class
 * @throws AxionException if the base class constructor fails
 */
public DelimitedFlatfileTable(String name, Database db) throws AxionException {
    super(name, db, new DelimitedFlatfileTableLoader());
    setType(ExternalTable.DELIMITED_TABLE_TYPE);
}
104:
105: protected String getDefaultDataFileExtension() {
106: return "csv";
107: }
108:
109: protected String getQualifier() {
110: return _qualifier;
111: }
112:
113: protected int getQualifierLength() {
114: return _qualifier.length();
115: }
116:
117: protected Row getRowByOffset(int idToAssign, long ptr)
118: throws AxionException {
119: BufferedDataInputStream data = getInputStream();
120: int colCount = getColumnCount();
121: Row row = new SimpleRow(idToAssign, colCount);
122:
123: try {
124: synchronized (data) {
125: char[] charArray = readLine(data, ptr);
126: if (charArray[0] == NL) {
127: throw new AxionException(
128: "Empty line detected - invalid.");
129: }
130:
131: CharTokenizer charTokenizer = new CharTokenizer(
132: charArray, _fieldSep);
133:
134: for (int i = 0; i < colCount
135: && charTokenizer.hasMoreTokens(); i++) {
136: String columnValue = charTokenizer.nextToken();
137: row = trySettingColumn(idToAssign, row, i,
138: columnValue);
139: }
140: }
141: } catch (Exception e) {
142: if (e instanceof AxionException) {
143: throw (AxionException) e;
144: }
145: throw new AxionException(e);
146: }
147: return row;
148: }
149:
150: protected boolean isQuoted() {
151: return !isNullString(_qualifier);
152: }
153:
154: protected boolean isEndOfRecord(int recLength, int nextChar,
155: BufferedDataInputStream data) throws IOException {
156: if (isEOF(nextChar)) {
157: return true;
158: }
159:
160: boolean foundEOL = false;
161: for (int k = 0; (k < _lineSeps.length && !foundEOL); k++) {
162: String lineSep = _lineSeps[k];
163: if (!("".equals(lineSep)) && lineSep.charAt(0) == nextChar) {
164: foundEOL = true;
165: char[] charBuf = lineSep.toCharArray();
166: // Look ahead to see whether the following chars match EOL.
167: long lastDataFileOffset = data.getPos();
168: for (int i = 1, I = lineSep.length(); i < I; i++) {
169: if (charBuf[i] != (char) data.read()) {
170: data.seek(lastDataFileOffset);
171: foundEOL = false;
172: }
173: }
174: }
175: }
176: return foundEOL;
177: }
178:
179: public boolean loadExternalTable(Properties props)
180: throws AxionException {
181: context = new DelimitedTableOrganizationContext();
182: return super .loadExternalTable(props);
183: }
184:
185: public Properties getTableProperties() {
186: return context.getTableProperties();
187: }
188:
189: protected void parseTableProperties(ObjectInputStream in)
190: throws AxionException {
191: try {
192: _lineSep = in.readUTF();
193: _fieldSep = in.readUTF();
194: _isFirstLineHeader = Boolean.valueOf(in.readUTF())
195: .booleanValue();
196: _fileName = in.readUTF();
197: in.readUTF(); // _eol will be computed, keep for older version metadata
198: _qualifier = in.readUTF();
199: in.readUTF(); // _quoted will be computed, keep for older version metadata
200:
201: try {
202: _rowCount = in.readInt();
203: } catch (EOFException ignore) {
204: // Goes here if metadata from an older version is parsed - ignore.
205: }
206:
207: context = new DelimitedTableOrganizationContext();
208: context.updateProperties();
209: context.readOrSetDefaultProperties(context
210: .getTableProperties());
211: createOrLoadDataFile();
212:
213: } catch (IOException ioex) {
214: throw new AxionException(
215: "Unable to parse meta file for table " + getName(),
216: ioex);
217: }
218: }
219:
220: protected void writeHeader(BufferedDataOutputStream dataFile)
221: throws AxionException {
222: if (_isFirstLineHeader) {
223: try {
224: CharArrayWriter header = new CharArrayWriter();
225: for (int i = 0, I = getColumnCount(); i < I; i++) {
226:
227: if (i != 0) {
228: header.write(_fieldSep);
229: }
230: header.write(getColumn(i).getName());
231: }
232: header.write(_preferredLineSep);
233: dataFile.write(header.toString().getBytes());
234: header.close();
235: } catch (IOException ioex) {
236: throw new AxionException(
237: "Unable to write header for table: "
238: + getName(), ioex);
239: }
240: }
241: }
242:
243: protected void writeRow(BufferedDataOutputStream buffer, Row row)
244: throws AxionException {
245: Object colValue = null;
246: DataType type = null;
247:
248: QUALIFIER_BYTES = _qualifier.getBytes();
249: LINESEP_BYTES = _preferredLineSep.getBytes();
250: FIELDSEP_BYTES = _fieldSep.getBytes();
251:
252: try {
253: for (int i = 0, I = getColumnCount(); i < I; i++) {
254: colValue = row.get(i);
255: type = getColumn(i).getDataType();
256:
257: if (i != 0) {
258: buffer.write(FIELDSEP_BYTES);
259: }
260: byte[] qualifier = isEscapeRequired(type) ? QUALIFIER_BYTES
261: : EMPTY_STRING_BYTES;
262: if (colValue != null) {
263: buffer.write(qualifier);
264: String val = type.toString(colValue);
265: if (isQuoted() && val.indexOf(_qualifier) != -1) {
266: // escape the quealifier in the data string.
267: val = _qPattern.matcher(val).replaceAll(
268: _qualifier + _qualifier);
269: }
270: buffer.write(val.getBytes());
271: buffer.write(qualifier);
272: } else {
273: buffer.write(EMPTY_STRING_BYTES); // Write Null column
274: }
275: }
276: // write new line
277: buffer.write(LINESEP_BYTES);
278: } catch (IOException e) {
279: throw new AxionException("Error writing row: " + row, e);
280: }
281: }
282:
283: protected void writeTableProperties(ObjectOutputStream out)
284: throws AxionException {
285: try {
286: if (_lineSep != null && _fieldSep != null
287: && _fileName != null) {
288: out.writeUTF(_lineSep);
289: out.writeUTF(_fieldSep);
290: out.writeUTF(Boolean.toString(_isFirstLineHeader));
291: out.writeUTF(_fileName);
292: out.writeUTF(_lineSep);
293: out.writeUTF(_qualifier);
294: out.writeUTF("true");
295: out.writeInt(_rowsToSkip);
296: }
297: } catch (IOException ioex) {
298: throw new AxionException(
299: "Unable to write meta file for table " + getName(),
300: ioex);
301: }
302: }
303:
304: private boolean isEscapeRequired(DataType type) {
305: switch (type.getJdbcType()) {
306: case Types.CHAR:
307: case Types.DATE:
308: case Types.TIME:
309: case Types.TIMESTAMP:
310: case Types.VARCHAR:
311: return true;
312: default:
313: return false;
314: }
315: }
316:
317: private char[] readLine(BufferedDataInputStream data,
318: long fileOffset) throws AxionException {
319: Arrays.fill(_lineCharArray, FILLER);
320: int recLength = 0;
321: try {
322: int nextChar;
323: data.seek(fileOffset);
324:
325: while (true) {
326: nextChar = data.read();
327: if (isEndOfRecord(recLength, nextChar, data)) {
328: _lineCharArray[recLength] = NL;
329: break;
330: }
331:
332: // ensure capacity
333: if ((recLength + 2) > _lineCharArray.length) {
334: char[] newlineCharArray = new char[recLength + 80];
335: System.arraycopy(_lineCharArray, 0,
336: newlineCharArray, 0, _lineCharArray.length);
337: _lineCharArray = newlineCharArray;
338: }
339:
340: _lineCharArray[recLength++] = ((char) nextChar);
341: }
342: return _lineCharArray;
343:
344: } catch (IOException e) {
345: throw new AxionException("Unable to parse data file: ", e);
346: }
347: }
348:
349: protected boolean isNewLine(int nextChar) {
350: return nextChar == NL;
351: }
352:
353: private class DelimitedTableOrganizationContext extends
354: BaseFlatfileTableOrganizationContext {
355: public Set getPropertyKeys() {
356: Set baseKeys = super .getPropertyKeys();
357: Set keys = new HashSet(baseKeys.size()
358: + PROPERTY_KEYS.size());
359: keys.addAll(baseKeys);
360: keys.addAll(PROPERTY_KEYS);
361:
362: return keys;
363: }
364:
365: public void readOrSetDefaultProperties(Properties props)
366: throws AxionException {
367: super .readOrSetDefaultProperties(props);
368:
369: String rawFieldSep = props.getProperty(PROP_FIELDDELIMITER);
370: if (isNullString(rawFieldSep)) {
371: rawFieldSep = COMMA;
372: }
373: _fieldSep = fixEscapeSequence(rawFieldSep);
374:
375: // default line separator is new line
376: String lineSep = System.getProperty("line.separator");
377: if ("".equals(_lineSep)) {
378: _lineSep = fixEscapeSequence(lineSep);
379: }
380:
381: // Support multiple record delimiter for delimited
382: StringTokenizer tokenizer = new StringTokenizer(_lineSep,
383: " ");
384: ArrayList tmpList = new ArrayList();
385: while (tokenizer.hasMoreTokens()) {
386: String token = tokenizer.nextToken();
387: tmpList.add(token);
388: if (token.equals(lineSep)) {
389: _preferredLineSep = token;
390: }
391: }
392: _lineSeps = (String[]) tmpList.toArray(new String[0]);
393:
394: // determine the delimiter to be used for writing line
395: if (_preferredLineSep == null
396: || _preferredLineSep.length() == 0) {
397: _preferredLineSep = _lineSeps[0];
398: }
399:
400: _qualifier = fixEscapeSequence(props
401: .getProperty(PROP_QUALIFIER));
402: if (isNullString(_qualifier)) {
403: _qualifier = EMPTY_STRING;
404: } else {
405: _qPattern = Pattern.compile(_qualifier);
406: _qqPattern = Pattern.compile(_qualifier + _qualifier);
407: }
408:
409: }
410:
411: public void updateProperties() {
412: super .updateProperties();
413:
414: _props.setProperty(PROP_LOADTYPE,
415: ExternalTableFactory.TYPE_DELIMITED);
416: _props.setProperty(PROP_FIELDDELIMITER,
417: addEscapeSequence(_fieldSep));
418: _props.setProperty(PROP_QUALIFIER, _qualifier);
419: }
420:
421: public Set getRequiredPropertyKeys() {
422: Set baseRequiredKeys = getBaseRequiredPropertyKeys();
423: Set keys = new HashSet(baseRequiredKeys.size()
424: + REQUIRED_KEYS.size());
425: keys.addAll(baseRequiredKeys);
426: keys.addAll(REQUIRED_KEYS);
427:
428: return keys;
429: }
430: }
431:
432: class CharTokenizer {
433: char[] _charArray;
434:
435: private int _currentPosition;
436:
437: private String _delimiters;
438: private int _maxPosition;
439:
440: public CharTokenizer(char[] thecharArray, String theDelim) {
441: _delimiters = theDelim;
442: _charArray = thecharArray;
443: _maxPosition = _charArray.length;
444: _currentPosition = 0;
445: }
446:
447: public boolean hasMoreTokens() {
448: return (_currentPosition < _maxPosition);
449: }
450:
451: public String nextToken() {
452: int start = _currentPosition;
453: int end = start;
454: int pos = _currentPosition;
455: boolean inQuotedString = false;
456: boolean endQuotedString = false;
457: boolean treatAsUnquoted = false;
458: boolean wasEscaped = false;
459:
460: while (pos < _maxPosition) {
461: // if new line
462: if (isNewLine(_charArray[pos])) {
463: if (isQuoted() && !endQuotedString) {
464: _maxPosition = pos;
465: _currentPosition = pos;
466: end = pos;
467: break;
468: }
469: _currentPosition = _maxPosition;
470: }
471:
472: // if quoted and found qualifier
473: if (isQuoted() && isQualifier(pos)) {
474: if (!inQuotedString) { // not inside the quoted string
475: pos += getQualifierLength();
476: start = pos;
477: inQuotedString = true;
478: continue;
479: } else if (isQualifier(pos + getQualifierLength())) {
480: pos += (getQualifierLength() * 2);
481: wasEscaped = true;
482: continue;
483: }
484: // inside the quoted string
485: end = pos;
486: pos += getQualifierLength();
487: inQuotedString = false;
488: endQuotedString = true;
489: continue;
490: }
491:
492: // if quoted, close quote found, but have not found a delimiter yet
493: if (isQuoted() && endQuotedString
494: && _delimiters.charAt(0) != _charArray[pos]
495: && !isNewLine(_charArray[pos])) {
496: pos++;
497: continue;
498: }
499:
500: // if quoted, close quote found and found a delimiter
501: if (isQuoted() && endQuotedString) {
502: if (isDelimiter(pos)) {
503: pos += _delimiters.length();
504: break;
505: } else if (isNewLine(_charArray[pos])) {
506: break;
507: }
508: }
509:
510: // if quoted but did not find start qualifer, treat this token as
511: // unquoted
512: if (isQuoted() && !inQuotedString) {
513: treatAsUnquoted = true;
514: }
515:
516: // if non-quoted
517: if ((!isQuoted() || treatAsUnquoted)
518: && pos < _maxPosition) {
519: if (isDelimiter(pos)) {
520: end = pos;
521: pos += _delimiters.length();
522: break;
523: } else if (isNewLine(_charArray[pos])) {
524: end = pos;
525: break;
526: }
527: }
528:
529: pos++;
530: }
531:
532: _currentPosition = pos;
533: if (pos == _maxPosition) {
534: end = _maxPosition;
535: }
536:
537: if (start != end) {
538: String token = new String(_charArray, start, end
539: - start);
540: if (wasEscaped) {
541: return _qqPattern.matcher(token).replaceAll(
542: _qualifier);
543: }
544: return token;
545: } else if (endQuotedString) {
546: return EMPTY_STRING;
547: } else {
548: return null;
549: }
550:
551: }
552:
553: // if delimiter more than 1 char long, make sure all chars match
554: private boolean isDelimiter(int position) {
555: boolean delimiterFound = true;
556: for (int j = 0, J = _delimiters.length(); j < J; j++) {
557: if (_delimiters.charAt(j) != _charArray[position++]) {
558: delimiterFound = false;
559: break;
560: }
561: }
562: return delimiterFound;
563: }
564:
565: // if qualifier more than 1 char long, make sure all chars match
566: private boolean isQualifier(int position) {
567: boolean qualifierFound = true;
568: for (int j = 0, J = getQualifierLength(); j < J; j++) {
569: if (getQualifier().charAt(j) != _charArray[position++]) {
570: qualifierFound = false;
571: break;
572: }
573: }
574: return qualifierFound;
575: }
576: }
577:
578: // Expose default size of _lineCharArray for associated unit test.
579: static final int LINE_CHAR_ARRAY_SIZE = 80;
580:
581: private String _fieldSep;
582: private String _preferredLineSep;
583: private String[] _lineSeps;
584: private char[] _lineCharArray = new char[LINE_CHAR_ARRAY_SIZE];
585: private String _qualifier;
586: private Pattern _qPattern;
587: private Pattern _qqPattern;
588: }
|