001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.schema;
007:
008: import java.sql.PreparedStatement;
009: import java.sql.ResultSet;
010: import java.sql.SQLException;
011: import java.sql.Types;
012: import java.util.ArrayList;
013: import java.util.List;
014: import java.util.concurrent.BlockingQueue;
015:
016: /**
017: * Manages the rows in a value table. These tables have two columns: an internal
018: * id column and a value column.
019: *
020: * @author James Leigh
021: *
022: */
023: public class ValueTable {
024: public static int BATCH_SIZE = 8 * 1024;
025: public static final long NIL_ID = 0; // TODO
026: private static final String[] PKEY = { "id" };
027: private static final String[] VALUE_INDEX = { "value" };
028: private int length = -1;
029: private int sqlType;
030: private int idType;
031: private String INSERT;
032: private String INSERT_SELECT;
033: private String EXPUNGE;
034: private RdbmsTable table;
035: private RdbmsTable temporary;
036: private int removedStatementsSinceExpunge;
037: private ValueBatch batch;
038: private BlockingQueue<Batch> queue;
039: private boolean indexingValues;
040: private PreparedStatement insertSelect;
041:
042: public void setQueue(BlockingQueue<Batch> queue) {
043: this .queue = queue;
044: }
045:
046: public boolean isIndexingValues() {
047: return indexingValues;
048: }
049:
050: public void setIndexingValues(boolean indexingValues) {
051: this .indexingValues = indexingValues;
052: }
053:
054: public int getLength() {
055: return length;
056: }
057:
058: public void setLength(int length) {
059: this .length = length;
060: }
061:
062: public int getSqlType() {
063: return sqlType;
064: }
065:
066: public void setSqlType(int sqlType) {
067: this .sqlType = sqlType;
068: }
069:
070: public int getIdType() {
071: return idType;
072: }
073:
074: public void setIdType(int sqlType) {
075: this .idType = sqlType;
076: }
077:
078: public RdbmsTable getRdbmsTable() {
079: return table;
080: }
081:
082: public void setRdbmsTable(RdbmsTable table) {
083: this .table = table;
084: }
085:
086: public RdbmsTable getTemporaryTable() {
087: return temporary;
088: }
089:
090: public void setTemporaryTable(RdbmsTable temporary) {
091: this .temporary = temporary;
092: }
093:
094: public String getName() {
095: return table.getName();
096: }
097:
098: public long size() {
099: return table.size();
100: }
101:
102: public int getBatchSize() {
103: return BATCH_SIZE;
104: }
105:
106: public void initialize() throws SQLException {
107: StringBuilder sb = new StringBuilder();
108: sb.append("INSERT INTO ").append(getInsertTable().getName());
109: sb.append(" (id, value) VALUES (?, ?)");
110: INSERT = sb.toString();
111: sb.delete(0, sb.length());
112: sb.append("DELETE FROM ").append(table.getName()).append("\n");
113: sb.append("WHERE 1=1 ");
114: EXPUNGE = sb.toString();
115: if (temporary != null) {
116: sb.delete(0, sb.length());
117: sb.append("INSERT INTO ").append(table.getName());
118: sb.append(" (id, value) SELECT DISTINCT id, value FROM ");
119: sb.append(temporary.getName()).append(" tmp\n");
120: sb.append("WHERE NOT EXISTS (SELECT id FROM ").append(
121: table.getName());
122: sb.append(" val WHERE val.id = tmp.id)");
123: INSERT_SELECT = sb.toString();
124: }
125: if (!table.isCreated()) {
126: createTable(table);
127: table.index(PKEY);
128: if (isIndexingValues()) {
129: table.index(VALUE_INDEX);
130: }
131: } else {
132: table.count();
133: }
134: if (temporary != null && !temporary.isCreated()) {
135: createTemporaryTable(temporary);
136: }
137: }
138:
139: public void close() throws SQLException {
140: if (insertSelect != null) {
141: insertSelect.close();
142: }
143: if (temporary != null) {
144: temporary.close();
145: }
146: table.close();
147: }
148:
149: public synchronized void insert(Number id, Object value)
150: throws SQLException, InterruptedException {
151: ValueBatch batch = getValueBatch();
152: if (isExpired(batch)) {
153: batch = newValueBatch();
154: initBatch(batch);
155: }
156: batch.setObject(1, id);
157: batch.setObject(2, value);
158: batch.addBatch();
159: queue(batch);
160: }
161:
162: public ValueBatch getValueBatch() {
163: return this .batch;
164: }
165:
166: public boolean isExpired(ValueBatch batch) {
167: if (batch == null || batch.isFull())
168: return true;
169: return queue == null || !queue.remove(batch);
170: }
171:
172: public ValueBatch newValueBatch() {
173: return new ValueBatch();
174: }
175:
176: public void initBatch(ValueBatch batch) throws SQLException {
177: batch.setTable(table);
178: batch.setBatchStatement(prepareInsert(INSERT));
179: batch.setMaxBatchSize(getBatchSize());
180: if (temporary != null) {
181: batch.setTemporary(temporary);
182: if (insertSelect == null) {
183: insertSelect = prepareInsertSelect(INSERT_SELECT);
184: }
185: batch.setInsertStatement(insertSelect);
186: }
187: }
188:
189: public void queue(ValueBatch batch) throws SQLException,
190: InterruptedException {
191: this .batch = batch;
192: if (queue == null) {
193: batch.flush();
194: } else {
195: queue.put(batch);
196: }
197: }
198:
199: public void optimize() throws SQLException {
200: table.optimize();
201: }
202:
203: public boolean expungeRemovedStatements(int count, String condition)
204: throws SQLException {
205: removedStatementsSinceExpunge += count;
206: if (condition != null && timeToExpunge()) {
207: expunge(condition);
208: removedStatementsSinceExpunge = 0;
209: return true;
210: }
211: return false;
212: }
213:
214: public List<Long> maxIds(int shift, int mod) throws SQLException {
215: String column = "id";
216: StringBuilder expr = new StringBuilder();
217: expr.append("MOD((").append(column);
218: expr.append(" >> ").append(shift);
219: expr.append(") + ").append(mod).append(", ");
220: expr.append(mod);
221: expr.append(")");
222: StringBuilder sb = new StringBuilder();
223: sb.append("SELECT ").append(expr);
224: sb.append(", MAX(").append(column);
225: sb.append(")\nFROM ").append(getName());
226: sb.append("\nGROUP BY ").append(expr);
227: String query = sb.toString();
228: PreparedStatement st = table.prepareStatement(query);
229: try {
230: ResultSet rs = st.executeQuery();
231: try {
232: List<Long> result = new ArrayList<Long>();
233: while (rs.next()) {
234: result.add(rs.getLong(2));
235: }
236: return result;
237: } finally {
238: rs.close();
239: }
240: } finally {
241: st.close();
242: }
243: }
244:
245: public String sql(int type, int length) {
246: switch (type) {
247: case Types.VARCHAR:
248: if (length > 0)
249: return "VARCHAR(" + length + ")";
250: return "TEXT";
251: case Types.LONGVARCHAR:
252: if (length > 0)
253: return "LONGVARCHAR(" + length + ")";
254: return "TEXT";
255: case Types.BIGINT:
256: return "BIGINT";
257: case Types.INTEGER:
258: return "INTEGER";
259: case Types.SMALLINT:
260: return "SMALLINT";
261: case Types.FLOAT:
262: return "FLOAT";
263: case Types.DOUBLE:
264: return "DOUBLE";
265: case Types.DECIMAL:
266: return "DECIMAL";
267: case Types.BOOLEAN:
268: return "BOOLEAN";
269: case Types.TIMESTAMP:
270: return "TIMESTAMP";
271: default:
272: throw new AssertionError("Unsupported SQL Type: " + type);
273: }
274: }
275:
276: @Override
277: public String toString() {
278: return getName();
279: }
280:
281: protected void expunge(String condition) throws SQLException {
282: synchronized (table) {
283: int count = table.executeUpdate(EXPUNGE + condition);
284: table.modified(0, count);
285: }
286: }
287:
288: protected boolean timeToExpunge() {
289: return removedStatementsSinceExpunge > table.size() / 4;
290: }
291:
292: protected RdbmsTable getInsertTable() {
293: RdbmsTable tmp = getTemporaryTable();
294: if (tmp == null) {
295: tmp = getRdbmsTable();
296: }
297: return tmp;
298: }
299:
300: protected PreparedStatement prepareInsert(String sql)
301: throws SQLException {
302: return table.prepareStatement(sql);
303: }
304:
305: protected PreparedStatement prepareInsertSelect(String sql)
306: throws SQLException {
307: return table.prepareStatement(sql);
308: }
309:
310: protected void createTable(RdbmsTable table) throws SQLException {
311: StringBuilder sb = new StringBuilder();
312: sb.append(" id ").append(sql(idType, -1)).append(
313: " NOT NULL,\n");
314: sb.append(" value ").append(sql(sqlType, length));
315: sb.append(" NOT NULL\n");
316: table.createTable(sb);
317: }
318:
319: protected void createTemporaryTable(RdbmsTable table)
320: throws SQLException {
321: StringBuilder sb = new StringBuilder();
322: sb.append(" id ").append(sql(idType, -1)).append(
323: " NOT NULL,\n");
324: sb.append(" value ").append(sql(sqlType, length));
325: sb.append(" NOT NULL\n");
326: table.createTemporaryTable(sb);
327: }
328: }
|