0001: /*
0002: * $Id: ExternalDatabaseTable.java,v 1.27 2006/01/10 21:02:36 ahimanikya Exp $
0003: * =======================================================================
0004: * Copyright (c) 2002-2006 Axion Development Team. All rights reserved.
0005: *
0006: * Redistribution and use in source and binary forms, with or without
0007: * modification, are permitted provided that the following conditions
0008: * are met:
0009: *
0010: * 1. Redistributions of source code must retain the above
0011: * copyright notice, this list of conditions and the following
0012: * disclaimer.
0013: *
0014: * 2. Redistributions in binary form must reproduce the above copyright
0015: * notice, this list of conditions and the following disclaimer in
0016: * the documentation and/or other materials provided with the
0017: * distribution.
0018: *
0019: * 3. The names "Tigris", "Axion", nor the names of its contributors may
0020: * not be used to endorse or promote products derived from this
0021: * software without specific prior written permission.
0022: *
0023: * 4. Products derived from this software may not be called "Axion", nor
0024: * may "Tigris" or "Axion" appear in their names without specific prior
0025: * written permission.
0026: *
0027: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0028: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0029: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
0030: * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0031: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0032: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0033: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0034: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0035: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0036: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0037: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0038: * =======================================================================
0039: */
0040: package org.axiondb.engine.tables;
0041:
0042: import java.io.File;
0043: import java.sql.BatchUpdateException;
0044: import java.sql.Connection;
0045: import java.sql.PreparedStatement;
0046: import java.sql.ResultSet;
0047: import java.sql.SQLException;
0048: import java.sql.Statement;
0049: import java.sql.Types;
0050: import java.util.ArrayList;
0051: import java.util.Collections;
0052: import java.util.HashMap;
0053: import java.util.HashSet;
0054: import java.util.Iterator;
0055: import java.util.List;
0056: import java.util.ListIterator;
0057: import java.util.Map;
0058: import java.util.NoSuchElementException;
0059: import java.util.Properties;
0060: import java.util.Set;
0061:
0062: import org.apache.commons.collections.primitives.IntCollection;
0063: import org.apache.commons.logging.Log;
0064: import org.apache.commons.logging.LogFactory;
0065: import org.axiondb.AxionException;
0066: import org.axiondb.Column;
0067: import org.axiondb.ColumnIdentifier;
0068: import org.axiondb.Constraint;
0069: import org.axiondb.ConstraintViolationException;
0070: import org.axiondb.Database;
0071: import org.axiondb.DatabaseLink;
0072: import org.axiondb.ExternalTable;
0073: import org.axiondb.Function;
0074: import org.axiondb.Index;
0075: import org.axiondb.IndexLoader;
0076: import org.axiondb.Literal;
0077: import org.axiondb.Row;
0078: import org.axiondb.RowCollection;
0079: import org.axiondb.RowDecorator;
0080: import org.axiondb.RowIterator;
0081: import org.axiondb.RowSource;
0082: import org.axiondb.Selectable;
0083: import org.axiondb.SelectableBasedConstraint;
0084: import org.axiondb.Sequence;
0085: import org.axiondb.Table;
0086: import org.axiondb.TableIdentifier;
0087: import org.axiondb.TransactableTable;
0088: import org.axiondb.constraints.NotNullConstraint;
0089: import org.axiondb.constraints.PrimaryKeyConstraint;
0090: import org.axiondb.constraints.UniqueConstraint;
0091: import org.axiondb.engine.indexes.BaseIndex;
0092: import org.axiondb.engine.rowiterators.BaseRowIterator;
0093: import org.axiondb.engine.rowiterators.FilteringRowIterator;
0094: import org.axiondb.engine.rowiterators.ListIteratorRowIterator;
0095: import org.axiondb.engine.rowiterators.UnmodifiableRowIterator;
0096: import org.axiondb.engine.rows.SimpleRow;
0097: import org.axiondb.event.ColumnEvent;
0098: import org.axiondb.event.ConstraintEvent;
0099: import org.axiondb.event.RowEvent;
0100: import org.axiondb.event.RowUpdatedEvent;
0101: import org.axiondb.event.TableModificationListener;
0102: import org.axiondb.event.TableModifiedEvent;
0103: import org.axiondb.functions.AndFunction;
0104: import org.axiondb.functions.ComparisonFunction;
0105: import org.axiondb.functions.EqualFunction;
0106: import org.axiondb.jdbc.AxionConnection;
0107: import org.axiondb.util.ExceptionConverter;
0108: import org.axiondb.util.Utils;
0109: import org.axiondb.util.ValuePool;
0110:
0111: /**
0112: * Base implementation of ExternalTable interface.
0113: * <p>
0114: * Example: <code>
0115: * create external table emp (lname varchar(80), sid integer(6),
0116: * fname varchar(80), id integer(6) not null, dob timestamp(6))
0117: * organization (loadtype='remote' SERVER='myserver'
0118: * REMOTETABLE='mytablename' SCHEMA='myschema' WHERE='id > 1000');
0119: * </code>
0120: * <p>
0121: * Note: We have tested this with Oracle 8i/9i/10g, SQL Server, Sybase and DB2. <br>
0122: *
0123: * @version $Revision: 1.27 $ $Date: 2006/01/10 21:02:36 $
0124: * @author Jonathan Giron
0125: * @author Rahul Dwivedi
0126: * @author Ahimanikya Satapathy
0127: * @see org.axiondb.DatabaseLink
0128: */
0129: public class ExternalDatabaseTable implements ExternalTable,
0130: TransactableTable {
0131:
0132: // TODO: Vendor specific impl for truncate, table exists check, escape id, datatypes
0133: // TODO: Support remote table drop, create/drop index, create/drop constraint
0134:
/** Organization property keys that this table type recognizes. */
private static final Set PROPERTY_KEYS = new HashSet(6);

/** Organization property keys that must always be supplied. */
private static final Set REQUIRED_KEYS = new HashSet(1);

static {
    // Register every recognized key, in declaration order.
    final Object[] recognized = {
        PROP_DB,
        PROP_REMOTETABLE,
        PROP_ORDERBY,
        PROP_CATALOG,
        PROP_SCHEMA,
        PROP_WHERE,
        PROP_CREATE_IF_NOT_EXIST
    };
    for (int k = 0; k < recognized.length; k++) {
        PROPERTY_KEYS.add(recognized[k]);
    }

    // PROP_DB is the only mandatory key.
    REQUIRED_KEYS.add(PROP_DB);
}
0154:
/**
 * Creates an external table with the given name for the given database.
 *
 * @param name table name; stored upper-cased (a <code>null</code> name is
 *        stored as the text "NULL")
 * @param db database this table is associated with
 */
public ExternalDatabaseTable(String name, Database db) {
    _db = db;
    _name = String.valueOf(name).toUpperCase();
}
0159:
0160: /**
0161: * Add the given {@link Column}to this table. This implementation throws an
0162: * {@link AxionException}if rows have already been added to the table.
0163: */
0164: public void addColumn(Column col) throws AxionException {
0165: if (getRowCount() > 0) {
0166: throw new AxionException(
0167: "Cannot add column because table already contains rows.");
0168: }
0169:
0170: if (col.isDerivedColumn() || col.isIdentityColumn()
0171: || col.isGeneratedAlways()) {
0172: throw new AxionException(
0173: "Generated column is not supported for table type");
0174: }
0175:
0176: _cols.add(col);
0177: clearCache();
0178: publishEvent(new ColumnEvent(this , col));
0179: }
0180:
0181: // Can we add this constraint to remote table if not defined already ?
0182: public void addConstraint(Constraint constraint)
0183: throws AxionException {
0184: if (_constraints.containsKey(constraint.getName())) {
0185: throw new AxionException("A constraint named "
0186: + constraint.getName() + " already exists.");
0187: } else if (constraint instanceof PrimaryKeyConstraint) {
0188: if (null != _pk) {
0189: throw new AxionException(
0190: "This table already has a primary key");
0191: }
0192: _pk = (PrimaryKeyConstraint) constraint;
0193: } else if (constraint instanceof NotNullConstraint) {
0194: NotNullConstraint notNull = (NotNullConstraint) constraint;
0195: _notNullColumns.addAll(getConstraintColumns(notNull));
0196: } else if (constraint instanceof UniqueConstraint) {
0197: _uniqueConstraints.add(constraint);
0198: } else {
0199: return; // ignore other constraint types
0200: }
0201:
0202: doAddConstraint(constraint);
0203: }
0204:
0205: public void addConstraint(Constraint constraint,
0206: boolean checkExistingRows) throws AxionException {
0207: if (constraint instanceof PrimaryKeyConstraint
0208: && null != getPrimaryKey()) {
0209: throw new AxionException(
0210: "This table already has a primary key");
0211: } else if (_constraints.containsKey(constraint.getName())) {
0212: throw new AxionException("A constraint named "
0213: + constraint.getName() + " already exists.");
0214: } else {
0215: if (checkExistingRows) {
0216: RowDecorator dec = makeRowDecorator();
0217: for (RowIterator iter = getRowIterator(); iter
0218: .hasNext();) {
0219: Row current = iter.next();
0220: RowEvent event = new RowUpdatedEvent(this , current,
0221: current);
0222: if (!constraint.evaluate(event, dec)) {
0223: throw new ConstraintViolationException(
0224: constraint);
0225: }
0226: }
0227: }
0228: _constraints.put(constraint.getName(), constraint);
0229:
0230: Iterator iter = getTableModificationListeners();
0231: while (iter.hasNext()) {
0232: TableModificationListener listener = (TableModificationListener) (iter
0233: .next());
0234: listener.constraintAdded(new ConstraintEvent(this ,
0235: constraint));
0236: }
0237: }
0238: }
0239:
0240: public void addIndex(Index index) throws AxionException {
0241: // TODO If index does not exist in remote table create it now.
0242: Column idxCol = index.getIndexedColumn();
0243: if (!isColumnIndexed(idxCol)) {
0244: String colName = idxCol.getName();
0245: _indexes.add(colName);
0246: // Since we are no longer maintaining index in Axion for remote tables,
0247: // we do not need to listen by addTableModificationListener(index);
0248: }
0249: }
0250:
0251: public void addRow(Row row) throws AxionException {
0252: assertConnection();
0253: try {
0254: if (_insertPS == null) {
0255: _insertPS = _conn.prepareStatement(getInsertSQL());
0256: }
0257: setValueParams(_insertPS, row);
0258: _insertModCount = addBatch(_insertPS, _insertModCount);
0259: } catch (Exception e) {
0260: throw new AxionException(e);
0261: }
0262: }
0263:
0264: public void addTableModificationListener(
0265: TableModificationListener listener) {
0266: _tableModificationListeners.add(listener);
0267: }
0268:
0269: public void apply() throws AxionException {
0270: }
0271:
0272: public void applyDeletes(IntCollection rowIds)
0273: throws AxionException {
0274: throw new UnsupportedOperationException();
0275: }
0276:
0277: public void applyInserts(RowCollection rows) {
0278: throw new UnsupportedOperationException();
0279: }
0280:
0281: public void applyUpdates(RowCollection rows) {
0282: throw new UnsupportedOperationException();
0283: }
0284:
0285: public void checkpoint() throws AxionException {
0286: }
0287:
0288: public void commit() throws AxionException {
0289: if (_conn == null) {
0290: return; // Nothing to commit.
0291: }
0292:
0293: try {
0294: if (_modCount > 0) {
0295: if (_insertModCount > 0 && _insertPS != null) {
0296: _insertPS.executeBatch();
0297: _insertModCount = 0;
0298: }
0299:
0300: if (_updateModCount > 0 && _updatePS != null) {
0301: _updatePS.executeBatch();
0302: _updateModCount = 0;
0303: }
0304:
0305: if (_deleteModCount > 0 && _deletePS != null) {
0306: _deletePS.executeBatch();
0307: _deleteModCount = 0;
0308: }
0309:
0310: _conn.commit();
0311: _modCount = 0;
0312: remount();
0313: }
0314: } catch (SQLException e) {
0315: rollback();
0316: if (e instanceof BatchUpdateException
0317: && e.getNextException() != null) {
0318: e = e.getNextException();
0319: }
0320: throw new AxionException(e, Integer.parseInt(e
0321: .getSQLState() == null ? "0" : e.getSQLState()));
0322: }
0323: }
0324:
0325: public void deleteRow(Row row) throws AxionException {
0326: assertUpdatable();
0327: assertConnection();
0328: try {
0329: if (_deletePS == null) {
0330: _deletePS = _conn.prepareStatement(getDeleteSQL());
0331: }
0332: setWhereParams(_deletePS, row, 0);
0333: _deleteModCount = addBatch(_deletePS, _deleteModCount);
0334: } catch (SQLException e) {
0335: rollback();
0336: throw convertException("Failed to apply deletes ", e);
0337: }
0338: }
0339:
0340: public void drop() throws AxionException {
0341: if (_conn == null || _stmt == null) {
0342: throw new AxionException("Invalid State: "
0343: + "Remote connection has been already closed");
0344: }
0345: try {
0346: _stmt.executeUpdate("DROP TABLE " + _remoteTableName);
0347: } catch (SQLException ignore) {
0348: // Ignore this exception
0349: } finally {
0350: shutdown();
0351: }
0352: }
0353:
0354: public void freeRowId(int id) {
0355: }
0356:
0357: public final Column getColumn(int index) {
0358: return (Column) (_cols.get(index));
0359: }
0360:
0361: public Column getColumn(String name) {
0362: for (int i = 0, I = _cols.size(); i < I; i++) {
0363: Column col = (Column) (_cols.get(i));
0364: if (col.getName().equalsIgnoreCase(name)) {
0365: return col;
0366: }
0367: }
0368: return null;
0369: }
0370:
0371: public final int getColumnCount() {
0372: return _cols.size();
0373: }
0374:
0375: public List getColumnIdentifiers() {
0376: List colids = new ArrayList();
0377: for (int i = 0, I = _cols.size(); i < I; i++) {
0378: Column col = (Column) (_cols.get(i));
0379: colids
0380: .add(new ColumnIdentifier(new TableIdentifier(
0381: getName()), col.getName(), null, col
0382: .getDataType()));
0383: }
0384: return Collections.unmodifiableList(colids);
0385: }
0386:
0387: public int getColumnIndex(String name) throws AxionException {
0388: for (int i = 0, I = _cols.size(); i < I; i++) {
0389: Column col = (Column) (_cols.get(i));
0390: if (col.getName().equalsIgnoreCase(name)) {
0391: return i;
0392: }
0393: }
0394: throw new AxionException("Column " + name + " not found.");
0395: }
0396:
0397: public final Constraint getConstraint(String name) {
0398: return null; // Don't allow explicit operations on Constraint
0399: }
0400:
0401: public Iterator getConstraints() {
0402: return _constraints.values().iterator();
0403: }
0404:
0405: public String getDBLinkName() {
0406: return _dblink;
0407: }
0408:
0409: public RowIterator getIndexedRows(RowSource source,
0410: Selectable node, boolean readOnly) throws AxionException {
0411: if (readOnly) {
0412: return UnmodifiableRowIterator.wrap(getIndexedRows(source,
0413: node));
0414: }
0415: return getIndexedRows(source, node);
0416: }
0417:
0418: public RowIterator getIndexedRows(Selectable node, boolean readOnly)
0419: throws AxionException {
0420: return getIndexedRows(this , node, readOnly);
0421: }
0422:
0423: public Index getIndexForColumn(Column column) {
0424: if ((_indexes != null) && (_indexes.contains(column.getName()))) {
0425: return new ExternalTableIndex(column);
0426: }
0427: return null;
0428: }
0429:
0430: public Iterator getIndices() {
0431: return Collections.EMPTY_LIST.iterator();
0432: }
0433:
0434: public RowIterator getMatchingRows(List selectables, List values,
0435: boolean readOnly) throws AxionException {
0436: if (null == selectables || selectables.isEmpty()) {
0437: return getRowIterator(readOnly);
0438: }
0439:
0440: RowIterator baseIterator = null;
0441: Selectable filter = null;
0442: for (int i = 0, I = selectables.size(); i < I; i++) {
0443: Selectable sel = (Selectable) selectables.get(i);
0444: Object val = values.get(i);
0445:
0446: EqualFunction function = new EqualFunction();
0447: function.addArgument(sel);
0448: function.addArgument(new Literal(val));
0449:
0450: if (null == baseIterator) {
0451: baseIterator = getIndexedRows(function, readOnly);
0452: if (baseIterator != null) {
0453: function = null;
0454: }
0455: }
0456:
0457: if (function != null) {
0458: if (null == filter) {
0459: filter = function;
0460: } else {
0461: AndFunction fn = new AndFunction();
0462: fn.addArgument(filter);
0463: fn.addArgument(function);
0464: filter = fn;
0465: }
0466: }
0467: }
0468:
0469: if (null == baseIterator) {
0470: baseIterator = getRowIterator(readOnly);
0471: }
0472:
0473: if (null != filter) {
0474: return new FilteringRowIterator(baseIterator,
0475: makeRowDecorator(), filter);
0476: }
0477: return baseIterator;
0478: }
0479:
0480: public final String getName() {
0481: return _name;
0482: }
0483:
0484: public int getNextRowId() {
0485: return 0;
0486: }
0487:
0488: public Row getRow(int id) throws AxionException {
0489: return getRowByOffset(id);
0490: }
0491:
0492: public int getRowCount() {
0493: if (-1 == _rowCount) {
0494: _rowCount = getTableSize();
0495: }
0496: return _rowCount;
0497: }
0498:
0499: public RowIterator getRowIterator(boolean readOnly)
0500: throws AxionException {
0501: if (readOnly) {
0502: return UnmodifiableRowIterator.wrap(getRowIterator());
0503: }
0504: return getRowIterator();
0505: }
0506:
0507: public final Sequence getSequence() {
0508: return null;
0509: }
0510:
0511: public Iterator getTableModificationListeners() {
0512: return _tableModificationListeners.iterator();
0513: }
0514:
0515: public Properties getTableProperties() {
0516: return context.getTableProperties();
0517: }
0518:
0519: public final String getType() {
0520: return EXTERNAL_DB_TABLE_TYPE;
0521: }
0522:
0523: public boolean hasColumn(ColumnIdentifier id) {
0524: boolean result = false;
0525: String tableName = id.getTableName();
0526: if (tableName == null || tableName.equals(getName())) {
0527: result = (getColumn(id.getName()) != null);
0528: }
0529: return result;
0530: }
0531:
0532: public boolean hasIndex(String name) {
0533: // XXX Check with remote table whether we have an index
0534: return false;
0535: }
0536:
0537: public boolean isColumnIndexed(Column column) {
0538: try {
0539: return isColumnIndexed(column.getName());
0540: } catch (AxionException e) {
0541: throw ExceptionConverter.convertToRuntimeException(e);
0542: }
0543: }
0544:
0545: public boolean isPrimaryKeyConstraintExists(String columnName) {
0546: boolean result = false;
0547: for (Iterator iter = _constraints.values().iterator(); iter
0548: .hasNext();) {
0549: Object constraint = iter.next();
0550: if (constraint instanceof PrimaryKeyConstraint) {
0551: UniqueConstraint uk = (UniqueConstraint) constraint;
0552: if (uk.getSelectableCount() == 1) {
0553: ColumnIdentifier cid = (ColumnIdentifier) (uk
0554: .getSelectableList().get(0));
0555: if (columnName.equals(cid.getName())) {
0556: result = true;
0557: }
0558: }
0559: }
0560: }
0561: return result;
0562: }
0563:
0564: /**
0565: * check if unique constraint exists on a column
0566: *
0567: * @param columnName name of the column
0568: * @return true if uniqueConstraint exists on the column
0569: */
0570: public boolean isUniqueConstraintExists(String columnName) {
0571: boolean result = false;
0572: for (Iterator iter = _constraints.values().iterator(); iter
0573: .hasNext();) {
0574: Object constraint = iter.next();
0575: if (constraint instanceof UniqueConstraint) {
0576: UniqueConstraint uk = (UniqueConstraint) constraint;
0577: if (uk.getSelectableCount() == 1) {
0578: ColumnIdentifier cid = (ColumnIdentifier) (uk
0579: .getSelectableList().get(0));
0580: if (columnName.equals(cid.getName())) {
0581: result = true;
0582: }
0583: }
0584: }
0585: }
0586: return result;
0587: }
0588:
0589: public boolean loadExternalTable(Properties props)
0590: throws AxionException {
0591: context = new ExternalDatabaseTableOrganizationContext();
0592: context.readOrSetDefaultProperties(props);
0593: context.updateProperties();
0594: return true;
0595: }
0596:
0597: public RowDecorator makeRowDecorator() {
0598: if (null == _colIndexToColIdMap) {
0599: int size = _cols.size();
0600: Map map = new HashMap(size);
0601: for (int i = 0; i < size; i++) {
0602: Column col = (Column) (_cols.get(i));
0603: ColumnIdentifier colid = new ColumnIdentifier(
0604: new TableIdentifier(getName()), col.getName(),
0605: null, col.getDataType());
0606: map.put(colid, ValuePool.getInt(i));
0607: }
0608: _colIndexToColIdMap = map;
0609: }
0610: return new RowDecorator(_colIndexToColIdMap);
0611: }
0612:
0613: public TransactableTable makeTransactableTable() {
0614: return this ;
0615: }
0616:
0617: public void migrate() throws AxionException {
0618: }
0619:
0620: // Can we create index on remote table if not defined already ?
0621: public void populateIndex(Index index) throws AxionException {
0622: // Ignore as of now, for remote tables Axion will not maintain indexes.
0623: }
0624:
0625: public void remount() throws AxionException {
0626: if (_externalRs != null) {
0627: try {
0628: _externalRs.close();
0629: } catch (SQLException e) {
0630: }
0631: _externalRs = null;
0632: }
0633: _rowCount = -1;
0634: }
0635:
0636: public void remount(File dir, boolean datafilesonly)
0637: throws AxionException {
0638: }
0639:
0640: public Constraint removeConstraint(String name) {
0641: if (name != null) {
0642: name = name.toUpperCase();
0643: if ("PRIMARYKEY".equals(name)) {
0644: if (_pk != null) {
0645: name = _pk.getName();
0646: } else {
0647: name = null;
0648: }
0649: }
0650: }
0651:
0652: if (_constraints.containsKey(name)) {
0653: Constraint constraint = (Constraint) _constraints.get(name);
0654: Iterator iter = getTableModificationListeners();
0655: while (iter.hasNext()) {
0656: TableModificationListener listener = (TableModificationListener) (iter
0657: .next());
0658: try {
0659: listener.constraintRemoved(new ConstraintEvent(
0660: this , constraint));
0661: } catch (AxionException e) {
0662: _log
0663: .error(
0664: "Unable to publish constraint removed event",
0665: e);
0666: }
0667: }
0668: _constraints.remove(name);
0669:
0670: if (constraint == _pk) {
0671: _pk = null;
0672: } else if (constraint instanceof NotNullConstraint) {
0673: List columns = getConstraintColumns((NotNullConstraint) constraint);
0674: _notNullColumns.removeAll(columns);
0675: } else if (constraint instanceof UniqueConstraint) {
0676: _uniqueConstraints.remove(constraint);
0677: }
0678: return constraint;
0679: }
0680: return null;
0681: }
0682:
0683: public void removeIndex(Index index) throws AxionException {
0684: // TODO try to remove index from remote table
0685: this .removeTableModificationListener(index);
0686: }
0687:
0688: public void removeTableModificationListener(
0689: TableModificationListener listener) {
0690: _tableModificationListeners.remove(listener);
0691: }
0692:
0693: public void rename(String oldName, String newName)
0694: throws AxionException {
0695: setName(newName);
0696: clearCache();
0697: }
0698:
0699: public void rollback() throws AxionException {
0700: if (_conn == null) {
0701: return; // Nothing to rollback.
0702: }
0703:
0704: try {
0705: if (_insertModCount > 0 && _insertPS != null) {
0706: _insertPS.clearBatch();
0707: _insertPS.clearWarnings();
0708: _insertModCount = 0;
0709: }
0710:
0711: if (_updateModCount > 0 && _updatePS != null) {
0712: _updatePS.clearBatch();
0713: _updatePS.clearWarnings();
0714: _updateModCount = 0;
0715: }
0716:
0717: if (_deleteModCount > 0 && _deletePS != null) {
0718: _deletePS.clearBatch();
0719: _deletePS.clearWarnings();
0720: _deleteModCount = 0;
0721: }
0722: _conn.rollback();
0723: _modCount = 0;
0724: remount();
0725: } catch (SQLException e) {
0726: throw new AxionException(e);
0727: }
0728: }
0729:
0730: public void setDeferAllConstraints(boolean deferAll) {
0731: }
0732:
0733: public void setSequence(Sequence seq) throws AxionException {
0734: }
0735:
0736: public void shutdown() throws AxionException {
0737: try {
0738: if (_externalRs != null)
0739: _externalRs.close();
0740: if (_stmt != null)
0741: _stmt.close();
0742: if (_insertPS != null)
0743: _insertPS.close();
0744: if (_updatePS != null)
0745: _updatePS.close();
0746: if (_deletePS != null)
0747: _deletePS.close();
0748: if (_rowCountPS != null)
0749: _rowCountPS.close();
0750:
0751: if (_indexSelectPSs != null) {
0752: Iterator itr = _indexSelectPSs.values().iterator();
0753: Statement stmnt = null;
0754: while (itr.hasNext()) {
0755: stmnt = (Statement) itr.next();
0756: try {
0757: stmnt.close();
0758: } catch (Exception ex) {
0759: // ignore
0760: }
0761: }
0762: }
0763: if (_conn != null)
0764: _conn.close();
0765: } catch (SQLException ignore) {
0766: // Ignore this exception
0767: } finally {
0768: _externalRs = null;
0769: _conn = null;
0770: _stmt = null;
0771: _rowCountPS = null;
0772: _insertPS = null;
0773: _updatePS = null;
0774: _deletePS = null;
0775: _indexSelectPSs = null;
0776: _db = null;
0777: }
0778: }
0779:
0780: public String toString() {
0781: return getName();
0782: }
0783:
0784: // NOTE: DB specific subclass could optimize truncate operation
0785: public void truncate() throws AxionException {
0786: assertConnection();
0787: Statement stmt = null;
0788: try {
0789: stmt = _conn.createStatement();
0790: stmt.executeUpdate(getTruncateSQL());
0791: _modCount++;
0792: commit();
0793: } catch (SQLException e) {
0794: throw new AxionException(e);
0795: } finally {
0796: closeStatement(stmt);
0797: }
0798: }
0799:
0800: public void updateRow(Row oldrow, Row newrow) throws AxionException {
0801: updateRow(oldrow, newrow, null);
0802: }
0803:
0804: public void updateRow(Row oldrow, Row newrow, List cols)
0805: throws AxionException {
0806: assertUpdatable();
0807: assertConnection();
0808: try {
0809: if (_updatePS == null) {
0810: createUpdatePS(cols);
0811: } else if (_updateCols != null && cols == null) {
0812: createUpdatePS(cols);
0813: } else if (cols != null
0814: && (_updateCols == null || !_updateCols
0815: .equals(cols))) {
0816: createUpdatePS(cols);
0817: }
0818:
0819: setValueParams(_updatePS, newrow);
0820: setWhereParams(_updatePS, oldrow,
0821: _updateCols != null ? _updateCols.size() : oldrow
0822: .size());
0823: _updateModCount = addBatch(_updatePS, _updateModCount);
0824: } catch (SQLException e) {
0825: rollback();
0826: throw convertException("Failed to apply updates ", e);
0827: }
0828: }
0829:
0830: protected void checkConstraints(RowEvent event)
0831: throws AxionException {
0832: // let remote db handle this.
0833: }
0834:
0835: protected String getDeleteSQL() {
0836: StringBuffer stmtBuf = new StringBuffer(30);
0837: String rTable = getQualifiedTable();
0838:
0839: stmtBuf.append("DELETE FROM ").append(rTable);
0840: populateWhere(stmtBuf.append(" WHERE "));
0841: return stmtBuf.toString();
0842: }
0843:
0844: // TODO: Make sure the row that is getting inserted/updated is not out of scope for
0845: // the given _where condition
0846: protected String getInsertSQL() {
0847: StringBuffer stmtBuf = new StringBuffer(30);
0848: String rTable = getQualifiedTable();
0849:
0850: stmtBuf.append("INSERT INTO ").append(rTable).append(" ");
0851: stmtBuf.append(getDirectInsertHint(rTable)).append(" (");
0852:
0853: populateColumns(stmtBuf, null);
0854: populateValues(stmtBuf.append(") VALUES ("));
0855:
0856: stmtBuf.append(")");
0857: return stmtBuf.toString();
0858: }
0859:
0860: protected RowIterator getRowIterator() throws AxionException {
0861: createOrLoadResultSet();
0862: return new BaseRowIterator() {
0863: Row _current = null;
0864: int _currentId = -1;
0865: int _currentIndex = -1;
0866: int _nextId = 0;
0867: int _nextIndex = 0;
0868:
0869: public Row current() {
0870: if (!hasCurrent()) {
0871: throw new NoSuchElementException("No current row.");
0872: }
0873: return _current;
0874: }
0875:
0876: public int currentIndex() {
0877: return _currentIndex;
0878: }
0879:
0880: public boolean hasCurrent() {
0881: return (null != _current);
0882: }
0883:
0884: public boolean hasNext() {
0885: return nextIndex() < getRowCount();
0886: }
0887:
0888: public boolean hasPrevious() {
0889: return nextIndex() > 0;
0890: }
0891:
0892: public Row next() throws AxionException {
0893: if (!hasNext()) {
0894: throw new NoSuchElementException("No next row");
0895: }
0896:
0897: do {
0898: _currentId = _nextId++;
0899: _current = getRowByOffset(_currentId);
0900: } while (null == _current);
0901: _currentIndex = _nextIndex;
0902: _nextIndex++;
0903: return _current;
0904: }
0905:
0906: public int nextIndex() {
0907: return _nextIndex;
0908: }
0909:
0910: public Row previous() throws AxionException {
0911: if (!hasPrevious()) {
0912: throw new NoSuchElementException("No previous row");
0913: }
0914:
0915: do {
0916: _currentId = (--_nextId);
0917: _current = getRowByOffset(_currentId);
0918: } while (null == _current);
0919: _nextIndex--;
0920: _currentIndex = _nextIndex;
0921: return _current;
0922: }
0923:
0924: public int previousIndex() {
0925: return _nextIndex - 1;
0926: }
0927:
0928: public void remove() throws AxionException {
0929: if (-1 == _currentIndex) {
0930: throw new IllegalStateException("No current row.");
0931: }
0932: deleteRow(_current);
0933: _currentIndex = -1;
0934: }
0935:
0936: public void reset() {
0937: _current = null;
0938: _nextIndex = 0;
0939: _currentIndex = -1;
0940: _nextId = 0;
0941: }
0942:
0943: public void set(Row row) throws AxionException {
0944: if (-1 == _currentIndex) {
0945: throw new IllegalStateException("No current row.");
0946: }
0947: updateRow(_current, row);
0948: }
0949:
0950: public int size() throws AxionException {
0951: return getRowCount();
0952: }
0953:
0954: public String toString() {
0955: return "ExternalDatabaseTable(" + getName() + ")";
0956: }
0957: };
0958: }
0959:
0960: /**
0961: * Gets appropriate string for use in indicating wildcard search for schemas.
0962: *
0963: * @return wildcard string.
0964: */
0965: protected String getSchemaWildcardForRemoteDB() {
0966: String wildcard = "%";
0967:
0968: try {
0969: String dbName = _conn.getMetaData()
0970: .getDatabaseProductName();
0971: if (dbName.lastIndexOf("DB2") != -1) {
0972: // Don't use schema wildcard
0973: wildcard = null;
0974: }
0975: } catch (SQLException ignore) {
0976: wildcard = "%";
0977: }
0978:
0979: return wildcard;
0980: }
0981:
0982: protected String getSelectSQL(String where) {
0983: StringBuffer stmtBuf = new StringBuffer(60);
0984: String rTable = getQualifiedTable();
0985:
0986: stmtBuf.append("SELECT ");
0987: populateColumns(stmtBuf, rTable);
0988:
0989: stmtBuf.append(" FROM ").append(rTable);
0990:
0991: if (where != null && where.trim().length() != 0) {
0992: stmtBuf.append(" WHERE " + where.trim());
0993: }
0994:
0995: return stmtBuf.toString();
0996: }
0997:
0998: protected String getUpdateSQL() {
0999: StringBuffer stmtBuf = new StringBuffer(30);
1000: String rTable = getQualifiedTable();
1001:
1002: stmtBuf.append("UPDATE ").append(rTable);
1003:
1004: populateSet(stmtBuf.append(" SET "));
1005: populateWhere(stmtBuf.append(" WHERE "));
1006:
1007: return stmtBuf.toString();
1008: }
1009:
1010: protected void setUp(DatabaseLink server) throws AxionException {
1011: try {
1012: // We don't want to see others change for performance point of view. This is a memory
1013: // table anyway, after every commit the result will be updated or one could
1014: // choose to use REMOUNT table-name to see external changes on demand.
1015: _conn = server.getConnection();
1016: assertConnection();
1017:
1018: _isAxion = _conn instanceof AxionConnection;
1019: if (_conn.getMetaData().supportsResultSetConcurrency(
1020: ResultSet.TYPE_SCROLL_INSENSITIVE,
1021: ResultSet.CONCUR_READ_ONLY)) {
1022: _stmt = _conn.createStatement(
1023: ResultSet.TYPE_SCROLL_INSENSITIVE,
1024: ResultSet.CONCUR_READ_ONLY);
1025: _stmt.setFetchSize(FETCH_SIZE);
1026: } else if (_isAxion) {
1027: _stmt = _conn.createStatement(
1028: ResultSet.TYPE_SCROLL_SENSITIVE,
1029: ResultSet.CONCUR_READ_ONLY);
1030: } else {
1031: _stmt = _conn.createStatement();
1032: }
1033:
1034: if (_isCreateIfNotExist) {
1035: createRemoteTableIfNotExists();
1036: } else {
1037: if (!tableExistsInRemoteDB()) {
1038: throw new AxionException("Table " + _name
1039: + " does not exists.");
1040: }
1041: }
1042: } catch (SQLException e) {
1043: throw convertException(
1044: "Initialization error for remote table "
1045: + getName(), e);
1046: }
1047: }
1048:
1049: private int addBatch(PreparedStatement pstmt, int stmtModCount)
1050: throws SQLException {
1051: pstmt.addBatch();
1052: if (++stmtModCount == BATCH_SIZE) {
1053: pstmt.executeBatch();
1054: stmtModCount = 0;
1055: }
1056: _modCount++;
1057: return stmtModCount;
1058: }
1059:
1060: private void appendColumnNames(List cols, StringBuffer buf) {
1061: ListIterator listIter = cols.listIterator();
1062: while (listIter.hasNext()) {
1063: String colName = (String) listIter.next();
1064: if (listIter.previousIndex() != 0) {
1065: buf.append(", ");
1066: }
1067: buf.append(colName);
1068: }
1069: }
1070:
1071: private void assertConnection() throws AxionException {
1072: if (_conn == null) {
1073: throw new AxionException(
1074: "Could not connect to remote database: " + _dblink);
1075: }
1076: }
1077:
1078: private void assertExternalResultSet() throws AxionException {
1079: if (_externalRs == null) {
1080: throw new AxionException(
1081: "Invalid state: <null> ResultSet for external table "
1082: + getName());
1083: }
1084: }
1085:
1086: private void assertUpdatable() throws AxionException {
1087: if (!_isUpdatable) {
1088: throw new AxionException(
1089: "Not an updatable view - operation not allowed.");
1090: }
1091: }
1092:
1093: private void buildDBSpecificDatatypeMap() throws SQLException {
1094: _typeInfoMap.clear();
1095: ResultSet typeInfo = _conn.getMetaData().getTypeInfo();
1096: while (typeInfo.next()) {
1097: Object typeName = typeInfo.getString(1);
1098: Object type = ValuePool.getInt(typeInfo.getInt(2));
1099: _typeInfoMap.put(type, typeName);
1100: }
1101: }
1102:
1103: private void buildTableIndex() throws AxionException {
1104: if (_indexes == null) {
1105: if (!tableExistsInRemoteDB()) {
1106: _indexes = Collections.EMPTY_LIST;
1107: }
1108: }
1109: }
1110:
1111: private final void clearCache() {
1112: _colIndexToColIdMap = null;
1113: }
1114:
1115: private void closeResultSet(ResultSet rs) {
1116: if (rs != null) {
1117: try {
1118: rs.close();
1119: } catch (Exception ex) {
1120: // Ignore
1121: }
1122: }
1123: }
1124:
1125: private void closeStatement(Statement stmt) {
1126: if (stmt != null) {
1127: try {
1128: stmt.close();
1129: } catch (SQLException e1) {
1130: }
1131: }
1132: }
1133:
1134: private AxionException convertException(String errMsg,
1135: SQLException sqlEx) {
1136: StringBuffer msg = new StringBuffer(100);
1137: msg.append(errMsg == null ? "" : errMsg);
1138: msg.append(" ( CODE " + sqlEx.getErrorCode());
1139: msg.append(" - SQLSTATE " + sqlEx.getSQLState() + " ) ");
1140: msg.append(" : " + sqlEx.getMessage());
1141: return new AxionException(msg.toString());
1142: }
1143:
1144: private void createOrLoadResultSet() throws AxionException {
1145: if (_externalRs == null) {
1146: _externalRs = getResultSet(getSelectSQL(_where));
1147: _rowCount = -1;
1148: }
1149: }
1150:
1151: private void createRemoteTableIfNotExists() throws AxionException {
1152: assertConnection();
1153: Statement stmt = null;
1154: try {
1155: if (!tableExistsInRemoteDB()) {
1156: stmt = _conn.createStatement();
1157: buildDBSpecificDatatypeMap();
1158: stmt.executeUpdate(getCreateSQL());
1159: }
1160: } catch (SQLException e) {
1161: throw convertException("Could not create remote table "
1162: + getName(), e);
1163: } finally {
1164: closeStatement(stmt);
1165: }
1166: }
1167:
1168: private void createUpdatePS(List cols) throws SQLException {
1169: _updateCols = cols;
1170: if (_updatePS != null) {
1171: _updatePS.close();
1172: _updatePS = null;
1173: }
1174: _updatePS = _conn.prepareStatement(getUpdateSQL());
1175: }
1176:
1177: private void doAddConstraint(Constraint constraint)
1178: throws AxionException {
1179: _constraints.put(constraint.getName(), constraint);
1180: Iterator iter = getTableModificationListeners();
1181: while (iter.hasNext()) {
1182: TableModificationListener listener = (TableModificationListener) (iter
1183: .next());
1184: listener.constraintAdded(new ConstraintEvent(this ,
1185: constraint));
1186: }
1187: }
1188:
1189: /**
1190: * Indicates whether a table with the given name exists in the remote database for
1191: * schemas that match the given schema pattern.
1192: *
1193: * @param name name of table whose existence is being tested
1194: * @param schemaPattern schema pattern to use; may be null
1195: * @return true if a table exists with <code>name</code> as its object name in a
1196: * schema that matches <code>schemaPattern</code>; false otherwise
1197: * @throws SQLException if error occurs while testing for existence of
1198: * <code>name</code>
1199: */
1200: private boolean doesRemoteTableExist(String name,
1201: String schemaPattern) throws SQLException {
1202: ResultSet tableExist = _conn.getMetaData().getTables(
1203: _conn.getCatalog(), schemaPattern, name,
1204: JDBC_TABLE_OBJECT_TYPE);
1205: boolean found = (tableExist.next());
1206: if (found && _indexes == null) {
1207: _indexes = getIndexInfo(name, tableExist
1208: .getString("TABLE_SCHEM"));
1209: }
1210: closeResultSet(tableExist);
1211: return found;
1212: }
1213:
1214: private List getConstraintColumns(
1215: SelectableBasedConstraint constraint) {
1216: List columnList = new ArrayList();
1217: for (int i = 0, I = constraint.getSelectableCount(); i < I; i++) {
1218: Selectable sel = constraint.getSelectable(i);
1219: if (sel instanceof ColumnIdentifier
1220: && hasColumn((ColumnIdentifier) sel)) {
1221: columnList.add(sel.getName());
1222: }
1223: }
1224: return columnList;
1225: }
1226:
1227: private String getCreateSQL() throws SQLException {
1228: StringBuffer stmtBuf = new StringBuffer(30);
1229: String rTable = _remoteTableName;
1230:
1231: String dbName = _conn.getMetaData().getDatabaseProductName();
1232: boolean isdb2 = dbName.lastIndexOf("DB2") != -1 ? true : false;
1233:
1234: stmtBuf.append("CREATE TABLE ").append(rTable);
1235: stmtBuf.append(" ( ");
1236: for (int i = 0, I = getColumnCount(); i < I; i++) {
1237: Column col = getColumn(i);
1238: int jdbcType = col.getDataType().getJdbcType();
1239: Integer jdbcTypeObj = ValuePool.getInt(jdbcType);
1240:
1241: if (i != 0) {
1242: stmtBuf.append(", ");
1243: }
1244: stmtBuf.append(col.getName());
1245:
1246: String typeName = (String) (_typeInfoMap
1247: .containsKey(jdbcTypeObj) ? _typeInfoMap
1248: .get(jdbcTypeObj) : col.getSqlType());
1249: stmtBuf.append(" ").append(typeName);
1250:
1251: int scale = col.getScale();
1252: int precision = col.getSize();
1253: if (precision > 0 && isPrecisionRequired(jdbcType, isdb2)) {
1254: stmtBuf.append("(").append(precision);
1255: if (scale > 0 && Utils.isScaleRequired(jdbcType)) {
1256: stmtBuf.append(",").append(scale).append(") ");
1257: } else {
1258: stmtBuf.append(")");
1259: }
1260: }
1261:
1262: if (Utils.isBinary(jdbcType) && isdb2) {
1263: stmtBuf.append(" FOR BIT DATA ");
1264: }
1265:
1266: if (_notNullColumns.contains(col.getName())) {
1267: stmtBuf.append(" NOT NULL");
1268: }
1269:
1270: if (_pk != null) {
1271: stmtBuf.append(" PRIMARY KEY (");
1272: appendColumnNames(getConstraintColumns(_pk), stmtBuf);
1273: stmtBuf.append(") ");
1274: }
1275:
1276: if (!_uniqueConstraints.isEmpty()) {
1277: Iterator it = _uniqueConstraints.iterator();
1278: while (it.hasNext()) {
1279: UniqueConstraint unique = (UniqueConstraint) it
1280: .next();
1281: stmtBuf.append(" UNIQUE (");
1282: appendColumnNames(getConstraintColumns(unique),
1283: stmtBuf);
1284: stmtBuf.append(") ");
1285: }
1286: }
1287: }
1288: stmtBuf.append(" )");
1289: return stmtBuf.toString();
1290: }
1291:
1292: private String getDirectInsertHint(String tableName) {
1293: DatabaseLink server = _db.getDatabaseLink(_dblink);
1294: if (server.getJdbcUrl().toUpperCase().indexOf("ORACLE") != -1) {
1295: return "/*+ append parallel ( " + tableName + ", 4) */";
1296: }
1297: return "";
1298: }
1299:
1300: private RowIterator getIndexedRows(RowSource source, Selectable node)
1301: throws AxionException {
1302: if (node instanceof ComparisonFunction) {
1303: ComparisonFunction function = (ComparisonFunction) node;
1304:
1305: String column = null;
1306: Literal literal = null;
1307: Selectable left = function.getArgument(0);
1308: Selectable right = function.getArgument(1);
1309:
1310: if (left instanceof ColumnIdentifier
1311: && right instanceof Literal) {
1312: column = ((ColumnIdentifier) left).getName();
1313: literal = (Literal) (right);
1314: return getIndexedRows(function, column, literal
1315: .evaluate());
1316: } else if (left instanceof Literal
1317: && right instanceof ColumnIdentifier) {
1318: column = ((ColumnIdentifier) right).getName();
1319: literal = (Literal) (left);
1320: return getIndexedRows(function, column, literal
1321: .evaluate());
1322: } else {
1323: return null;
1324: }
1325: }
1326: return null; // No matching index found
1327: }
1328:
1329: private RowIterator getIndexedRows(ComparisonFunction function,
1330: String columnName, Object value) throws AxionException {
1331: String psKey = columnName + "_" + function.toString();
1332: PreparedStatement ps = (PreparedStatement) _indexSelectPSs
1333: .get(psKey);
1334:
1335: if (ps == null) {
1336: String where = "";
1337: String strPS = "";
1338: where = columnName + " " + function.getOperatorString()
1339: + " ? ";
1340:
1341: if (_where != null && _where.trim().length() != 0) {
1342: where = (_where + " AND " + where);
1343: }
1344:
1345: strPS = getSelectSQL(where);
1346: try {
1347: ps = this ._conn.prepareStatement(strPS,
1348: ResultSet.TYPE_FORWARD_ONLY,
1349: ResultSet.CONCUR_READ_ONLY);
1350: } catch (SQLException ex) {
1351: throw convertException(
1352: "while creating PS for remote DB:", ex);
1353: }
1354: _indexSelectPSs.put(psKey, ps);
1355: }
1356:
1357: List rows = new ArrayList();
1358: ResultSet rs = null;
1359: try {
1360: ps.setObject(1, value);
1361: rs = ps.executeQuery();
1362:
1363: while (rs.next()) {
1364: rows.add(getRowFromRS(-1, rs));
1365: }
1366: } catch (SQLException e) {
1367: throw new AxionException(e);
1368: } finally {
1369: closeResultSet(rs);
1370: }
1371:
1372: return new IndexRowIterator(rows);
1373: }
1374:
1375: private List getIndexInfo(String name, String schemaPattern)
1376: throws SQLException {
1377: ResultSet rs = _conn.getMetaData().getIndexInfo(
1378: _conn.getCatalog(), schemaPattern, name, false, true);
1379: List indexes = new ArrayList();
1380: while (rs.next()) {
1381: String colName = rs.getString("COLUMN_NAME");
1382: if (colName != null) {
1383: indexes.add(colName);
1384: }
1385: }
1386: closeResultSet(rs);
1387: return indexes;
1388: }
1389:
1390: private PrimaryKeyConstraint getPrimaryKey() {
1391: for (Iterator iter = _constraints.values().iterator(); iter
1392: .hasNext();) {
1393: Constraint constraint = (Constraint) (iter.next());
1394: if (constraint instanceof PrimaryKeyConstraint) {
1395: return (PrimaryKeyConstraint) (constraint);
1396: }
1397: }
1398: return null;
1399: }
1400:
1401: private String getQualifiedTable() {
1402: if (_qualifiedTableName == null) {
1403: String rTable = _remoteTableName;
1404:
1405: if (_schemaName != null && _schemaName.trim().length() != 0) {
1406: rTable = _schemaName + "." + rTable;
1407: }
1408:
1409: if (_catalogName != null
1410: && _catalogName.trim().length() != 0) {
1411: rTable = _catalogName + "." + rTable;
1412: }
1413: _qualifiedTableName = rTable;
1414: }
1415: return _qualifiedTableName;
1416: }
1417:
1418: private ResultSet getResultSet(String sql) throws AxionException {
1419: assertConnection();
1420: if (_stmt == null) {
1421: throw new AxionException(
1422: "Invalid state: <null> statement for external table "
1423: + getName());
1424: }
1425: try {
1426: return _stmt.executeQuery(sql);
1427: } catch (SQLException e) {
1428: throw convertException("Could not load remote table "
1429: + getName(), e);
1430: }
1431: }
1432:
1433: private Row getRowByOffset(int rowId) throws AxionException {
1434: assertExternalResultSet();
1435: try {
1436: synchronized (_externalRs) {
1437: if (_externalRs.absolute(rowId + 1)) {
1438: return getRowFromRS(rowId, _externalRs);
1439: }
1440: return null;
1441: }
1442: } catch (Exception e) {
1443: throw new AxionException(e);
1444: }
1445: }
1446:
1447: private Row getRowFromRS(int rowId, ResultSet rs)
1448: throws SQLException, AxionException {
1449: Row aRow = new SimpleRow(rowId, getColumnCount());
1450: Object colValue = null;
1451: Object externalColValue = null;
1452:
1453: for (int i = 0, I = getColumnCount(); i < I; i++) {
1454: externalColValue = rs.getObject(i + 1);
1455: colValue = getColumn(i).getDataType().convert(
1456: externalColValue);
1457: aRow.set(i, colValue);
1458: }
1459: return aRow;
1460: }
1461:
1462: private int getTableSize() {
1463: int tableSize = 0;
1464: ResultSet rs = null;
1465: try {
1466: assertConnection();
1467: if (_rowCountPS == null) {
1468: StringBuffer sb = new StringBuffer(60);
1469: sb.append("SELECT COUNT(*) FROM ");
1470: sb.append(getQualifiedTable());
1471:
1472: if ((_where != null) && (!"".equals(_where.trim()))) {
1473: sb.append(" WHERE ");
1474: sb.append(_where);
1475: }
1476:
1477: _rowCountPS = _conn.prepareStatement(sb.toString());
1478: }
1479:
1480: rs = _rowCountPS.executeQuery();
1481: if (rs.next()) {
1482: tableSize = rs.getInt(1);
1483: }
1484: } catch (Exception ex) {
1485: throw ExceptionConverter.convertToRuntimeException(ex);
1486: } finally {
1487: closeResultSet(rs);
1488: }
1489:
1490: return tableSize;
1491: }
1492:
1493: private String getTruncateSQL() {
1494: if (_trunncateSQL == null) {
1495: StringBuffer truncateSql = new StringBuffer(20);
1496: DatabaseLink server = _db.getDatabaseLink(_dblink);
1497: String url = server.getJdbcUrl().toUpperCase();
1498: if (url.indexOf("ORACLE") != -1
1499: || url.indexOf("AXIONDB") != -1) {
1500: truncateSql.append("TRUNCATE TABLE ");
1501: } else {
1502: truncateSql.append("DELETE FROM ");
1503: }
1504: truncateSql.append(getQualifiedTable());
1505: _trunncateSQL = truncateSql.toString();
1506: }
1507: return _trunncateSQL;
1508: }
1509:
1510: private boolean isColumnIndexed(String column)
1511: throws AxionException {
1512: buildTableIndex();
1513: return _indexes.contains(column);
1514: }
1515:
1516: private boolean isPrecisionRequired(int jdbcType, boolean isdb2) {
1517: if (isdb2 && jdbcType == Types.BLOB || jdbcType == Types.CLOB) {
1518: return true;
1519: } else {
1520: return Utils.isPrecisionRequired(jdbcType);
1521: }
1522: }
1523:
1524: private void populateColumns(StringBuffer stmtBuf, String rTable) {
1525: for (int i = 0, I = getColumnCount(); i < I; i++) {
1526: Column col = getColumn(i);
1527: if (i != 0) {
1528: stmtBuf.append(", ");
1529: }
1530: if (rTable == null) {
1531: // DB2 does not like qualified column name in INSERT
1532: stmtBuf.append(col.getName());
1533: } else {
1534: stmtBuf.append(rTable).append(".")
1535: .append(col.getName());
1536: }
1537: }
1538: }
1539:
1540: private void populateSet(StringBuffer stmtBuf) {
1541: if (_updateCols != null) {
1542: for (int i = 0, I = _updateCols.size(); i < I; i++) {
1543: if (i != 0) {
1544: stmtBuf.append(", ");
1545: }
1546: ColumnIdentifier colid = (ColumnIdentifier) _updateCols
1547: .get(i);
1548: stmtBuf.append(colid.getName()).append("=").append("?");
1549: }
1550: } else {
1551: for (int i = 0, I = getColumnCount(); i < I; i++) {
1552: Column col = getColumn(i);
1553: if (i != 0) {
1554: stmtBuf.append(", ");
1555: }
1556: stmtBuf.append(col.getName()).append("=").append("?");
1557: }
1558: }
1559: }
1560:
1561: private void populateValues(StringBuffer stmtBuf) {
1562: for (int i = 0, I = getColumnCount(); i < I; i++) {
1563: if (i != 0) {
1564: stmtBuf.append(", ");
1565: }
1566: stmtBuf.append("?");
1567: }
1568: }
1569:
1570: private void populateWhere(StringBuffer stmtBuf) {
1571: if (_pk != null) {
1572: // create where based on PK
1573: for (int i = 0, I = _pk.getSelectableCount(); i < I; i++) {
1574: String colName = _pk.getSelectable(i).getName();
1575: if (i != 0) {
1576: stmtBuf.append(" AND ");
1577: }
1578: stmtBuf.append(colName).append("=").append("?");
1579: }
1580: } else if (!_uniqueConstraints.isEmpty()) {
1581: // create where based on one unique column
1582: UniqueConstraint uc = (UniqueConstraint) _uniqueConstraints
1583: .get(0);
1584: for (int i = 0, I = uc.getSelectableCount(); i < I; i++) {
1585: String colName = uc.getSelectable(i).getName();
1586: if (i != 0) {
1587: stmtBuf.append(" AND ");
1588: }
1589: stmtBuf.append(colName).append("=").append("?");
1590: }
1591: } else {
1592: // use all columns in where condition
1593: for (int i = 0, I = getColumnCount(); i < I; i++) {
1594: Column col = getColumn(i);
1595: if (i != 0) {
1596: stmtBuf.append(" AND ");
1597: }
1598:
1599: stmtBuf.append("( ( ");
1600: stmtBuf.append(col.getName());
1601: stmtBuf.append(" = ? ) OR ((");
1602: stmtBuf.append(NULL_CHECK);
1603: stmtBuf.append(" = ?) AND (");
1604: stmtBuf.append(col.getName());
1605: stmtBuf.append(" IS NULL))) ");
1606: }
1607: }
1608: }
1609:
1610: private void publishEvent(TableModifiedEvent e)
1611: throws AxionException {
1612: for (int i = 0, I = _tableModificationListeners.size(); i < I; i++) {
1613: e
1614: .visit((TableModificationListener) (_tableModificationListeners
1615: .get(i)));
1616: }
1617: }
1618:
1619: private void setName(String name) {
1620: _name = name;
1621: }
1622:
1623: private void setObject(PreparedStatement pstmt, int i, Object obj)
1624: throws SQLException {
1625: if (obj == null) {
1626: int jdbcType = getColumn(i).getDataType().getJdbcType();
1627: pstmt.setNull(i + 1, jdbcType);
1628: } else {
1629: pstmt.setObject(i + 1, obj);
1630: }
1631: }
1632:
1633: private void setValueParams(PreparedStatement pstmt, Row row)
1634: throws SQLException, AxionException {
1635: if (_updateCols != null) {
1636: for (int i = 0, I = _updateCols.size(); i < I; i++) {
1637: ColumnIdentifier colid = (ColumnIdentifier) _updateCols
1638: .get(i);
1639: setObject(pstmt, i, row.get(getColumnIndex(colid
1640: .getName())));
1641: }
1642: } else {
1643: for (int i = 0, I = row.size(); i < I; i++) {
1644: setObject(pstmt, i, row.get(i));
1645: }
1646: }
1647: }
1648:
1649: private void setWhereParams(PreparedStatement pstmt, Row oldrow,
1650: int size) throws AxionException, SQLException {
1651: if (_pk != null) {
1652: // create where based on PK
1653: for (int i = 0, I = _pk.getSelectableCount(); i < I; i++) {
1654: Object obj = oldrow.get(getColumnIndex(_pk
1655: .getSelectable(i).getName()));
1656: pstmt.setObject(size + i + 1, obj);
1657: }
1658: } else if (!_uniqueConstraints.isEmpty()) {
1659: // create where based on one unique column
1660: UniqueConstraint uc = (UniqueConstraint) _uniqueConstraints
1661: .get(0);
1662: for (int i = 0, I = uc.getSelectableCount(); i < I; i++) {
1663: Object obj = oldrow.get(getColumnIndex(uc
1664: .getSelectable(i).getName()));
1665: pstmt.setObject(size + i + 1, obj);
1666: }
1667: } else {
1668: // use all columns in where condition
1669: int base = 0;
1670: int colCnt = getColumnCount();
1671: for (int i = 0; i < colCnt; i++) {
1672: Object obj = oldrow.get(i);
1673: base = size + (i * 2);
1674: if (obj == null) {
1675: int jdbcType = getColumn(i).getDataType()
1676: .getJdbcType();
1677: pstmt.setNull(base + 1, jdbcType);
1678: pstmt.setInt(base + 2, NULL_CHECK);
1679: } else {
1680: pstmt.setObject(base + 1, obj);
1681: pstmt.setInt(base + 2, EQUALITY_CHECK);
1682: }
1683: }
1684: }
1685: }
1686:
1687: private boolean tableExistsInRemoteDB() throws AxionException {
1688: assertConnection();
1689: try {
1690: String schemaPattern = getSchemaWildcardForRemoteDB();
1691:
1692: // Use known schema name if it has been given.
1693: if (_schemaName != null && _schemaName.trim().length() != 0) {
1694: schemaPattern = _schemaName;
1695: }
1696:
1697: // First, try upper-case name with default-case schema pattern.
1698: if (doesRemoteTableExist(_remoteTableName.toUpperCase(),
1699: schemaPattern)) {
1700: _remoteTableName = _remoteTableName.toUpperCase();
1701: return true;
1702: // Next, try default-case name.
1703: } else if (doesRemoteTableExist(_remoteTableName,
1704: schemaPattern)) {
1705: return true;
1706: } else if (schemaPattern != null) {
1707: // Now try upper-case name and schema pattern.
1708: if (doesRemoteTableExist(
1709: _remoteTableName.toUpperCase(), schemaPattern
1710: .toUpperCase())) {
1711: _remoteTableName = _remoteTableName.toUpperCase();
1712: return true;
1713: }
1714:
1715: // Finally, try default-case name and upper-case schema pattern.
1716: return doesRemoteTableExist(_remoteTableName,
1717: schemaPattern.toUpperCase());
1718: }
1719:
1720: return false;
1721: } catch (SQLException e) {
1722: throw convertException("Remote table/view "
1723: + _remoteTableName + " not found", e);
1724: }
1725: }
1726:
1727: private class ExternalDatabaseTableOrganizationContext extends
1728: BaseTableOrganizationContext {
1729:
1730: public Set getPropertyKeys() {
1731: Set baseKeys = getBasePropertyKeys();
1732: Set keys = new HashSet(baseKeys.size()
1733: + PROPERTY_KEYS.size());
1734: keys.addAll(baseKeys);
1735: keys.addAll(PROPERTY_KEYS);
1736:
1737: return keys;
1738: }
1739:
1740: public Set getRequiredPropertyKeys() {
1741: Set baseRequiredKeys = getBaseRequiredPropertyKeys();
1742: Set keys = new HashSet(baseRequiredKeys.size()
1743: + REQUIRED_KEYS.size());
1744: keys.addAll(baseRequiredKeys);
1745: keys.addAll(REQUIRED_KEYS);
1746:
1747: return keys;
1748: }
1749:
1750: public void readOrSetDefaultProperties(Properties props)
1751: throws AxionException {
1752: // Validate all supplied property keys to ensure they are recognized.
1753: super .assertValidPropertyKeys(props);
1754:
1755: _dblink = props.getProperty(PROP_DB);
1756:
1757: DatabaseLink server = _db.getDatabaseLink(_dblink);
1758: if (server == null) {
1759: throw new AxionException("Database link " + _dblink
1760: + " does not exist.");
1761: }
1762:
1763: _where = props.getProperty(PROP_WHERE);
1764: _remoteTableName = props.getProperty(PROP_REMOTETABLE);
1765:
1766: _catalogName = props.getProperty(PROP_CATALOG);
1767: if (_catalogName == null) {
1768: _catalogName = server.getCatalogName();
1769: }
1770:
1771: _schemaName = props.getProperty(PROP_SCHEMA);
1772: if (_schemaName == null) {
1773: _schemaName = server.getSchemaName();
1774: }
1775:
1776: if (_where != null && _where.trim().length() != 0) {
1777: _isUpdatable = false;
1778: }
1779:
1780: if (_dblink == null || _dblink.trim().length() == 0
1781: || !_db.hasDatabaseLink(_dblink)) {
1782: throw new AxionException(
1783: "Please provide a valid server name");
1784: }
1785:
1786: _remoteTableName = (_remoteTableName != null) ? _remoteTableName
1787: : getName();
1788: if ((props.getProperty(PROP_CREATE_IF_NOT_EXIST) != null)
1789: && ("TRUE".equalsIgnoreCase(props
1790: .getProperty(PROP_CREATE_IF_NOT_EXIST)))) {
1791: _isCreateIfNotExist = true;
1792: }
1793:
1794: setUp(server);
1795:
1796: _log.debug("External DB Table " + _remoteTableName
1797: + " created (Updatable=" + _isUpdatable + ")");
1798:
1799: }
1800:
1801: public void updateProperties() {
1802: super .updateProperties();
1803:
1804: setProperty(PROP_DB, _dblink);
1805: setProperty(PROP_WHERE, _where);
1806: setProperty(PROP_REMOTETABLE, _remoteTableName);
1807: setProperty(PROP_CATALOG, _catalogName);
1808: setProperty(PROP_SCHEMA, _schemaName);
1809: setProperty(PROP_LOADTYPE, ExternalTableFactory.TYPE_REMOTE);
1810: }
1811: }
1812:
1813: private class IndexRowIterator extends ListIteratorRowIterator {
1814: public IndexRowIterator(List list) {
1815: super (list.listIterator());
1816: }
1817:
1818: public void add(Row row) throws AxionException {
1819: addRow(row);
1820: }
1821:
1822: public void remove() {
1823: try {
1824: deleteRow(current());
1825: } catch (AxionException e) {
1826: throw ExceptionConverter.convertToRuntimeException(e);
1827: }
1828: }
1829:
1830: public void set(Row row) {
1831: try {
1832: updateRow(current(), row);
1833: } catch (AxionException e) {
1834: throw ExceptionConverter.convertToRuntimeException(e);
1835: }
1836: }
1837: }
1838:
1839: /**
1840: * AxionDB Index which delegates to remote DB by using SELECT ... WHERE
1841: * INDEX_COL_COMPARE
1842: */
1843: private class ExternalTableIndex extends BaseIndex {
1844: public ExternalTableIndex(Column column) {
1845: super (column.getName(), column, false);
1846: }
1847:
1848: public IndexLoader getIndexLoader() {
1849: throw new UnsupportedOperationException("getIndexLoader");
1850: }
1851:
1852: public RowIterator getInorderRowIterator(RowSource source)
1853: throws AxionException {
1854: return null;
1855: }
1856:
1857: public RowIterator getRowIterator(RowSource source,
1858: Function fn, Object value) throws AxionException {
1859: return getIndexedRows(((ComparisonFunction) fn),
1860: getIndexedColumn().getName(), value);
1861: }
1862:
1863: public void save(File dataDirectory) throws AxionException {
1864: throw new UnsupportedOperationException("save");
1865: }
1866:
1867: public void saveAfterTruncate(File dataDirectory)
1868: throws AxionException {
1869: throw new UnsupportedOperationException("saveAfterTruncate");
1870: }
1871:
1872: public boolean supportsFunction(Function fn) {
1873: if (fn instanceof ComparisonFunction) {
1874: return true;
1875: }
1876: return false;
1877: }
1878:
1879: public void truncate() throws AxionException {
1880: // No action
1881: }
1882:
1883: public String getType() {
1884: return XTERNAL_DB;
1885: }
1886:
1887: public void changeRowId(Table table, Row row, int oldId,
1888: int newId) throws AxionException {
1889: throw new UnsupportedOperationException("changeRowId");
1890: }
1891: }
1892:
1893: private static Log _log = LogFactory
1894: .getLog(ExternalDatabaseTable.class);
1895: private static final String XTERNAL_DB = "externalDB";;
1896: private final static int NULL_CHECK = 1;
1897: private final static int EQUALITY_CHECK = 0;
1898: private final static int BATCH_SIZE = 1000;
1899: private final static int FETCH_SIZE = 100; // 100 seems to be the best choice
1900: private final static String[] JDBC_TABLE_OBJECT_TYPE = { "TABLE" };
1901:
1902: private List _indexes;
1903: private String _catalogName;
1904: private Map _colIndexToColIdMap;
1905: private List _cols = new ArrayList();
1906: private Connection _conn;
1907: private Map _constraints = new HashMap(4);
1908:
1909: private Database _db;
1910: private String _dblink;
1911:
1912: private int _deleteModCount;
1913: private PreparedStatement _deletePS;
1914: private ResultSet _externalRs;
1915: private int _insertModCount;
1916: private PreparedStatement _insertPS;
1917:
1918: private boolean _isAxion = false;
1919: private boolean _isCreateIfNotExist = false;
1920: private boolean _isUpdatable = true;
1921: private int _modCount = 0;
1922: private String _name;
1923:
1924: private Set _notNullColumns = new HashSet(4);
1925: private PrimaryKeyConstraint _pk;
1926: private String _qualifiedTableName;
1927: private String _remoteTableName;
1928: private int _rowCount = 0;
1929: private PreparedStatement _rowCountPS;
1930:
1931: private String _schemaName;
1932: private Statement _stmt;
1933: private List _tableModificationListeners = new ArrayList();
1934: private String _trunncateSQL;
1935: private Map _typeInfoMap = new HashMap(20);
1936:
1937: private List _uniqueConstraints = new ArrayList(2);
1938: private List _updateCols;
1939: private int _updateModCount;
1940: private PreparedStatement _updatePS;
1941: private Map _indexSelectPSs = new HashMap();
1942: private String _where;
1943: private ExternalDatabaseTableOrganizationContext context;
1944:
1945: }
|