001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.jetspeed.statistics.impl;
018:
019: import java.sql.Connection;
020: import java.sql.PreparedStatement;
021: import java.sql.SQLException;
022: import java.util.Iterator;
023: import java.util.LinkedList;
024: import java.util.List;
025:
026: import javax.sql.DataSource;
027:
028: /**
029: * <p>
030: * BatchedStatistics
031: * </p>
032: *
033: * @author <a href="mailto:chris@bluesunrise.com">Chris Schaefer </a>
034: * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
035: * @version $Id: TestPortletEntityDAO.java,v 1.3 2005/05/24 14:43:19 ate Exp $
036: */
037: public abstract class BatchedStatistics implements Runnable {
038:
039: public BatchedStatistics(DataSource ds, int batchSize,
040: long msElapsedTimeThreshold, String name) {
041: this .ds = ds;
042: this .msElapsedTimeThreshold = msElapsedTimeThreshold;
043: this .batchSize = batchSize;
044: this .name = name;
045: if (this .name == null) {
046: this .name = this .getClass().getName();
047: }
048: msLastFlushTime = System.currentTimeMillis();
049: thread = new Thread(this , name);
050:
051: }
052:
053: public void startThread() {
054:
055: thread.start();
056:
057: // give a quick break until the thread is running
058: // we know thread is running when done is false
059: while (this .done) {
060: try {
061: Thread.sleep(1);
062: } catch (InterruptedException e) {
063: }
064: }
065: }
066:
067: protected Connection getConnection() throws SQLException {
068: return ds.getConnection();
069: }
070:
071: /**
072: * should only be called from code synchronized to the linked list
073: */
074: private void checkAndDoFlush() {
075: long msCurrentTime = System.currentTimeMillis();
076: if ((logRecords.size() >= batchSize)
077: || (msCurrentTime - msLastFlushTime > msElapsedTimeThreshold)) {
078: flush();
079: msLastFlushTime = msCurrentTime;
080: }
081: }
082:
083: public void addStatistic(LogRecord logRecord) {
084: synchronized (logRecords) {
085: logRecords.add(logRecord);
086: checkAndDoFlush();
087: }
088: }
089:
090: public boolean isDone() {
091: return done;
092: }
093:
094: public void tellThreadToStop() {
095: keepRunning = false;
096: //this.thread.notify();
097: }
098:
099: private boolean done = true;
100:
101: private boolean keepRunning = true;
102:
103: public void run() {
104: done = false;
105: while (keepRunning) {
106: try {
107: synchronized (this .thread) {
108: this .thread.wait(msElapsedTimeThreshold / 4);
109: }
110: } catch (InterruptedException ie) {
111: keepRunning = false;
112: }
113: synchronized (logRecords) {
114: checkAndDoFlush();
115: }
116: }
117: // force a flush on the way out even if the constraints have not been
118: // met
119: synchronized (logRecords) {
120: flush();
121: }
122: done = true;
123: }
124:
125: /*
126: * (non-Javadoc)
127: *
128: * @see org.apache.jetspeed.statistics.impl.BatchedStatistics#flush() should
129: * only be called from code synchronized to the linked list
130: */
131: public void flush() {
132: if (logRecords.isEmpty())
133: return;
134:
135: Connection con = null;
136: PreparedStatement stm = null;
137:
138: try {
139: con = getConnection();
140: boolean autoCommit = con.getAutoCommit();
141: con.setAutoCommit(false);
142:
143: stm = getPreparedStatement(con);
144: Iterator recordIterator = logRecords.iterator();
145: while (recordIterator.hasNext()) {
146: LogRecord record = (LogRecord) recordIterator.next();
147:
148: loadOneRecordToStatement(stm, record);
149:
150: stm.addBatch();
151: }
152: stm.executeBatch();
153: con.commit();
154: // only clear the records if we actually store them...
155: logRecords.clear();
156: con.setAutoCommit(autoCommit);
157: } catch (SQLException e) {
158: // todo log to standard Jetspeed logger
159: e.printStackTrace();
160: try {
161: con.rollback();
162: } catch (Exception e2) {
163: }
164: } finally {
165: try {
166: if (stm != null)
167: stm.close();
168: } catch (SQLException se) {
169: }
170: releaseConnection(con);
171: }
172: }
173:
174: abstract protected PreparedStatement getPreparedStatement(
175: Connection con) throws SQLException;
176:
177: abstract protected void loadOneRecordToStatement(
178: PreparedStatement stm, LogRecord rec) throws SQLException;
179:
180: void releaseConnection(Connection con) {
181: try {
182: if (con != null)
183: con.close();
184: } catch (SQLException e) {
185: }
186: }
187:
188: protected Thread thread;
189:
190: protected long msLastFlushTime = 0;
191:
192: protected int batchSize = 10;
193:
194: protected long msElapsedTimeThreshold = 5000;
195:
196: protected List logRecords = new LinkedList();
197:
198: protected DataSource ds = null;
199:
200: protected String name;
201:
202: public abstract boolean canDoRecordType(LogRecord rec);
203:
204: }
|