001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.managers;
007:
008: import java.sql.Connection;
009: import java.sql.SQLException;
010: import java.util.Collection;
011: import java.util.HashMap;
012: import java.util.Map;
013: import java.util.concurrent.BlockingQueue;
014:
015: import org.openrdf.sail.helpers.DefaultSailChangedEvent;
016: import org.openrdf.sail.rdbms.schema.Batch;
017: import org.openrdf.sail.rdbms.schema.IdSequence;
018: import org.openrdf.sail.rdbms.schema.RdbmsTable;
019: import org.openrdf.sail.rdbms.schema.TableFactory;
020: import org.openrdf.sail.rdbms.schema.TransactionTable;
021: import org.openrdf.sail.rdbms.schema.TripleTable;
022: import org.openrdf.sail.rdbms.schema.ValueTable;
023: import org.openrdf.sail.rdbms.schema.ValueTypes;
024:
025: /**
026: * Manages and delegates to a collection of {@link TransactionTable}s.
027: *
028: * @author James Leigh
029: *
030: */
031: public class TransTableManager {
032: public static int BATCH_SIZE = 8 * 1024;
033: public static final boolean TEMPORARY_TABLE_USED = TripleTable.UNIQUE_INDEX_TRIPLES;
034: private TableFactory factory;
035: private TripleTableManager triples;
036: private RdbmsTable temporaryTable;
037: private Map<Number, TransactionTable> tables = new HashMap<Number, TransactionTable>();
038: private int removedCount;
039: private String fromDummy;
040: private Connection conn;
041: private BlockingQueue<Batch> batchQueue;
042: private DefaultSailChangedEvent sailChangedEvent;
043: private IdSequence ids;
044:
045: public void setConnection(Connection conn) {
046: this .conn = conn;
047: }
048:
049: public void setTemporaryTableFactory(TableFactory factory) {
050: this .factory = factory;
051: }
052:
053: public void setStatementsTable(
054: TripleTableManager predicateTableManager) {
055: this .triples = predicateTableManager;
056: }
057:
058: public void setFromDummyTable(String fromDummy) {
059: this .fromDummy = fromDummy;
060: }
061:
062: public void setBatchQueue(BlockingQueue<Batch> queue) {
063: this .batchQueue = queue;
064: }
065:
066: public void setSailChangedEvent(
067: DefaultSailChangedEvent sailChangedEvent) {
068: this .sailChangedEvent = sailChangedEvent;
069: }
070:
071: public void setIdSequence(IdSequence ids) {
072: this .ids = ids;
073: }
074:
075: public int getBatchSize() {
076: return BATCH_SIZE;
077: }
078:
079: public void initialize() throws SQLException {
080: }
081:
082: public void insert(Number ctx, Number subj, Number pred, Number obj)
083: throws SQLException, InterruptedException {
084: getTable(pred).insert(ctx, subj, pred, obj);
085: }
086:
087: public void close() throws SQLException {
088: try {
089: if (temporaryTable != null) {
090: temporaryTable.drop();
091: temporaryTable.close();
092: }
093: } catch (SQLException e) {
094: // ignore
095: }
096: for (TransactionTable table : tables.values()) {
097: table.close();
098: }
099: }
100:
101: public String findTableName(Number pred) throws SQLException {
102: return triples.findTableName(pred);
103: }
104:
105: public String getCombinedTableName() throws SQLException {
106: String union = " UNION ALL ";
107: StringBuilder sb = new StringBuilder(1024);
108: sb.append("(");
109: for (Number pred : triples.getPredicateIds()) {
110: TripleTable predicate;
111: try {
112: predicate = triples.getPredicateTable(pred);
113: } catch (SQLException e) {
114: throw new AssertionError(e);
115: }
116: TransactionTable table = findTable(pred);
117: if ((table == null || table.isEmpty())
118: && predicate.isEmpty())
119: continue;
120: sb.append("SELECT ctx, subj, ");
121: if (predicate.isPredColumnPresent()) {
122: sb.append(" pred,");
123: } else {
124: sb.append(pred).append(" AS pred,");
125: }
126: sb.append(" obj");
127: sb.append("\nFROM ");
128: sb.append(predicate.getNameWhenReady());
129: sb.append(union);
130: predicate.blockUntilReady();
131: }
132: if (sb.length() < union.length())
133: return getEmptyTableName();
134: sb.delete(sb.length() - union.length(), sb.length());
135: sb.append(")");
136: return sb.toString();
137: }
138:
139: public String getTableName(Number pred) throws SQLException {
140: if (pred.equals(ValueTable.NIL_ID))
141: return getCombinedTableName();
142: String tableName = triples.getTableName(pred);
143: if (tableName == null)
144: return getEmptyTableName();
145: return tableName;
146: }
147:
148: public void committed(boolean locked) throws SQLException {
149: synchronized (tables) {
150: for (TransactionTable table : tables.values()) {
151: table.committed();
152: }
153: tables.clear();
154: }
155: if (removedCount > 0) {
156: triples.removed(removedCount, locked);
157: }
158: }
159:
160: public void removed(Number pred, int count) throws SQLException {
161: getTable(pred).removed(count);
162: removedCount += count;
163: }
164:
165: public Collection<Number> getPredicateIds() {
166: return triples.getPredicateIds();
167: }
168:
169: public boolean isPredColumnPresent(Number id) throws SQLException {
170: if (id.longValue() == ValueTable.NIL_ID)
171: return true;
172: return triples.getPredicateTable(id).isPredColumnPresent();
173: }
174:
175: public ValueTypes getObjTypes(Number pred) {
176: TripleTable table = triples.getExistingTable(pred);
177: if (table == null)
178: return ValueTypes.UNKNOWN;
179: return table.getObjTypes();
180: }
181:
182: public ValueTypes getSubjTypes(Number pred) {
183: TripleTable table = triples.getExistingTable(pred);
184: if (table == null)
185: return ValueTypes.RESOURCE;
186: return table.getSubjTypes();
187: }
188:
189: public boolean isEmpty() throws SQLException {
190: for (Number pred : triples.getPredicateIds()) {
191: TripleTable predicate;
192: try {
193: predicate = triples.getPredicateTable(pred);
194: } catch (SQLException e) {
195: throw new AssertionError(e);
196: }
197: TransactionTable table = findTable(pred);
198: if (table != null && !table.isEmpty()
199: || !predicate.isEmpty())
200: return false;
201: }
202: return true;
203: }
204:
205: protected String getZeroBigInt() {
206: return "0";
207: }
208:
209: protected TransactionTable getTable(Number pred)
210: throws SQLException {
211: synchronized (tables) {
212: TransactionTable table = tables.get(pred);
213: if (table == null) {
214: TripleTable predicate = triples.getPredicateTable(pred);
215: Number key = pred;
216: if (predicate.isPredColumnPresent()) {
217: key = ids.idOf(-1);
218: table = tables.get(key);
219: if (table != null)
220: return table;
221: }
222: table = createTransactionTable(predicate);
223: tables.put(key, table);
224: }
225: return table;
226: }
227: }
228:
229: protected TransactionTable createTransactionTable(
230: TripleTable predicate) throws SQLException {
231: if (temporaryTable == null && TEMPORARY_TABLE_USED) {
232: temporaryTable = createTemporaryTable(conn);
233: if (!temporaryTable.isCreated()) {
234: createTemporaryTable(temporaryTable);
235: }
236: }
237: TransactionTable table = createTransactionTable();
238: table.setIdSequence(ids);
239: table.setSailChangedEvent(sailChangedEvent);
240: table.setQueue(batchQueue);
241: table.setTripleTable(predicate);
242: table.setTemporaryTable(temporaryTable);
243: table.setConnection(conn);
244: table.setBatchSize(getBatchSize());
245: return table;
246: }
247:
248: protected RdbmsTable createTemporaryTable(Connection conn) {
249: return factory.createTemporaryTable(conn);
250: }
251:
252: protected TransactionTable createTransactionTable() {
253: return new TransactionTable();
254: }
255:
256: protected void createTemporaryTable(RdbmsTable table)
257: throws SQLException {
258: String type = ids.getSqlType();
259: StringBuilder sb = new StringBuilder();
260: sb.append(" ctx ").append(type).append(" NOT NULL,\n");
261: sb.append(" subj ").append(type).append(" NOT NULL,\n");
262: sb.append(" pred ").append(type).append(" NOT NULL,\n");
263: sb.append(" obj ").append(type).append(" NOT NULL\n");
264: table.createTemporaryTable(sb);
265: }
266:
267: private String getEmptyTableName() {
268: StringBuilder sb = new StringBuilder(256);
269: sb.append("(");
270: sb.append("SELECT ");
271: sb.append(getZeroBigInt()).append(" AS ctx, ");
272: sb.append(getZeroBigInt()).append(" AS subj, ");
273: sb.append(getZeroBigInt()).append(" AS pred, ");
274: sb.append(getZeroBigInt()).append(" AS obj ");
275: sb.append(fromDummy);
276: sb.append("\nWHERE 1=0");
277: sb.append(")");
278: return sb.toString();
279: }
280:
281: private TransactionTable findTable(Number pred) {
282: synchronized (tables) {
283: return tables.get(pred);
284: }
285: }
286:
287: }
|