0001: package com.xoetrope.service;
0002:
0003: import java.io.IOException;
0004: import java.io.StringReader;
0005: import java.sql.DatabaseMetaData;
0006: import java.sql.ResultSet;
0007: import java.sql.ResultSetMetaData;
0008: import java.sql.SQLException;
0009: import java.sql.Statement;
0010: import java.util.Date;
0011: import java.sql.Timestamp;
0012: import java.util.Vector;
0013:
0014: import net.n3.nanoxml.IXMLElement;
0015: import net.n3.nanoxml.XMLElement;
0016: import net.n3.nanoxml.XMLWriter;
0017: import java.io.StringWriter;
0018:
0019: import net.xoetrope.debug.DebugLogger;
0020: import net.xoetrope.optional.data.sql.ConnectionObject;
0021: import net.xoetrope.optional.data.sql.DataConnection;
0022: import net.xoetrope.optional.data.sql.DatabaseTable;
0023: import net.xoetrope.optional.service.ServiceProxy;
0024: import net.xoetrope.xml.XmlElement;
0025: import net.xoetrope.xml.XmlSource;
0026: import net.xoetrope.xui.build.BuildProperties;
0027: import java.sql.PreparedStatement;
0028: import java.sql.Time;
0029: import net.xoetrope.optional.service.ServiceContext;
0030:
0031: import net.xoetrope.optional.service.ServiceProxyArgs;
0032: import net.xoetrope.optional.service.XServiceHelper;
0033: import net.xoetrope.xui.XProject;
0034:
0035: public abstract class XReplicationEngine implements
0036: net.xoetrope.optional.data.sql.XReplicationEngine {
0037:
0038: protected static boolean hasTimeStamps = false;
0039:
0040: protected static boolean lookupRemoteDataService = true;
0041: protected static int replicationStatementCount = 0;
0042:
0043: protected DataConnection dataConnection;
0044: protected XProject currentProject;
0045: protected ServiceProxy remoteService;
0046:
/**
 * Builds a replication engine bound to a project and a data connection.
 * Unless lookupRemoteDataService has been switched off, the
 * "replicationService" proxy is resolved straight away.
 * @param project the current project
 * @param dataConn the DataConnection object to be used
 */
public XReplicationEngine(XProject project, DataConnection dataConn) {
    this.currentProject = project;
    this.dataConnection = dataConn;

    if (!lookupRemoteDataService) {
        return;
    }
    this.remoteService = XServiceHelper.getServiceProxy(
            this.currentProject, "replicationService");
}
0059:
0060: /**
0061: * Synchronizes the specified local database table.
0062: * @param tableName the name of the table
0063: * @param connName the connection name
0064: * @param updateStrategy the strategory used to udpate the database.
0065: * @return a result set for a non cachable query or if the data has been
0066: * cached and the result set contains the subset of data matches the query.
0067: * Null is returned if the the server returns the full dataset and the query
0068: * needs to be run against the local copy.
0069: */
0070: public ResultSet synchronizeTable(String tableName,
0071: int updateStrategy) {
0072: checkLocalDatabase("XSYSSERVERTIMESTAMPS",
0073: DatabaseTable.PARANOID_UPDATE);
0074: return checkLocalDatabase(tableName,
0075: DatabaseTable.NORMAL_UPDATE);
0076: }
0077:
0078: // Synchronization code ------------------------------------------------------
0079: private static String fieldAttrName[] = { "a", "b", "c", "d", "e",
0080: "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q",
0081: "r", "s", "t", "u", "v", "w", "x", "y", "z", "1", "2", "3",
0082: "4", "5", "6", "7", "8", "9", "0", "A", "B", "C", "D" };
0083:
0084: /**
0085: * Check that the local database definition is up to date.
0086: * @param tableName the name of the table.
0087: * @param updateStrategy the strategy used to update the database
0088: * @return a result set for a non cachable query or if the data has been
0089: * cached and the result set contains the subset of data matches the query.
0090: * Null is returned if the the server returns the full dataset and the query
0091: * needs to be run against the local copy.
0092: */
0093: private synchronized ResultSet checkLocalDatabase(String tableName,
0094: int updateStrategy) {
0095: ResultSet rs = null;
0096: String xml = null;
0097:
0098: // if there is no route to the remote service don't try checking for updates
0099: try {
0100: if ((remoteService == null) || !remoteService.isAvailable())
0101: return null;
0102:
0103: // check whether the table should be udpated
0104: if (!requiresUpdate(tableName))
0105: return null;
0106:
0107: postDeletes(tableName);
0108: boolean updateFixupInProgress = (postUpdates(tableName) > 0L);
0109:
0110: // if the database table doesn't exist locally create it
0111: boolean createTable = !tableExists(tableName);
0112: if (createTable) {
0113: // gets the remote DDL and injects it into local database
0114: ServiceContext context = new ServiceContext();
0115: ServiceProxyArgs args = context.getArgs();
0116: args.setPassParam("table", tableName);
0117: args.setPassParam("key", null);
0118: args.setPassParam("ts", null);
0119: remoteService.call("getDDL", context);
0120: String createSql = context.getReturnValue().toString();
0121: dataConnection.doUpdate(createSql);
0122:
0123: // create triggers, etc.
0124: tagLocalTable(tableName);
0125: }
0126:
0127: Timestamp lastUpdateTime = getLocalTimestamp(tableName);
0128: ServiceContext context = new ServiceContext();
0129: ServiceProxyArgs args = context.getArgs();
0130: args = context.getArgs();
0131: args.setPassParam("table", tableName);
0132: args.setPassParam("key", null);
0133: args.setPassParam("ts", Long.toString(createTable ? 0L
0134: : lastUpdateTime.getTime()));
0135: long localTime = lastUpdateTime.getTime();
0136: long serverTime = getServerTimestamp(tableName).getTime();
0137: if (updateStrategy == DatabaseTable.PARANOID_UPDATE)
0138: localTime = 0L;
0139: else if (updateStrategy == DatabaseTable.OPTIMISTIC_UPDATE) {
0140: if (localTime > 0L)
0141: localTime = Long.MAX_VALUE;
0142: }
0143:
0144: /***/
0145: // Is the local data up to date
0146: if (updateFixupInProgress || (serverTime == 0)
0147: || (localTime < serverTime)) {
0148: boolean localDataDropped = false;
0149:
0150: // Fetch several rows at a time but maybe not all. The complete attribute
0151: // indicates if more records are to be sent
0152: boolean isComplete = false;
0153: while (!isComplete) {
0154: // Request the data from the server side.
0155: Integer callStatus = (Integer) remoteService.call(
0156: "getData", context);
0157:
0158: if (context.hasReturnValue()) {
0159: xml = context.getReturnValue().toString();
0160: if ((xml != null) && (xml.length() > 0)) {
0161: XmlSource src = new XmlSource();
0162: XmlElement rsRemote = src
0163: .read(new StringReader(xml));
0164: Timestamp timestamp = new Timestamp(Long
0165: .parseLong(rsRemote
0166: .getAttribute("timestamp")));
0167:
0168: // The complete flag indicates whether more data is available.
0169: String completeStr = rsRemote
0170: .getAttribute("complete");
0171: if (completeStr != null) {
0172: isComplete = completeStr
0173: .compareToIgnoreCase("true") == 0;
0174: // Get the last PSEUDOID retrieved
0175: //args.setPassParam( "maxid", Long.toString( maxPseudoId ));
0176: }
0177:
0178: // The delete flag indicates that certain server side updates (e.g. row deletions)
0179: // have occurred and hence the local data must be replaced in its entirety
0180: boolean deleteRequired = false;
0181: String delStr = rsRemote
0182: .getAttribute("delete");
0183: if (delStr != null)
0184: deleteRequired = delStr
0185: .equalsIgnoreCase("true");
0186:
0187: // The key is used on the server side to track the resources used to know
0188: // what has been transmitted and what remains for this session.
0189: args.setPassParam("key", rsRemote
0190: .getAttribute("key"));
0191:
0192: long maxPseudoId = getMaxPseudoId(tableName
0193: .toUpperCase());
0194:
0195: // Clear out the local data
0196: if (deleteRequired && !localDataDropped) {
0197: clearLocalData(tableName);
0198: localDataDropped = true;
0199: maxPseudoId = 0;
0200: }
0201:
0202: // Replace the local data
0203: if (appendLocalData(tableName, rsRemote,
0204: updateFixupInProgress ? 0L
0205: : maxPseudoId) > 0L) {
0206: // Record the current timestamp.
0207: setLocalTimestamp(tableName,
0208: new Timestamp(new Date()
0209: .getTime()));
0210: }
0211:
0212: } else
0213: break;
0214: } else
0215: break;
0216: }
0217: }
0218:
0219: /***/
0220:
0221: } catch (Exception ex) {
0222: rs = null;
0223: ex.printStackTrace();
0224: } finally {
0225: //@todo ???
0226: ConnectionObject connObj = dataConnection
0227: .getConnectionObject();
0228: if (connObj != null)
0229: connObj.expireLease();
0230: }
0231:
0232: return rs;
0233: }
0234:
0235: protected void removeDeleteRecords(String tableName, String[] ids) {
0236: if (ids.length == 0)
0237: return;
0238: for (int i = 0; i < ids.length; i++)
0239: ids[0] += (ids[i] + ", ");
0240: dataConnection.doUpdate("DELETE FROM " + tableName
0241: + " WHERE PSEUDOID IN (" + ids[0] + ")");
0242: dataConnection
0243: .doUpdate("DELETE FROM XSYSLOCALDELETIONS WHERE TABLENAME='"
0244: + tableName
0245: + "' "
0246: + "AND PSEUDOID IN ("
0247: + ids[0] + ")");
0248: }
0249:
/**
 * Hook called by checkLocalDatabase right after a missing local table has
 * been created from the DDL supplied by the server.
 * NOTE(review): the call site describes this as "create triggers, etc." -
 * the exact duties are defined by the subclasses; confirm there.
 * @param tableName the name of the table that was just created
 */
protected abstract void tagLocalTable(String tableName);
0251:
0252: /**
0253: * Tests whether the specified table exists locally.
0254: * @param tablenName the name of the table to be queried
0255: * @return ture if the table exists, false otherwise
0256: */
0257: protected boolean tableExists(String tableName) {
0258: boolean tableExists = false;
0259: try {
0260: ConnectionObject connObj = dataConnection
0261: .getConnectionObject();
0262: DatabaseMetaData dbmd = connObj.getConnection()
0263: .getMetaData();
0264: ResultSet localMetaData = dbmd.getTables(null, null,
0265: tableName.toUpperCase(), null);
0266: tableExists = localMetaData.next();
0267: } catch (Exception ex) {
0268: tableExists = false;
0269: }
0270: return tableExists;
0271: }
0272:
0273: protected void removeDeleteRecords(String tableName, String ids) {
0274: dataConnection.doUpdate("DELETE FROM " + tableName
0275: + " WHERE PSEUDOID IN(" + ids + ")");
0276: dataConnection
0277: .doUpdate("DELETE FROM XSYSLOCALDELETIONS WHERE TABLENAME='"
0278: + tableName
0279: + "' "
0280: + "AND PSEUDOID IN ("
0281: + ids
0282: + ")");
0283: }
0284:
0285: protected ResultSet getModifiedRecords(String tableName) {
0286: ResultSet resultSet = null;
0287: long lastUpdateID = -1;
0288: try {
0289: resultSet = dataConnection.doQuery("SELECT MAXPSEUDOID "
0290: + "FROM XSYSLOCALTIMESTAMPS WHERE tableName='"
0291: + tableName + "'");
0292: lastUpdateID = resultSet.getLong(1);
0293: } catch (Exception ex) {
0294: return null;
0295: }
0296:
0297: resultSet = dataConnection
0298: .doQuery("SELECT DISTINCT PseudoId FROM "
0299: + "XSYSLOCALDELETIONS WHERE tableName='"
0300: + tableName + "' AND PSEUDOID<=" + lastUpdateID);
0301:
0302: return resultSet;
0303: }
0304:
0305: /**** END OF POSTDELETES ***************/
0306:
0307: protected Timestamp getLastUpdate(String tableName) {
0308: Timestamp lastUpdate = null;
0309: try {
0310: ResultSet resultSet = dataConnection
0311: .doQuery("SELECT MAXPSEUDOID, LASTUPDATE "
0312: + "FROM XSYSLOCALTIMESTAMPS WHERE tableName='"
0313: + tableName + "'");
0314: resultSet.next();
0315:
0316: } catch (Exception ex) {
0317: lastUpdate = null;
0318: } finally {
0319: dataConnection.closeQuery();
0320: }
0321: return lastUpdate;
0322: }
0323:
0324: /**
0325: * Post the deleted records back to the server
0326: * @param tableName the current table to process
0327: */
0328: // protected synchronized int postDeletes( String tableName )
0329: // {
0330: // try {
0331: // if ( !isSysTable( tableName )) {
0332: //
0333: // Timestamp lastUpdate = getLastUpdate( tableName );
0334: // if ( lastUpdate == null )
0335: // return 0;
0336: //
0337: // ServiceContext context = new ServiceContext();
0338: // ServiceProxyArgs args = context.getArgs();
0339: // args.setPassParam( "table", tableName );
0340: // args.setPassParam( "lastUpdateDate", Long.toString( lastUpdate.getTime()));
0341: //
0342: // // check for modified records with a delete date
0343: // ResultSet resultSet = getModifiedRecords( tableName );
0344: // XMLElement data = new XMLElement( "deletes" );
0345: // long updatedRecords = convertData( data, resultSet );
0346: // // close the query opened in getModifiedRecords method
0347: // if ( resultSet != null )
0348: // resultSet.close();
0349: // dataConnection.closeQuery();
0350: //
0351: // String s = serializeData( data );
0352: // args.setPassParam( "data", s );
0353: // Object rc = remoteService.call( "postDeletes", context );
0354: //
0355: // if ( !context.hasErrors() && ( rc == ServiceProxy.COMPLETE )) {
0356: // Object returnValue = context.getReturnValue();
0357: // if ( returnValue != null ) {
0358: // String xml = returnValue.toString();
0359: // if (( xml != null ) && ( xml.length() > 0 )) {
0360: // XmlSource src = new XmlSource();
0361: // XmlElement rsRemote = src.read( new StringReader( xml ));
0362: //
0363: // Vector rows = rsRemote.getChildren();
0364: // int numRows = rows.size();
0365: // String[] ids = new String[ numRows ];
0366: // for ( int i = 0; i < numRows; i++ ) {
0367: // XmlElement row = (XmlElement)rows.elementAt( i );
0368: // ids[ i ] = row.getAttribute( "b" );
0369: // }
0370: // removeDeleteRecords( tableName, ids );
0371: //
0372: // }
0373: // }
0374: // }
0375: //
0376: // }
0377: // }
0378: // catch( Exception ex ) {
0379: // ex.printStackTrace();
0380: // }
0381: // return 0;
0382: // }
0383: private synchronized int postDeletes(String tableName) {
0384: try {
0385: if (!tableName.startsWith("XSYS")) {
0386: Timestamp lastUpdate;
0387: long lastUpdateID = 0;
0388:
0389: // 1 Get the last update date
0390: ResultSet resultSet = null;
0391: try {
0392: resultSet = dataConnection
0393: .doQuery("SELECT MAXPSEUDOID, LASTUPDATE "
0394: + "FROM XSYSLOCALTIMESTAMPS WHERE tableName='"
0395: + tableName + "'");
0396: resultSet.next();
0397:
0398: lastUpdateID = resultSet.getLong(1);
0399: lastUpdate = resultSet.getTimestamp(2);
0400: } catch (Exception ex) {
0401: return 0;
0402: } finally {
0403: dataConnection.closeQuery();
0404: }
0405:
0406: ServiceContext context = new ServiceContext();
0407: ServiceProxyArgs args = context.getArgs();
0408: args.setPassParam("table", tableName);
0409: args.setPassParam("lastUpdateDate", Long
0410: .toString(lastUpdate.getTime()));
0411:
0412: // 2 Check for modified records with a delete date (the last update date from
0413: resultSet = dataConnection
0414: .doQuery("SELECT DISTINCT PseudoId FROM XSYSLOCALDELETIONS "
0415: + "WHERE tableName='"
0416: + tableName
0417: + "' AND PSEUDOID<=" + lastUpdateID);
0418: XMLElement data = new XMLElement("deletes");
0419: long updatedRecords = serializeData(data, resultSet,
0420: args);
0421: if (resultSet != null)
0422: resultSet.close();
0423: dataConnection.closeQuery();
0424:
0425: Object rc = remoteService.call("postDeletes", context);
0426:
0427: // 4 On success remove the delete records from the deletions table
0428: if (!context.hasErrors()
0429: && (rc == ServiceProxy.COMPLETE)) {
0430: Object returnValue = context.getReturnValue();
0431: if (returnValue != null) {
0432: String xml = returnValue.toString();
0433:
0434: if ((xml != null) && (xml.length() > 0)) {
0435: XmlSource src = new XmlSource();
0436: XmlElement rsRemote = src
0437: .read(new StringReader(xml));
0438: Vector rows = rsRemote.getChildren();
0439: String ids = "";
0440: int numRows = rows.size();
0441: if (numRows > 0) {
0442: for (int i = 0; i < numRows; i++) {
0443: if (i > 0)
0444: ids += ",";
0445:
0446: XmlElement row = (XmlElement) rows
0447: .elementAt(i);
0448: ids += row.getAttribute("b");
0449: }
0450:
0451: dataConnection.doUpdate("DELETE FROM "
0452: + tableName + " "
0453: + "WHERE PSEUDOID IN(" + ids
0454: + ")");
0455: dataConnection
0456: .doUpdate("DELETE FROM XSYSLOCALDELETIONS "
0457: + "WHERE TABLENAME='"
0458: + tableName
0459: + "' AND PSEUDOID IN("
0460: + ids + ")");
0461: }
0462: }
0463:
0464: }
0465: }
0466: }
0467: } catch (Exception ex) {
0468: ex.printStackTrace();
0469: }
0470:
0471: return 0;
0472: }
0473:
0474: /**
0475: * Post new records back to the server
0476: * @param tableName the current table to process
0477: */
0478: private synchronized long postUpdates(String tableName) {
0479: long updatedRecords = 0L;
0480: try {
0481: if (!tableName.equals("XSYSSERVERTIMESTAMPS")) {
0482: Timestamp lastUpdate;
0483: long oldMaxPseudoId = 0L;
0484:
0485: // 1 Send the new Records
0486: // The Pseudo ID is an auto incrementing field used to uniquely identify a row.
0487: ResultSet resultSet = dataConnection
0488: .doQuery("SELECT MAXPSEUDOID, LASTUPDATE FROM XSYSLOCALTIMESTAMPS WHERE tableName='"
0489: + tableName + "'");
0490: resultSet.next();
0491: try {
0492: oldMaxPseudoId = resultSet.getLong(1);
0493: lastUpdate = resultSet.getTimestamp(2);
0494: } catch (SQLException ex) {
0495: return 0L;
0496: } finally {
0497: dataConnection.closeQuery();
0498: }
0499:
0500: // if ( oldMaxPseudoId == 0 )
0501: // return -1L;
0502:
0503: ServiceContext context = new ServiceContext();
0504: ServiceProxyArgs args = context.getArgs();
0505: args.setPassParam("table", tableName);
0506:
0507: long newMaxPseudoId = getMaxPseudoId(tableName);
0508: if (newMaxPseudoId > oldMaxPseudoId) {
0509: resultSet = dataConnection.doQuery("SELECT * FROM "
0510: + tableName + " WHERE pseudoId>"
0511: + oldMaxPseudoId);
0512:
0513: // Serialize the update records.
0514: XMLElement data = new XMLElement("inserts");
0515: updatedRecords = serializeData(data, resultSet,
0516: args);
0517: dataConnection.closeQuery();
0518: remoteService.call("postInserts", context);
0519:
0520: // 2 Delete the updated records so as to fix up their IDs
0521: if (updatedRecords > 0) {
0522: dataConnection.doUpdate("DELETE FROM "
0523: + tableName + " WHERE pseudoId>"
0524: + oldMaxPseudoId);
0525: dataConnection
0526: .doUpdate("DELETE FROM xSysLocalDeletions WHERE tableName='"
0527: + tableName
0528: + "' AND pseudoId>"
0529: + oldMaxPseudoId);
0530: }
0531: dataConnection.closeQuery();
0532: }
0533:
0534: // 3 Check for modified records (between the time of the last sync and
0535: // the start of this method -don't regard the fixup records as modified
0536: // records).
0537: resultSet = dataConnection.doQuery("SELECT * FROM "
0538: + tableName + " WHERE " + tableName + "_ts>'"
0539: + lastUpdate.toString() + "'");
0540: if (resultSet != null) {
0541: XMLElement data = new XMLElement("updates");
0542: data.setAttribute("table", tableName);
0543: long modifiedRecords = serializeData(data,
0544: resultSet, args);
0545: if (modifiedRecords > 0) {
0546: Object rc = remoteService.call("postUpdates",
0547: context);
0548: if (!context.hasErrors()
0549: && (rc == ServiceProxy.COMPLETE)) {
0550: Timestamp timestamp = new Timestamp(
0551: new Date().getTime());
0552: setLocalTimestamp(tableName, timestamp);
0553: }
0554: }
0555:
0556: resultSet.close();
0557: }
0558: dataConnection.closeQuery();
0559: }
0560: } catch (Exception ex) {
0561: ex.printStackTrace();
0562: }
0563:
0564: return updatedRecords;
0565: }
0566:
0567: protected int convertData(XMLElement data, ResultSet resultSet)
0568: throws SQLException {
0569: int updatedRecords = 0;
0570:
0571: if (resultSet != null) {
0572: // Return the next set of rows.
0573: ResultSetMetaData rsmd = resultSet.getMetaData();
0574: int recordCount = 0;
0575: int numFields = rsmd.getColumnCount();
0576: while (resultSet.next()) {
0577: IXMLElement child = data.createElement("row");
0578: for (int i = 1; i <= numFields; i++) {
0579: int fieldType = rsmd.getColumnType(i);
0580: switch (fieldType) {
0581: case java.sql.Types.BIGINT:
0582: case java.sql.Types.DECIMAL:
0583: case java.sql.Types.INTEGER:
0584: case java.sql.Types.NUMERIC:
0585: case java.sql.Types.REAL:
0586: case java.sql.Types.SMALLINT:
0587: case java.sql.Types.DOUBLE:
0588: case java.sql.Types.FLOAT:
0589: child.setAttribute(fieldAttrName[i], resultSet
0590: .getString(i));
0591: break;
0592: case java.sql.Types.TIMESTAMP:
0593: child.setAttribute(fieldAttrName[i], Long
0594: .toString(resultSet.getTimestamp(i)
0595: .getTime()));
0596: break;
0597: default:
0598: child.setAttribute(fieldAttrName[i],
0599: sqlEscape(resultSet.getString(i)));
0600: break;
0601: }
0602: }
0603: data.addChild(child);
0604: updatedRecords++;
0605: replicationStatementCount++;
0606: }
0607: }
0608: return updatedRecords;
0609: }
0610:
0611: public int serializeData(XMLElement data, ResultSet resultSet,
0612: ServiceProxyArgs args) throws SQLException, IOException {
0613: int updatedRecords = 0;
0614:
0615: if (resultSet != null) {
0616: // Return the next set of rows.
0617: ResultSetMetaData rsmd = resultSet.getMetaData();
0618: int recordCount = 0;
0619: int numFields = rsmd.getColumnCount();
0620: while (resultSet.next()) {
0621: IXMLElement child = data.createElement("row");
0622: for (int i = 1; i <= numFields; i++) {
0623: int fieldType = rsmd.getColumnType(i);
0624: switch (fieldType) {
0625: case java.sql.Types.BIGINT:
0626: case java.sql.Types.DECIMAL:
0627: case java.sql.Types.INTEGER:
0628: case java.sql.Types.NUMERIC:
0629: case java.sql.Types.REAL:
0630: case java.sql.Types.SMALLINT:
0631: case java.sql.Types.DOUBLE:
0632: case java.sql.Types.FLOAT:
0633: child.setAttribute(fieldAttrName[i], resultSet
0634: .getString(i));
0635: break;
0636: case java.sql.Types.TIMESTAMP:
0637: child.setAttribute(fieldAttrName[i], Long
0638: .toString(resultSet.getTimestamp(i)
0639: .getTime()));
0640: break;
0641: default:
0642: child.setAttribute(fieldAttrName[i],
0643: sqlEscape(resultSet.getString(i)));
0644: break;
0645: }
0646: }
0647: data.addChild(child);
0648: updatedRecords++;
0649: replicationStatementCount++;
0650: }
0651:
0652: if (updatedRecords > 0) {
0653: System.out.println(updatedRecords);
0654: StringWriter writer = new StringWriter();
0655: XMLWriter xmlWriter = new XMLWriter(writer);
0656: xmlWriter.write(data, true, 2);
0657:
0658: args
0659: .setPassParam("data", writer.getBuffer()
0660: .toString());
0661: }
0662: }
0663:
0664: return updatedRecords;
0665: }
0666:
0667: protected String serializeData(XMLElement data) throws IOException {
0668: StringWriter writer = new StringWriter();
0669: XMLWriter xmlWriter = new XMLWriter(writer);
0670: xmlWriter.write(data, true, 2);
0671: return writer.getBuffer().toString();
0672: }
0673:
0674: /**
0675: * Escape the ' character in SQL values
0676: * @return the escaped string
0677: */
0678: private String sqlEscape(String s) {
0679: if (s == null)
0680: return null;
0681:
0682: return s.replaceAll("'", "''");
0683: }
0684:
0685: /**
0686: * Record the max id from the named table
0687: * @param tableName the table to monitor
0688: */
0689: private synchronized void recordMaxId(String tableName) {
0690: if (!tableName.equals("XSYSSERVERTIMESTAMPS")) {
0691: long maxPseudoId = getMaxPseudoId(tableName);
0692: if (maxPseudoId >= 0)
0693: dataConnection
0694: .doUpdate("UPDATE XSYSLOCALTIMESTAMPS SET maxPseudoID="
0695: + maxPseudoId
0696: + " WHERE tableName='"
0697: + tableName + "'");
0698: }
0699: }
0700:
0701: /**
0702: * Get the max PSUEDOID for the names table
0703: * @param tableName the table name whose ID is to be inspected
0704: * @return the next ID or a negative value if invalid
0705: */
0706: private synchronized long getMaxPseudoId(String tableName) {
0707: long tableMax = 0;
0708: try {
0709: dataConnection.borrowConnection();
0710: ResultSet rsMax = dataConnection
0711: .doQuery("SELECT MAX(PSEUDOID) FROM " + tableName);
0712: rsMax.next();
0713: tableMax = rsMax.getLong(1);
0714: rsMax.close();
0715: } catch (Exception ex) {
0716: DebugLogger.logError("Unable to getMaxPseudoId for table "
0717: + tableName);
0718: } finally {
0719: dataConnection.closeQuery();
0720: }
0721:
0722: long id = -1;
0723: try {
0724: ResultSet rsMax = dataConnection
0725: .doQuery("SELECT MAXPSEUDOID FROM XSYSLOCALTIMESTAMPS WHERE TABLENAME='"
0726: + tableName + "'");
0727: rsMax.next();
0728: id = rsMax.getLong(1);
0729: rsMax.close();
0730: } catch (Exception ex) {
0731: DebugLogger.logError("Unable to getMaxPseudoId for table "
0732: + tableName);
0733: } finally {
0734: dataConnection.closeQuery();
0735: }
0736: id = Math.max(tableMax, id);
0737: return id;
0738: }
0739:
0740: /**
0741: * Get the next PSUEDOID for the names table
0742: * @param tableName the table name whose ID is to be inspected
0743: * @return the next ID or a negative value if invalid
0744: */
0745: public long getNextPseudoId(String tableName) {
0746: long id = getMaxPseudoId(tableName);
0747: if (id >= 0)
0748: return id + 1L;
0749:
0750: return -1L;
0751: }
0752:
0753: /**
0754: * Get the next ID for the named table, assuming one of the table's fields is
0755: * a managed field.
0756: * @param tableName the table name whose ID is to be inspected
0757: * @return the next ID or a negative value if invalid
0758: */
0759: public long getNextId(String tableName) {
0760: if (remoteService == null)
0761: return -1L;
0762:
0763: try {
0764: ServiceContext context = new ServiceContext();
0765: ServiceProxyArgs args = context.getArgs();
0766: args.setPassParam("table", tableName);
0767:
0768: // Serialize the update records.
0769: XMLElement data = new XMLElement("getNextId");
0770: Object status = remoteService.call("getNextId", context);
0771: if (status == ServiceProxy.COMPLETE)
0772: return Long.parseLong(context.getReturnValue()
0773: .toString());
0774: } catch (Exception ex) {
0775: ex.printStackTrace();
0776: }
0777:
0778: return -1L;
0779: }
0780:
0781: /**
0782: * Get a tables local timestamp. This timestamp represents the server time
0783: * when the server's data was last updated
0784: * @param tableName
0785: * @param sysTable the name of the system table to access
0786: * @return the timestamp
0787: */
0788: private java.sql.Timestamp getTimestamp(String tableName,
0789: String sysTable) {
0790: // Remove the local data
0791: boolean hasTimestampEntry = false;
0792:
0793: ResultSet rs = null;
0794: try {
0795: rs = dataConnection.doQuery("SELECT lastUpdate FROM "
0796: + sysTable + " WHERE tableName='"
0797: + tableName.toUpperCase() + "'");
0798: if (rs != null) {
0799: try {
0800: if (rs.next()) {
0801: hasTimestampEntry = true;
0802: return rs.getTimestamp(1);
0803: }
0804: } catch (SQLException ex) {
0805: if (BuildProperties.DEBUG)
0806: DebugLogger.trace("No timestamp available for "
0807: + tableName.toUpperCase() + " in "
0808: + sysTable);
0809: }
0810: }
0811: } finally {
0812: dataConnection.closeQuery();
0813: }
0814:
0815: // Setup the timestamp table
0816: if (!hasTimeStamps) {
0817: java.sql.Timestamp createDate = new java.sql.Timestamp(0);
0818: String createTime = createDate.toString();
0819:
0820: createTimestampTables();
0821:
0822: insertCreationRecords(sysTable, createTime);
0823:
0824: // Force download of the server times.
0825: checkLocalDatabase("XSYSSERVERTIMESTAMPS",
0826: DatabaseTable.PARANOID_UPDATE);
0827: return createDate;
0828: }
0829:
0830: java.sql.Timestamp date = new java.sql.Timestamp(0);
0831: if (!tableName.equals(sysTable)) {
0832: // Insert the new record.
0833: String sql = "";
0834: if (sysTable.equals("XSYSSERVERTIMESTAMPS"))
0835: sql = "INSERT INTO " + sysTable
0836: + " (TABLENAME,LASTUPDATE) VALUES(?,?)";
0837: else
0838: sql = "INSERT INTO "
0839: + sysTable
0840: + " (TABLENAME,LASTUPDATE,MAXPSEUDOID) VALUES(?,?,0)";
0841:
0842: PreparedStatement ps = null;
0843: try {
0844: // ps = createPreparedStatement( sql, "default" );
0845: ps = dataConnection.createPreparedStatement(sql,
0846: "default");
0847: ps.setString(1, tableName.toUpperCase());
0848: ps.setTimestamp(2, date);
0849: ps.executeUpdate();
0850: } catch (SQLException ex) {
0851: ex.printStackTrace();
0852: } finally {
0853: if (ps != null)
0854: dataConnection.closePreparedStatement(ps);
0855: }
0856: }
0857: return date;
0858: }
0859:
/**
 * Creates the timestamp tables.
 * Called by getTimestamp when no timestamp row is found and the class-wide
 * hasTimeStamps flag is still false; the creation records are inserted
 * right after this method returns.
 */
protected abstract void createTimestampTables();
0864:
0865: /**
0866: * Set a tables local timestamp. This timestamp represents the server time
0867: * when the server's data was last updated
0868: * @param tableName
0869: * @param sysTable the name of the system table to access
0870: * @return the timestamp
0871: */
0872: private void setTimestamp(String tableName, Timestamp timeStamp,
0873: String sysTable) {
0874: try {
0875: String sql = "UPDATE " + sysTable
0876: + " SET lastUpdate=? WHERE tableName=?";
0877: PreparedStatement ps = dataConnection
0878: .createPreparedStatement(sql, "default");
0879: ps.setTimestamp(1, timeStamp);
0880: ps.setString(2, tableName.toUpperCase());
0881: int rc = ps.executeUpdate();
0882:
0883: if (rc == 0) {
0884: if (sysTable.equals("XSYSSERVERTIMESTAMPS"))
0885: sql = "INSERT INTO " + sysTable
0886: + " (TABLENAME,LASTUPDATE) VALUES(?,?)";
0887: else
0888: sql = "INSERT INTO "
0889: + sysTable
0890: + " (TABLENAME,LASTUPDATE,MAXPSEUDOID) VALUES(?,?,0)";
0891:
0892: ps = dataConnection.createPreparedStatement(sql,
0893: "default");
0894: ps.setString(1, tableName.toUpperCase());
0895: ps.setTimestamp(2, timeStamp);
0896: ps.executeUpdate();
0897: }
0898: dataConnection.closePreparedStatement(ps);
0899: } catch (SQLException ex) {
0900: ex.printStackTrace();
0901: }
0902: }
0903:
0904: /**
0905: * Get the server timestamp for a particular table.
0906: * @return
0907: */
0908: private Timestamp getLocalTimestamp(String tableName) {
0909: return getTimestamp(tableName, "XSYSLOCALTIMESTAMPS");
0910: }
0911:
0912: /**
0913: * Set the server timestamp for a particular table.
0914: */
0915: private void setLocalTimestamp(String tableName, Timestamp ts) {
0916: setTimestamp(tableName, ts, "XSYSLOCALTIMESTAMPS");
0917: // setServerTimestamp( tableName, ts );
0918: }
0919:
0920: /**
0921: * Get the server timestamp for a particular table.
0922: * @return
0923: */
0924: private Timestamp getServerTimestamp(String tableName) {
0925: return getTimestamp(tableName, "XSYSSERVERTIMESTAMPS");
0926: }
0927:
0928: /**
0929: * Set the server timestamp for a particular table.
0930: */
0931: private void setServerTimestamp(String tableName, Timestamp ts) {
0932: setTimestamp(tableName, ts, "XSYSSERVERTIMESTAMPS");
0933: }
0934:
0935: /**
0936: * Delete the contents of a table
0937: * @param tableName the table to clear
0938: * @throws SQLException
0939: */
0940: private void clearLocalData(String tableName) {
0941: // Remove the local data
0942: dataConnection.doUpdate("DELETE FROM " + tableName);
0943: dataConnection
0944: .doUpdate("DELETE FROM xSysLocalDeletions WHERE tableName='"
0945: + tableName + "'");
0946: }
0947:
0948: /**
0949: * Append or replace data in the local table
0950: * @param tableName the name of the local table
0951: * @param rsRemote the data from the remote table
0952: * @param maxPseudoId the max ID in the local table
0953: * @return the update count
0954: * @throws SQLException
0955: */
0956: private long appendLocalData(String tableName, XmlElement rsRemote,
0957: long maxPseudoId) throws SQLException {
0958: long updateCount = 0L;
0959:
0960: // The XML data does not contain the proper field names so they need to be
0961: // retrieved from the local database
0962: ConnectionObject connObj = dataConnection.getConnectionObject();
0963: Statement stmt = connObj.createStatement(
0964: ResultSet.TYPE_SCROLL_INSENSITIVE,
0965: ResultSet.CONCUR_UPDATABLE);
0966: //ResultSet rs = ps.executeQuery( "SELECT TOP 1 * FROM " + tableName ); // TOP is notANSI SQL
0967: if (BuildProperties.DEBUG)
0968: DebugLogger.trace("SELECT * FROM " + tableName);
0969:
0970: ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
0971: ResultSetMetaData rsmd = rs.getMetaData();
0972: int numCols = rsmd.getColumnCount();
0973:
0974: // Store the column types
0975: int[] columnTypes = new int[numCols];
0976: String[] columnNames = new String[numCols];
0977: for (int i = 1; i <= columnTypes.length; i++) {
0978: columnTypes[i - 1] = rsmd.getColumnType(i);
0979: columnNames[i - 1] = rsmd.getColumnName(i);
0980: }
0981: connObj.closeStatement();
0982:
0983: int idColIdx = numCols;
0984: String idColName = null;
0985:
0986: // Create the PreparedStatement
0987: String updateSql, insertSql;
0988: insertSql = "INSERT INTO " + tableName + "(";
0989: updateSql = "UPDATE " + tableName + " SET ";
0990:
0991: String values = "";
0992: String updateValues = "";
0993: String fields = "";
0994: int idCol = 0;
0995: for (int colIdx = 1; colIdx <= numCols; colIdx++) {
0996: String fieldName = columnNames[colIdx - 1];
0997:
0998: if (values.length() > 0) {
0999: values += ",";
1000: fields += ",";
1001: if (!fieldName.equalsIgnoreCase("pseudoid"))
1002: updateValues += ",";
1003: else {
1004: idCol = colIdx;
1005: idColName = fieldAttrName[idCol];
1006: }
1007: }
1008:
1009: values += "?";
1010: if (!fieldName.equalsIgnoreCase("pseudoid"))
1011: updateValues += fieldName + "=?";
1012: fields += fieldName;
1013: }
1014:
1015: updateSql += updateValues + " WHERE PseudoId=?";
1016: insertSql += fields + ") VALUES(" + values + ")";
1017:
1018: // Save the data locally
1019: Vector rows = rsRemote.getChildren();
1020: int numRows = rows.size();
1021: boolean updatesReceived = false;
1022: for (int i = 0; i < numRows; i++) {
1023: XmlElement row = (XmlElement) rows.elementAt(i);
1024:
1025: // If the record already exists in the table an update is needed instead of
1026: // an insert
1027: boolean bUpdate = false;
1028: String rowId = row.getAttribute(idColName);
1029: PreparedStatement ps = null;
1030: if ((maxPseudoId == 0)
1031: || (maxPseudoId < new Long(rowId).longValue())) {
1032: bUpdate = false;
1033: ps = dataConnection.createPreparedStatement(insertSql,
1034: "default", true);
1035: } else {
1036: bUpdate = true;
1037: ps = dataConnection.createPreparedStatement(updateSql,
1038: "default", true);
1039: }
1040:
1041: int paramIdx = 1;
1042: for (int colIdx = 1; colIdx <= numCols; colIdx++) {
1043: String fieldValue = row
1044: .getAttribute(fieldAttrName[colIdx]);
1045: int fieldType = columnTypes[colIdx - 1];//rsmd.getColumnType( colIdx );
1046:
1047: // The colIdx has already been incremented, so it's 1 after rge actual index
1048: if (bUpdate && (colIdx == idCol))
1049: continue;
1050:
1051: if ("null".equals(fieldValue))
1052: ps.setNull(paramIdx, fieldType);
1053: else {
1054: switch (fieldType) {
1055: case java.sql.Types.BIGINT:
1056: ps
1057: .setLong(paramIdx, Long
1058: .parseLong(fieldValue));
1059: break;
1060: case java.sql.Types.SMALLINT:
1061: case java.sql.Types.INTEGER:
1062: ps.setInt(paramIdx, Integer
1063: .parseInt(fieldValue));
1064: break;
1065: case java.sql.Types.DOUBLE:
1066: ps.setDouble(paramIdx, Double
1067: .parseDouble(fieldValue));
1068: break;
1069: case java.sql.Types.NUMERIC:
1070: case java.sql.Types.REAL:
1071: case java.sql.Types.DECIMAL:
1072: case java.sql.Types.FLOAT:
1073: ps.setFloat(paramIdx, Float
1074: .parseFloat(fieldValue));
1075: break;
1076: case java.sql.Types.TIME:
1077: ps.setTime(paramIdx, new Time(Long
1078: .parseLong(fieldValue)));
1079: break;
1080: case java.sql.Types.DATE:
1081: ps.setDate(paramIdx, new java.sql.Date(Long
1082: .parseLong(fieldValue)));
1083: break;
1084: case java.sql.Types.TIMESTAMP:
1085: ps.setTimestamp(paramIdx, new Timestamp(Long
1086: .parseLong(fieldValue)));
1087: break;
1088: default:
1089: ps.setString(paramIdx, fieldValue.trim()); // SQL Server problem with ANSI_PADDING. Shouldn't need to trima
1090: break;
1091: }
1092: }
1093: paramIdx++;
1094: }
1095:
1096: if (bUpdate) {
1097: ps.setLong(numCols, Long.parseLong(rowId));
1098: if (BuildProperties.DEBUG)
1099: DebugLogger.trace(updateSql);
1100: }
1101:
1102: ps.executeUpdate();
1103: dataConnection.closePreparedStatement(ps);
1104: replicationStatementCount++;
1105: updateCount++;
1106: updatesReceived = true;
1107: }
1108: rs.close();
1109: connObj.closeStatement();
1110:
1111: if (updatesReceived)
1112: recordMaxId(tableName.toUpperCase());
1113:
1114: return updateCount;
1115: }
1116:
1117: private void insertCreationRecords(String sysTable,
1118: String createTime) {
1119: String[] tableNames = { "XSYSLOCALTIMESTAMPS",
1120: "XSYSSERVERTIMESTAMPS" };
1121: for (int i = 0; i < tableNames.length; i++) {
1122: boolean hasTimestampEntry = false;
1123: ResultSet rs = null;
1124: try {
1125: rs = dataConnection.doQuery("SELECT tableName FROM "
1126: + sysTable + " WHERE tableName='"
1127: + tableNames[i] + "'");
1128: if (rs != null) {
1129: try {
1130: if (rs.next()) {
1131: hasTimestampEntry = true;
1132: }
1133: } catch (SQLException ex) {
1134: }
1135: }
1136: } finally {
1137: dataConnection.closeQuery();
1138: }
1139:
1140: if (!hasTimestampEntry)
1141: dataConnection.doUpdate("INSERT INTO " + sysTable
1142: + " (tableName,lastUpdate) VALUES('"
1143: + tableNames[i] + "','" + createTime + "')");
1144: }
1145: }
1146:
1147: public static int getReplicationStatementCount() {
1148: return replicationStatementCount;
1149: }
1150:
1151: public static void resetReplicationStatementCount() {
1152: replicationStatementCount = 0;
1153: }
1154:
1155: /**
1156: * Indicates whether the specified table should be updated
1157: * @param tableName the name of the table to be queried
1158: * @return true if the table requires an udpate, false otherwise
1159: */
1160: protected boolean requiresUpdate(String tableName) {
1161: return (!tableName.startsWith("TEMP_") && !tableName
1162: .startsWith("XSYSLOCAL"));
1163: }
1164:
1165: /**
1166: * Indicates whether the specified table is a system table
1167: * @param table the name of the table to be queried
1168: * @return true if the table is a system table, false otherwise
1169: */
1170: protected boolean isSysTable(String tableName) {
1171: return (tableName.startsWith("XSYS"));
1172: }
1173:
1174: }
|