001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.schema;
007:
008: import java.sql.Connection;
009: import java.sql.PreparedStatement;
010: import java.sql.SQLException;
011: import java.util.concurrent.BlockingQueue;
012:
013: import org.openrdf.sail.helpers.DefaultSailChangedEvent;
014:
015: /**
016: * Manages a temporary table used when uploading new statements with the same
017: * predicate into the database.
018: *
019: * @author James Leigh
020: *
021: */
022: public class TransactionTable {
023: private int batchSize;
024: private TripleTable triples;
025: private int addedCount;
026: private int removedCount;
027: private RdbmsTable temporary;
028: private Connection conn;
029: private TripleBatch batch;
030: private BlockingQueue<Batch> queue;
031: private DefaultSailChangedEvent sailChangedEvent;
032: private IdSequence ids;
033: private PreparedStatement insertSelect;
034:
035: public void setIdSequence(IdSequence ids) {
036: this .ids = ids;
037: }
038:
039: public void setQueue(BlockingQueue<Batch> queue) {
040: this .queue = queue;
041: }
042:
043: public void setTemporaryTable(RdbmsTable table) {
044: this .temporary = table;
045: }
046:
047: public void setConnection(Connection conn) {
048: this .conn = conn;
049: }
050:
051: public TripleTable getTripleTable() {
052: return triples;
053: }
054:
055: public void setTripleTable(TripleTable statements) {
056: this .triples = statements;
057: }
058:
059: public void setSailChangedEvent(
060: DefaultSailChangedEvent sailChangedEvent) {
061: this .sailChangedEvent = sailChangedEvent;
062: }
063:
064: public int getBatchSize() {
065: return batchSize;
066: }
067:
068: public void setBatchSize(int size) {
069: this .batchSize = size;
070: }
071:
072: public void close() throws SQLException {
073: if (insertSelect != null) {
074: insertSelect.close();
075: }
076: temporary.close();
077: }
078:
079: public synchronized void insert(Number ctx, Number subj,
080: Number pred, Number obj) throws SQLException,
081: InterruptedException {
082: if (batch == null || batch.isFull() || !queue.remove(batch)) {
083: batch = newTripleBatch();
084: batch.setTable(triples);
085: batch.setSailChangedEvent(sailChangedEvent);
086: batch.setTemporary(temporary);
087: batch.setMaxBatchSize(getBatchSize());
088: batch.setBatchStatement(prepareInsert());
089: if (insertSelect == null) {
090: insertSelect = prepareInsertSelect(buildInsertSelect());
091: }
092: batch.setInsertStatement(insertSelect);
093: }
094: batch.setObject(1, ctx);
095: batch.setObject(2, subj);
096: if (temporary == null && !triples.isPredColumnPresent()) {
097: batch.setObject(3, obj);
098: } else {
099: batch.setObject(3, pred);
100: batch.setObject(4, obj);
101: }
102: batch.addBatch();
103: queue.put(batch);
104: addedCount++;
105: triples.getSubjTypes().add(ids.valueOf(subj));
106: triples.getObjTypes().add(ids.valueOf(obj));
107: }
108:
109: public void committed() throws SQLException {
110: triples.modified(addedCount, removedCount);
111: addedCount = 0;
112: removedCount = 0;
113: }
114:
115: public void removed(int count) throws SQLException {
116: removedCount += count;
117: }
118:
119: public boolean isEmpty() throws SQLException {
120: return triples.isEmpty() && addedCount == 0;
121: }
122:
123: @Override
124: public String toString() {
125: return triples.toString();
126: }
127:
128: protected TripleBatch newTripleBatch() {
129: return new TripleBatch();
130: }
131:
132: protected PreparedStatement prepareInsertSelect(String sql)
133: throws SQLException {
134: return conn.prepareStatement(sql);
135: }
136:
137: protected String buildInsertSelect() throws SQLException {
138: String tableName = triples.getName();
139: StringBuilder sb = new StringBuilder();
140: sb.append("INSERT INTO ").append(tableName).append("\n");
141: sb.append("SELECT DISTINCT ctx, subj, ");
142: if (triples.isPredColumnPresent()) {
143: sb.append("pred, ");
144: }
145: sb.append("obj FROM ");
146: sb.append(temporary.getName()).append(" tr\n");
147: sb.append("WHERE NOT EXISTS (");
148: sb.append("SELECT ctx, subj, ");
149: if (triples.isPredColumnPresent()) {
150: sb.append("pred, ");
151: }
152: sb.append("obj FROM ");
153: sb.append(tableName).append(" st\n");
154: sb.append("WHERE st.ctx = tr.ctx");
155: sb.append(" AND st.subj = tr.subj");
156: if (triples.isPredColumnPresent()) {
157: sb.append(" AND st.pred = tr.pred");
158: }
159: sb.append(" AND st.obj = tr.obj");
160: sb.append(")");
161: return sb.toString();
162: }
163:
164: protected PreparedStatement prepareInsert(String sql)
165: throws SQLException {
166: return conn.prepareStatement(sql);
167: }
168:
169: protected String buildInsert(String tableName,
170: boolean predColumnPresent) throws SQLException {
171: StringBuilder sb = new StringBuilder();
172: sb.append("INSERT INTO ").append(tableName);
173: sb.append(" (ctx, subj, ");
174: if (predColumnPresent) {
175: sb.append("pred, ");
176: }
177: sb.append("obj)\n");
178: sb.append("VALUES (?, ?, ");
179: if (predColumnPresent) {
180: sb.append("?, ");
181: }
182: sb.append("?)");
183: return sb.toString();
184: }
185:
186: protected boolean isPredColumnPresent() {
187: return triples.isPredColumnPresent();
188: }
189:
190: private PreparedStatement prepareInsert() throws SQLException {
191: if (temporary == null) {
192: boolean present = triples.isPredColumnPresent();
193: String sql = buildInsert(triples.getName(), present);
194: return prepareInsert(sql);
195: }
196: String sql = buildInsert(temporary.getName(), true);
197: return prepareInsert(sql);
198: }
199:
200: }
|