0001: /* ====================================================================
0002: * The LateralNZ Software License, Version 1.0
0003: *
0004: * Copyright (c) 2003 LateralNZ. All rights reserved.
0005: *
0006: * Redistribution and use in source and binary forms, with or without
0007: * modification, are permitted provided that the following conditions
0008: * are met:
0009: *
0010: * 1. Redistributions of source code must retain the above copyright
0011: * notice, this list of conditions and the following disclaimer.
0012: *
0013: * 2. Redistributions in binary form must reproduce the above copyright
0014: * notice, this list of conditions and the following disclaimer in
0015: * the documentation and/or other materials provided with the
0016: * distribution.
0017: *
0018: * 3. The end-user documentation included with the redistribution,
0019: * if any, must include the following acknowledgment:
0020: * "This product includes software developed by
0021: * LateralNZ (http://www.lateralnz.org/) and other third parties."
0022: * Alternately, this acknowledgment may appear in the software itself,
0023: * if and wherever such third-party acknowledgments normally appear.
0024: *
0025: * 4. The names "LateralNZ" must not be used to endorse or promote
0026: * products derived from this software without prior written
0027: * permission. For written permission, please
0028: * contact oss@lateralnz.org.
0029: *
0030: * 5. Products derived from this software may not be called "Panther",
0031: * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
0032: * "LATERALNZ" appear in their name, without prior written
0033: * permission of LateralNZ.
0034: *
0035: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
0036: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
0037: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
0038: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
0039: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0040: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0041: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
0042: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
0043: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
0044: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
0045: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
0046: * SUCH DAMAGE.
0047: * ====================================================================
0048: *
0049: * This software consists of voluntary contributions made by many
0050: * individuals on behalf of LateralNZ. For more
0051: * information on Lateral, please see http://www.lateralnz.com/ or
0052: * http://www.lateralnz.org
0053: *
0054: */
0055: package org.lateralnz.c3d;
0056:
0057: import java.io.IOException;
0058: import java.sql.CallableStatement;
0059: import java.sql.Connection;
0060: import java.sql.DatabaseMetaData;
0061: import java.sql.DriverManager;
0062: import java.sql.PreparedStatement;
0063: import java.sql.ResultSet;
0064: import java.sql.ResultSetMetaData;
0065: import java.sql.SQLException;
0066: import java.sql.Statement;
0067: import java.util.HashMap;
0068: import java.util.Iterator;
0069: import java.util.ArrayList;
0070: import java.util.List;
0071: import java.util.Properties;
0072: import java.util.StringTokenizer;
0073: import java.util.Timer;
0074: import java.util.TimerTask;
0075: import javax.naming.NamingException;
0076:
0077: import org.apache.log4j.Logger;
0078:
0079: import org.lateralnz.common.util.Constants;
0080: import org.lateralnz.common.util.DAOUtils;
0081: import org.lateralnz.common.util.JNDIUtils;
0082: import org.lateralnz.common.util.StringUtils;
0083:
0084: import org.lateralnz.messaging.Message;
0085: import org.lateralnz.messaging.MessageListener;
0086: import org.lateralnz.messaging.MessageHandler;
0087:
0088: import org.lateralnz.c3d.util.CacheOperation;
0089: import org.lateralnz.c3d.util.Column;
0090: import org.lateralnz.c3d.util.DBUtils;
0091: import org.lateralnz.c3d.util.ResultWrapper;
0092:
0093: /**
0094: * core 'database' functionality -- basically this is the interceptor for all SQL
0095: * statements so we can cache resultsets, refresh the cache and so on.
0096: */
0097: public class DatabaseEngine implements Constants, MessageListener {
0098: private static final Logger log = Logger
0099: .getLogger(DatabaseEngine.class.getName());
0100:
0101: private static final Class HASHMAP_CLASS = HashMap.class;
0102: private static final Class ARRAYLIST_CLASS = ArrayList.class;
0103:
0104: private static final String CACHE_BEGIN_TAG = "cache(";
0105: private static final String CACHE_END_TAG = ")";
0106: private static final String CACHES = "caches";
0107: private static final String DATABASE_CACHE = "database_cache_";
0108: private static final String SELECT_FROM = "select * from ";
0109: private static final String WHERE = " where ";
0110:
0111: private static final String RESULTS_TO_CACHE_LINK = "RESULTS_TO_CACHE_LINK";
0112:
0113: private static HashMap dbengines = new HashMap();
0114:
0115: private static MessageHandler mh = null;
0116:
0117: private String dbname; // the name of this database cache/bridge
0118: private String messageGroup;
0119: private Properties props; // properties of this databases
0120:
0121: private Timer timer = new Timer();
0122:
0123: private HashMap users = new HashMap();
0124: private HashMap datacache = new HashMap();
0125: private HashMap caches = new HashMap();
0126: private HashMap tableCheck = new HashMap(); // map of primary tables
0127: private HashMap updateCheck = new HashMap(); // map of all monitored tables
0128: private HashMap operationsAwaitingCommit = new HashMap();
0129:
/**
 * Settings of a single cache: whether inserts reset it, the primary table it
 * mirrors and that table's key columns, the additional tables whose deletes
 * reset it, and the timeout stored for it (Long.MIN_VALUE until one is set).
 */
class CacheProperties {
    boolean resetOnInsert = false;
    String primaryTable;
    ArrayList primaryKeys = new ArrayList();
    ArrayList deleteCheck = new ArrayList();
    long timeout = Long.MIN_VALUE;

    /** Short description used in log output: the reset flag and the key columns. */
    public String toString() {
        StringBuffer text = new StringBuffer("resetinsert=");
        text.append(resetOnInsert);
        text.append(",primarykeys=");
        text.append(primaryKeys.toString());
        return text.toString();
    }
}
0142:
0143: class CacheCleaner extends TimerTask {
0144: private String cacheName;
0145: private DatabaseEngine dbengine;
0146: private long last = 0;
0147: private long period;
0148:
0149: CacheCleaner(DatabaseEngine dbengine, String cacheName,
0150: long period) {
0151: super ();
0152: this .dbengine = dbengine;
0153: this .cacheName = cacheName;
0154: this .period = period;
0155: }
0156:
0157: public boolean cancel() {
0158: return false;
0159: }
0160:
0161: public void run() {
0162: last = System.currentTimeMillis();
0163: try {
0164: List l = dbengine.getCachedResultsetNames(cacheName);
0165: Iterator iter = l.iterator();
0166: while (iter.hasNext()) {
0167: String sql = (String) iter.next();
0168: DCResultSet dcrs = (DCResultSet) dbengine
0169: .getCachedResultSet(cacheName, sql);
0170: if (last - dcrs.lastAccessed > period) {
0171: if (log.isDebugEnabled()) {
0172: log.debug("clearing timed out data ("
0173: + cacheName + ") " + sql);
0174: }
0175: dbengine.clearResultsByKey(cacheName, sql);
0176: }
0177: }
0178: } catch (Exception e) {
0179: e.printStackTrace();
0180: log.error(e);
0181: }
0182: }
0183:
0184: public long scheduledExecutionTime() {
0185: return last;
0186: }
0187: }
0188:
0189: private DatabaseEngine(String dbname, Properties props)
0190: throws SQLException {
0191: this .dbname = dbname;
0192: this .messageGroup = DATABASE_CACHE + dbname;
0193: this .props = props;
0194: String tmp = getProperty(CACHES) + COMMA;
0195: StringTokenizer st = new StringTokenizer(tmp, COMMA);
0196: while (st.hasMoreTokens()) {
0197: // cache name
0198: String cache = st.nextToken();
0199: CacheProperties cprops = new CacheProperties();
0200:
0201: // do we reset this cache when an insert statement is executed?
0202: cprops.resetOnInsert = Boolean.valueOf(
0203: getProperty(cache + ".reset_on_insert"))
0204: .booleanValue();
0205: if (log.isInfoEnabled()) {
0206: log.info(cache + " reset on insert is "
0207: + cprops.resetOnInsert);
0208: }
0209: cprops.primaryTable = getProperty(cache + ".primary_table");
0210:
0211: // get the meta data for the primary table
0212: // and add the primary keys into our watch list
0213: Connection conn = DriverManager.getConnection(
0214: getProperty("dburl"), getProperty("dbuser"),
0215: getProperty("dbpassword"));
0216: DatabaseMetaData dbmd = conn.getMetaData();
0217: ResultSet rs = dbmd.getPrimaryKeys(EMPTY, EMPTY,
0218: cprops.primaryTable);
0219: while (rs.next()) {
0220: cprops.primaryKeys.add(rs.getString(4));
0221: }
0222:
0223: ArrayList al = (ArrayList) DBUtils.getObjectFromMap(
0224: cprops.primaryTable, updateCheck, ARRAYLIST_CLASS);
0225: al.add(cache);
0226: al = (ArrayList) DBUtils.getObjectFromMap(
0227: cprops.primaryTable, tableCheck, ARRAYLIST_CLASS);
0228: al.add(cache);
0229:
0230: StringUtils.toList(getProperty(cache + ".delete_check"),
0231: COMMA, cprops.deleteCheck);
0232: Iterator iter = cprops.deleteCheck.iterator();
0233: while (iter.hasNext()) {
0234: String tab = (String) iter.next();
0235: al = (ArrayList) DBUtils.getObjectFromMap(tab,
0236: tableCheck, ARRAYLIST_CLASS);
0237: al.add(cache);
0238: }
0239:
0240: if (log.isInfoEnabled()) {
0241: log.info("delete check for " + cache + " is "
0242: + cprops.deleteCheck);
0243: }
0244:
0245: String timeout = getProperty(cache + ".timeout");
0246: if (!StringUtils.isEmpty(timeout)) {
0247: // schedule regular cache cleanouts
0248: cprops.timeout = 60000L * 60L * Integer
0249: .parseInt(timeout);
0250:
0251: timer
0252: .schedule(new CacheCleaner(this , cache,
0253: cprops.timeout), cprops.timeout,
0254: cprops.timeout);
0255: }
0256:
0257: caches.put(cache, cprops);
0258: if (log.isInfoEnabled()) {
0259: log.info("db(" + dbname + "), cache(" + cache
0260: + ") initialised with properties: "
0261: + cprops.toString());
0262: }
0263: }
0264:
0265: // user access to the database
0266: String usersstr = props.getProperty("users");
0267: StringUtils.toMap(users, usersstr, AMPERSAND);
0268:
0269: // messaging for cache changes
0270: try {
0271: String msgservice = getProperty("messaging_service_name");
0272: if (!StringUtils.isEmpty(msgservice)) {
0273: mh = (MessageHandler) JNDIUtils.get(msgservice);
0274: mh.addListener(messageGroup, this );
0275: }
0276: } catch (NamingException ne) {
0277: throw new SQLException("unable to get message service");
0278: }
0279: }
0280:
0281: public static final DatabaseEngine getInstance(String dbname,
0282: Properties props) throws SQLException {
0283: if (!dbengines.containsKey(dbname)) {
0284: synchronized (dbengines) {
0285: if (!dbengines.containsKey(dbname)) {
0286: if (props == null) {
0287: throw new SQLException("no such database "
0288: + dbname);
0289: }
0290: DatabaseEngine dbengine = new DatabaseEngine(
0291: dbname, props);
0292: dbengines.put(dbname, dbengine);
0293: }
0294: }
0295: }
0296:
0297: return (DatabaseEngine) dbengines.get(dbname);
0298: }
0299:
0300: private final void clearAll() {
0301: if (log.isDebugEnabled()) {
0302: log.debug("clearing all cached data");
0303: }
0304: synchronized (datacache) {
0305: datacache.clear();
0306: }
0307:
0308: Runtime.getRuntime().gc();
0309: }
0310:
0311: private final void clearCache(String cacheName) {
0312: if (log.isDebugEnabled()) {
0313: log.debug("clearing data for " + cacheName);
0314: }
0315:
0316: HashMap hm = (HashMap) datacache.get(cacheName);
0317:
0318: if (hm != null) {
0319: synchronized (hm) {
0320: hm.clear();
0321: }
0322: }
0323: }
0324:
0325: private final void clearResultsByKey(String cache, String key)
0326: throws SQLException {
0327: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cache,
0328: datacache, HASHMAP_CLASS);
0329: HashMap rtc = (HashMap) DBUtils.getObjectFromMap(
0330: RESULTS_TO_CACHE_LINK, hm, HASHMAP_CLASS);
0331:
0332: ArrayList ll = (ArrayList) rtc.get(key);
0333: if (ll != null) {
0334: synchronized (ll) {
0335: Iterator iter = ll.iterator();
0336: while (iter.hasNext()) {
0337: String sql = (String) iter.next();
0338: iter.remove();
0339:
0340: if (hm.remove(sql) != null) {
0341: if (log.isDebugEnabled()) {
0342: log.debug("remove data for cache " + cache
0343: + "/key " + sql + " succeeded");
0344: }
0345: }
0346: }
0347: }
0348: }
0349: rtc.remove(key);
0350: }
0351:
0352: protected void commit(DCConnection conn) throws SQLException {
0353: ArrayList ll = (ArrayList) operationsAwaitingCommit.get(conn);
0354: if (log.isDebugEnabled()) {
0355: log.debug("committing " + conn + " with ops "
0356: + (ll != null ? Integer.toString(ll.size()) : "0"));
0357: }
0358: operationsAwaitingCommit.remove(conn);
0359:
0360: if (ll != null && ll.size() > 0) {
0361: triggerCacheChanges(ll);
0362:
0363: if (mh != null) {
0364: try {
0365: mh.send(new Message(1, messageGroup, ll));
0366: } catch (IOException ioe) {
0367: log.error("commit error: " + ioe.getMessage(), ioe);
0368: }
0369: }
0370: }
0371:
0372: if (log.isDebugEnabled()) {
0373: log.debug(Integer.toString(operationsAwaitingCommit.size())
0374: + " operations (after commit)");
0375: }
0376: }
0377:
0378: protected ResultWrapper execute(DCStatement st) throws SQLException {
0379: String sql = st.getSQL();
0380: List l = DBUtils.splitSQL(sql);
0381: if (DBUtils.countSelects(l) > 1) {
0382: throw new SQLException(
0383: "multiple resultsets are not supported");
0384: }
0385:
0386: ResultWrapper rw = new ResultWrapper();
0387: Iterator iter = l.iterator();
0388: while (iter.hasNext()) {
0389: sql = (String) iter.next();
0390:
0391: if (DBUtils.isStatementType(sql, DBUtils.SELECT_STATEMENT)) {
0392: processSelect(st, sql, rw);
0393: } else if (DBUtils.isStatementType(sql,
0394: DBUtils.INSERT_STATEMENT)) {
0395: processInsert(st, sql, rw);
0396: } else if (DBUtils.isStatementType(sql,
0397: DBUtils.UPDATE_STATEMENT)) {
0398: processUpdateOrDelete(true, st, sql, rw);
0399: } else if (DBUtils.isStatementType(sql,
0400: DBUtils.DELETE_STATEMENT)) {
0401: processUpdateOrDelete(false, st, sql, rw);
0402: } else if (DBUtils.isStatementType(sql,
0403: DBUtils.CLEAR_CACHE_STATEMENT)) {
0404: processClearCache(st, sql, rw);
0405: } else if (DBUtils.isStatementType(sql,
0406: DBUtils.QUERY_CACHE_STATEMENT)) {
0407: processQueryCache(st, sql, rw);
0408: } else if (DBUtils.isStatementType(sql,
0409: DBUtils.QUERY_STATS_STATEMENT)) {
0410: processQueryStats(st, sql, rw);
0411: } else {
0412: if (log.isDebugEnabled()) {
0413: log.debug("passing thru unrecognised statement: "
0414: + sql);
0415: }
0416:
0417: boolean results = false;
0418: switch (st.statementType) {
0419: case DCStatement.STATEMENT:
0420: results = st.getRealStatement().execute(sql);
0421: break;
0422: case DCStatement.PREPARED:
0423: results = ((PreparedStatement) st
0424: .getRealStatement()).execute();
0425: break;
0426: case DCStatement.CALLABLE:
0427: results = ((CallableStatement) st
0428: .getRealStatement()).execute();
0429: }
0430: if (results) {
0431: rw.rs = st.getRealStatement().getResultSet();
0432: } else {
0433: rw.updateCount = st.getRealStatement()
0434: .getUpdateCount();
0435: }
0436: }
0437: }
0438:
0439: if (st.conn.getAutoCommit()) {
0440: st.conn.commit();
0441: }
0442:
0443: return rw;
0444: }
0445:
0446: public List getCacheNames() {
0447: ArrayList ll = new ArrayList();
0448: ll.addAll(caches.keySet());
0449: return ll;
0450: }
0451:
0452: public List getCachedResultsetNames(String cacheName)
0453: throws SQLException {
0454: ArrayList ll = new ArrayList();
0455: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cacheName,
0456: datacache, HASHMAP_CLASS);
0457: if (hm != null) {
0458: ll.addAll(hm.keySet());
0459: }
0460:
0461: return ll;
0462: }
0463:
0464: /**
0465: * return the column data based on an array of primary key columns.
0466: * For example, if the primary keys are in columns 1, 5 and 6 of the resultset,
0467: * this will return the data in those columns as a 3 element array.
0468: */
0469: protected final String[] getKeyColumnData(int[] keyColumns,
0470: ResultSet rs) throws SQLException {
0471: String[] keyData = new String[keyColumns.length];
0472: for (int i = 0; i < keyColumns.length; i++) {
0473: keyData[i] = rs.getString(keyColumns[i]);
0474: }
0475: return keyData;
0476: }
0477:
0478: private final ArrayList getOperations(DCConnection conn) {
0479: ArrayList ll = (ArrayList) operationsAwaitingCommit.get(conn);
0480: if (ll == null) {
0481: synchronized (operationsAwaitingCommit) {
0482: ll = (ArrayList) operationsAwaitingCommit.get(conn);
0483: if (ll == null) {
0484: ll = new ArrayList();
0485: operationsAwaitingCommit.put(conn, ll);
0486: }
0487: }
0488: }
0489: return ll;
0490: }
0491:
0492: protected int[] getPrimaryKeyColumns(String cacheName,
0493: ResultSetMetaData meta) throws SQLException {
0494: CacheProperties cprops = (CacheProperties) caches
0495: .get(cacheName);
0496: if (cprops == null) {
0497: throw new SQLException(cacheName + " is not a valid cache");
0498: }
0499: int[] rtn = new int[cprops.primaryKeys.size()];
0500: int j = 0;
0501: for (int i = 1; i < meta.getColumnCount(); i++) {
0502: if (cprops.primaryKeys.contains(meta.getColumnName(i))) {
0503: rtn[j] = i;
0504: j++;
0505: }
0506: if (j >= rtn.length) {
0507: break;
0508: }
0509: }
0510: if (j != rtn.length) {
0511: log
0512: .warn("warning: unable to match returned data with key columns for cache "
0513: + cacheName);
0514: return null;
0515: } else {
0516: return rtn;
0517: }
0518: }
0519:
0520: public ResultSet getCachedResultSet(String cacheName, String sql)
0521: throws SQLException {
0522: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cacheName,
0523: datacache, HASHMAP_CLASS);
0524:
0525: if (hm != null && hm.containsKey(sql)) {
0526: DCResultSet dcrs = (DCResultSet) hm.get(sql);
0527: dcrs.lastAccessed = System.currentTimeMillis();
0528: return dcrs;
0529: }
0530:
0531: return null;
0532: }
0533:
0534: protected String getProperty(String name) {
0535: return props.getProperty(name);
0536: }
0537:
0538: public void handle(Message msg) {
0539: ArrayList ll = (ArrayList) msg.getValue();
0540: try {
0541: triggerCacheChanges(ll);
0542: } catch (SQLException se) {
0543: log.error("sqlerror: " + se.getMessage(), se);
0544: }
0545: }
0546:
0547: private final void processClearCache(DCStatement st, String sql,
0548: ResultWrapper rw) {
0549: String cacheName = DBUtils.getTargetName(sql,
0550: DBUtils.CLEAR_CACHE_STATEMENT);
0551:
0552: ArrayList operations = getOperations(st.conn);
0553: if (StringUtils.isEmpty(cacheName)) {
0554: operations.add(new CacheOperation(
0555: CacheOperation.ALL_CHANGE, null, null));
0556: } else {
0557: operations.add(new CacheOperation(
0558: CacheOperation.CACHE_CHANGE, cacheName, null));
0559: }
0560:
0561: rw.updateCount = 1;
0562:
0563: }
0564:
0565: /**
0566: * query the contents of the cache
0567: */
0568: private final void processQueryCache(DCStatement st, String sql,
0569: ResultWrapper rw) throws SQLException {
0570: String cacheName = DBUtils.getTargetName(sql,
0571: DBUtils.QUERY_CACHE_STATEMENT);
0572:
0573: // if no cachename specified just dump the list of caches
0574: if (StringUtils.isEmpty(cacheName)) {
0575:
0576: String[] cols = new String[] { "cache_name",
0577: "reset_on_insert", "timeout" };
0578: ArrayList al = new ArrayList();
0579: Iterator iter = caches.keySet().iterator();
0580: int i = 0;
0581:
0582: while (iter.hasNext()) {
0583: String cache = (String) iter.next();
0584: CacheProperties cprops = (CacheProperties) caches
0585: .get(cache);
0586:
0587: al
0588: .add(new Column[] {
0589: DBUtils.createColumn(cache),
0590: DBUtils
0591: .createColumn((cprops.resetOnInsert ? TRUE
0592: : FALSE)),
0593: DBUtils
0594: .createColumn((cprops.timeout > 0 ? Long
0595: .toString(cprops.timeout)
0596: : EMPTY)) });
0597: }
0598:
0599: rw.rs = new DCResultSet(cols, al, this );
0600: } else {
0601: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cacheName,
0602: datacache, HASHMAP_CLASS);
0603:
0604: // either return the contents of a particular cached resultset
0605: if (DBUtils.isStatementType(sql,
0606: DBUtils.QUERY_CACHE_DUMP_STATEMENT)) {
0607: String rname = DBUtils.getTargetName(sql,
0608: DBUtils.QUERY_CACHE_DUMP_STATEMENT);
0609:
0610: rw.rs = getCachedResultSet(cacheName, rname);
0611: }
0612: // or return the list of resultsets stored in a particular cache
0613: else {
0614: String[] cols = new String[] { "cache_name",
0615: "result_name" };
0616:
0617: ArrayList al = new ArrayList();
0618: Iterator iter = hm.keySet().iterator();
0619: while (iter.hasNext()) {
0620: String result = (String) iter.next();
0621: if (!result.equals(RESULTS_TO_CACHE_LINK)) {
0622: al.add(new Column[] {
0623: DBUtils.createColumn(cacheName),
0624: DBUtils.createColumn(result) });
0625: }
0626: }
0627: rw.rs = new DCResultSet(cols, al, this );
0628: }
0629:
0630: }
0631: }
0632:
0633: private final void processQueryStats(DCStatement st, String sql,
0634: ResultWrapper rw) throws SQLException {
0635: String[] cols = new String[] { "component", "subcomponent",
0636: "size", "extra" };
0637: ArrayList al = new ArrayList();
0638:
0639: al.add(DBUtils.createRow(new String[] { "caches", null,
0640: Integer.toString(caches.size()), null }));
0641: al.add(DBUtils.createRow(new String[] { "table_check", null,
0642: Integer.toString(tableCheck.size()), null }));
0643: al.add(DBUtils.createRow(new String[] { "update_check", null,
0644: Integer.toString(updateCheck.size()), null }));
0645: al.add(DBUtils
0646: .createRow(new String[] {
0647: "ops_awaiting_commit",
0648: null,
0649: Integer.toString(operationsAwaitingCommit
0650: .size()), null }));
0651:
0652: al.add(DBUtils.createRow(new String[] { "data_cache", null,
0653: Integer.toString(datacache.size()), null }));
0654:
0655: Iterator iter = datacache.keySet().iterator();
0656: while (iter.hasNext()) {
0657: String key = (String) iter.next();
0658: HashMap hm = (HashMap) datacache.get(key);
0659:
0660: al.add(DBUtils.createRow(new String[] {
0661: "data_cache",
0662: key,
0663: Integer
0664: .toString((hm.size() > 1 ? hm.size() - 1
0665: : 0)), null }));
0666: }
0667:
0668: HashMap hm = (HashMap) operationsAwaitingCommit.clone();
0669: iter = hm.keySet().iterator();
0670: while (iter.hasNext()) {
0671: Object key = iter.next();
0672: ArrayList oal = (ArrayList) hm.get(key);
0673:
0674: al.add(DBUtils.createRow(new String[] {
0675: "ops_awaiting_commit", (String) key,
0676: Integer.toString(oal.size()), oal.toString() }));
0677: }
0678:
0679: rw.rs = new DCResultSet(cols, al, this );
0680: }
0681:
0682: /**
0683: * process a select statement
0684: */
0685: private final void processSelect(DCStatement st, String sql,
0686: ResultWrapper rw) throws SQLException {
0687: Statement rst = null;
0688: ResultSet rrs = null;
0689: boolean close = true;
0690: try {
0691: String cacheName = null;
0692: String params = EMPTY;
0693:
0694: // if we are caching
0695: if (!st.nocaching && !st.conn.blockCache) {
0696: cacheName = StringUtils.getTagValue(sql,
0697: CACHE_BEGIN_TAG, CACHE_END_TAG);
0698:
0699: if (!StringUtils.isEmpty(cacheName)) {
0700: if (st.statementType != DCStatement.STATEMENT) {
0701: params = DBUtils.SEP
0702: + DBUtils
0703: .flatten(
0704: ((DCPreparedStatement) st).params,
0705: PIPE);
0706: }
0707:
0708: // lookup the cached result set and return if it's found
0709: DCResultSet tmp = (DCResultSet) getCachedResultSet(
0710: cacheName, sql + params);
0711: if (tmp != null) {
0712: if (tmp.getRowCount() > 0) {
0713: if (log.isDebugEnabled()) {
0714: log.debug("return cached data for "
0715: + sql + params);
0716: }
0717: rw.rs = tmp;
0718: rw.updateCount = -1;
0719: return;
0720: } else {
0721: if (log.isInfoEnabled()) {
0722: log.info("clearing empty result set ("
0723: + cacheName + ") " + sql
0724: + params);
0725: }
0726: clearResultsByKey(cacheName, sql + params);
0727: }
0728: }
0729: }
0730: }
0731:
0732: // create a real statement
0733: if (st.statementType == DCStatement.STATEMENT) {
0734: rst = st.getRealStatement();
0735: rrs = rst.executeQuery(sql);
0736: } else if (st.statementType == DCStatement.PREPARED) {
0737: rst = st.getRealStatement();
0738: PreparedStatement ps = (PreparedStatement) rst;
0739: rrs = ps.executeQuery();
0740: } else {
0741: // shouldn't get here unless something has gone bizarrely wrong
0742: throw new SQLException(
0743: "system error. unknown statement type");
0744: }
0745:
0746: // if we're not caching the results then just return the real resultset
0747: //if (StringUtils.isEmpty(cacheName) || rrs.getFetchSize() > getMaxRowsForCache(cacheName)) {
0748: if (StringUtils.isEmpty(cacheName)) {
0749: if (log.isDebugEnabled()) {
0750: log.debug("pass through SQL " + sql);
0751: }
0752: rw.rs = rrs;
0753: rw.updateCount = -1;
0754: close = false;
0755: } else {
0756: // otherwise we need to process the results and cache the data
0757: if (log.isDebugEnabled()) {
0758: log.debug("caching SQL " + sql + params);
0759: }
0760:
0761: DCResultSet dcrs = new DCResultSet(cacheName, sql
0762: + params, rrs, this , rrs.getConcurrency(), rrs
0763: .getType());
0764: rw.rs = dcrs;
0765: rw.updateCount = -1;
0766: }
0767: } finally {
0768: if (close) {
0769: DAOUtils.close(rrs);
0770: DAOUtils.close(rst);
0771: }
0772: }
0773: }
0774:
0775: /**
0776: * process an insert statement
0777: */
0778: private final void processInsert(DCStatement st, String sql,
0779: ResultWrapper rw) throws SQLException {
0780: Statement rst = null;
0781: try {
0782: rst = st.getRealStatement();
0783:
0784: // execute the update depending upon what params have been set in the statement
0785: if (st.autoGeneratedKeys != Integer.MIN_VALUE) {
0786: rw.updateCount = rst.executeUpdate(sql,
0787: st.autoGeneratedKeys);
0788: } else if (st.insertKeyColumnIndexes != null) {
0789: rw.updateCount = rst.executeUpdate(sql,
0790: st.insertKeyColumnIndexes);
0791: } else if (st.insertKeyColumns != null) {
0792: rw.updateCount = rst.executeUpdate(sql,
0793: st.insertKeyColumns);
0794: } else if (st.statementType == DCStatement.PREPARED) {
0795: DCPreparedStatement dcp = (DCPreparedStatement) st;
0796: dcp.realPS.execute();
0797: rw.updateCount = dcp.realPS.getUpdateCount();
0798: } else {
0799: rw.updateCount = rst.executeUpdate(sql);
0800: }
0801:
0802: // if an insert has occurred
0803: if (rw.updateCount > 0) {
0804: // get the table name for the insert
0805: String table = DBUtils.getTargetName(sql,
0806: DBUtils.INSERT_STATEMENT);
0807:
0808: // if we are watching this table
0809: if (!StringUtils.isEmpty(table)
0810: && updateCheck.containsKey(table)) {
0811: if (log.isDebugEnabled()) {
0812: log.debug("found a watched table " + table);
0813: }
0814:
0815: // get the list of caches linked to this table
0816: ArrayList ll = (ArrayList) updateCheck.get(table);
0817: if (ll != null) {
0818: // we'll store any cache operations here until a commit or rollback
0819: ArrayList operations = getOperations(st.conn);
0820:
0821: // loop through the caches
0822: Iterator citer = ll.iterator();
0823: while (citer.hasNext()) {
0824: String cache = (String) citer.next();
0825: CacheProperties cprops = (CacheProperties) caches
0826: .get(cache);
0827:
0828: if (cprops != null && cprops.resetOnInsert) {
0829: // add a remove cache op
0830: operations.add(new CacheOperation(
0831: CacheOperation.CACHE_CHANGE,
0832: cache, null));
0833: }
0834: }
0835: }
0836: }
0837: }
0838:
0839: } finally {
0840: DAOUtils.close(rst);
0841: }
0842: }
0843:
0844: private final void processUpdateOrDelete(boolean update,
0845: DCStatement st, String sql, ResultWrapper rw)
0846: throws SQLException {
0847: String table;
0848: if (update) {
0849: table = DBUtils
0850: .getTargetName(sql, DBUtils.UPDATE_STATEMENT);
0851: } else {
0852: table = DBUtils
0853: .getTargetName(sql, DBUtils.DELETE_STATEMENT);
0854: }
0855:
0856: Statement rst = st.getRealStatement();
0857: ResultSet rrs = null;
0858: DCResultSet dcrs = null;
0859: ResultSetMetaData rsmd = null;
0860: boolean docache = true;
0861: try {
0862: if (updateCheck.containsKey(table)) {
0863: String whereClause = DBUtils.getWhereClause(sql);
0864:
0865: if (StringUtils.isEmpty(whereClause)) {
0866: // no where clause, so wipe the entire cache
0867: ArrayList operations = getOperations(st.conn);
0868: ArrayList ll = (ArrayList) updateCheck.get(table);
0869: Iterator iter = ll.iterator();
0870: while (iter.hasNext()) {
0871: operations.add(new CacheOperation(
0872: CacheOperation.CACHE_CHANGE,
0873: (String) iter.next(), null));
0874: }
0875: docache = false;
0876: } else {
0877: String qurSQL = SELECT_FROM + table + WHERE
0878: + whereClause;
0879: int paramsCount = StringUtils.countOccurrences(
0880: whereClause, '?');
0881: // find out what data we need to clear from the cache
0882: try {
0883: if (st.statementType == DCStatement.PREPARED) {
0884: // we try to reuse a prepared statement if possible
0885: DCPreparedStatement dcp = (DCPreparedStatement) st;
0886:
0887: if (dcp.queryUpdateRowsSQL != null
0888: && !dcp.queryUpdateRowsSQL
0889: .equals(qurSQL)) {
0890: // but if the SQL isn't the same we need to create a new one
0891: DAOUtils.close(dcp.queryUpdateRowsPS);
0892: dcp.queryUpdateRowsPS = null;
0893: }
0894:
0895: if (dcp.queryUpdateRowsPS == null) {
0896: dcp.queryUpdateRowsPS = dcp.conn
0897: .getRealConnection()
0898: .prepareStatement(qurSQL);
0899: dcp.queryUpdateRowsSQL = qurSQL;
0900: }
0901:
0902: DBUtils.setParams(dcp.queryUpdateRowsPS,
0903: dcp.params, dcp.params.length
0904: - paramsCount, paramsCount);
0905: rrs = dcp.queryUpdateRowsPS.executeQuery();
0906: } else {
0907: rrs = rst.executeQuery(qurSQL);
0908: }
0909:
0910: dcrs = new DCResultSet(null, null, rrs, this ,
0911: rrs.getConcurrency(), rrs.getType());
0912:
0913: rsmd = dcrs.getMetaData();
0914:
0915: DAOUtils.close(rrs);
0916: } catch (SQLException se) {
0917: se.printStackTrace();
0918: log.warn("invalid SQL " + qurSQL);
0919: }
0920: }
0921: }
0922:
0923: if (st.statementType == DCStatement.PREPARED) {
0924: DCPreparedStatement dcp = (DCPreparedStatement) st;
0925: rw.updateCount = ((PreparedStatement) dcp
0926: .getRealStatement()).executeUpdate();
0927: } else {
0928: rw.updateCount = st.getRealStatement().executeUpdate(
0929: sql);
0930: }
0931:
0932: if (rw.updateCount > 0 && docache
0933: && tableCheck.containsKey(table)) {
0934: st.conn.blockCache = true;
0935: ArrayList ll = (ArrayList) tableCheck.get(table);
0936: Iterator citer = ll.iterator();
0937: while (citer.hasNext()) {
0938: String cache = (String) citer.next();
0939: CacheProperties cp = (CacheProperties) caches
0940: .get(cache);
0941: if (dcrs == null
0942: || (!update && cp.deleteCheck
0943: .contains(table))) {
0944: // this could be failed SQL query, i.e. one where we have no idea what will
0945: // have been updated, so we must clear the entire cache.
0946: // or it is in our 'delete check' list. in which case we also clear the cache
0947: ArrayList operations = getOperations(st.conn);
0948: operations.add(new CacheOperation(
0949: CacheOperation.CACHE_CHANGE, cache,
0950: null));
0951: } else if (cp.primaryTable.equals(table)) {
0952: // otherwise for an update of the primary table we'll only refresh
0953: // the cache for specific key changes
0954:
0955: dcrs.beforeFirst();
0956: int[] keyColumns = this .getPrimaryKeyColumns(
0957: cache, rsmd);
0958: if (keyColumns == null) {
0959: // if we can't work out the key columns, we'll have to reset the
0960: // cache as well
0961: if (log.isDebugEnabled()) {
0962: log
0963: .debug("unable to find key columns, resetting cache "
0964: + cache);
0965: }
0966: ArrayList operations = getOperations(st.conn);
0967: operations.add(new CacheOperation(
0968: CacheOperation.CACHE_CHANGE, cache,
0969: null));
0970: } else {
0971: // otherwise we'll just remove the results that are potentially
0972: // affected by the update
0973: dcrs.beforeFirst();
0974: while (dcrs.next()) {
0975: String key = DBUtils
0976: .getResultSetDataKey(cache,
0977: getKeyColumnData(
0978: keyColumns,
0979: dcrs));
0980: if (log.isDebugEnabled()) {
0981: log.debug("clearing data " + key);
0982: }
0983:
0984: ArrayList operations = getOperations(st.conn);
0985: operations.add(new CacheOperation(
0986: CacheOperation.KEY_CHANGE,
0987: cache, key));
0988: }
0989: }
0990: }
0991: }
0992: }
0993: } finally {
0994: DAOUtils.close(rrs);
0995: }
0996: }
0997:
0998: public void triggerCacheChanges(ArrayList ll) throws SQLException {
0999: if (ll != null) {
1000: Iterator iter = ll.iterator();
1001: while (iter.hasNext()) {
1002: CacheOperation op = (CacheOperation) iter.next();
1003: if (op.type == CacheOperation.CACHE_CHANGE) {
1004: if (log.isDebugEnabled()) {
1005: log.debug("removing all data for cache "
1006: + op.cache);
1007: }
1008: clearCache(op.cache);
1009: } else if (op.type == CacheOperation.KEY_CHANGE) {
1010: if (log.isDebugEnabled()) {
1011: log.debug("removing all data for key "
1012: + op.data);
1013: }
1014: clearResultsByKey(op.cache, op.data);
1015: } else if (op.type == CacheOperation.ALL_CHANGE) {
1016: if (log.isDebugEnabled()) {
1017: log.debug("removing all data");
1018: }
1019: clearAll();
1020: }
1021: }
1022: }
1023: }
1024:
1025: /**
1026: * rollback a connection (get rid of operations list)
1027: */
1028: protected void rollback(DCConnection conn) {
1029: operationsAwaitingCommit.remove(conn);
1030: if (log.isDebugEnabled()) {
1031: log.debug(Integer.toString(operationsAwaitingCommit.size())
1032: + " operations (after rollback)");
1033: }
1034: }
1035:
1036: /**
1037: * link a key to a resultset
1038: * @param cacheName the cache we're linking in
1039: * @param rsKeys the primary key data to use as key
1040: * @param sql the sql query used to generate the resultset
1041: */
1042: protected void setKeyToResultSetLink(String cacheName,
1043: String[] rsKeys, String sql) throws SQLException {
1044: String key = DBUtils.getResultSetDataKey(cacheName, rsKeys);
1045:
1046: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cacheName,
1047: datacache, HASHMAP_CLASS);
1048: HashMap rtc = (HashMap) DBUtils.getObjectFromMap(
1049: RESULTS_TO_CACHE_LINK, hm, HASHMAP_CLASS);
1050: ArrayList ll = (ArrayList) DBUtils.getObjectFromMap(key, rtc,
1051: ARRAYLIST_CLASS);
1052:
1053: ll.add(sql);
1054: }
1055:
1056: /**
1057: * add a resultset to the cache
1058: */
1059: protected void setCachedResultSet(String cacheName, String sql,
1060: ResultSet rs) throws SQLException {
1061: if (log.isDebugEnabled()) {
1062: log.debug("adding cache result for " + cacheName + "::"
1063: + sql);
1064: }
1065:
1066: HashMap hm = (HashMap) DBUtils.getObjectFromMap(cacheName,
1067: datacache, HASHMAP_CLASS);
1068: synchronized (hm) {
1069: hm.put(sql, rs);
1070: }
1071: }
1072:
1073: /**
1074: * validate user access to this 'database'
1075: */
1076: protected boolean validate(String user, String password) {
1077: String pass = (String) users.get(user);
1078: return (!StringUtils.isEmpty(pass) && pass.equals(password));
1079: }
1080:
1081: }
|