001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.managers;
007:
008: import java.sql.SQLException;
009: import java.util.ArrayList;
010: import java.util.Collection;
011: import java.util.HashMap;
012: import java.util.Iterator;
013: import java.util.List;
014: import java.util.Map;
015: import java.util.concurrent.ArrayBlockingQueue;
016: import java.util.concurrent.BlockingQueue;
017: import java.util.concurrent.atomic.AtomicInteger;
018:
019: import org.slf4j.Logger;
020: import org.slf4j.LoggerFactory;
021:
022: import org.openrdf.sail.rdbms.managers.base.ManagerBase;
023: import org.openrdf.sail.rdbms.model.RdbmsBNode;
024: import org.openrdf.sail.rdbms.model.RdbmsLiteral;
025: import org.openrdf.sail.rdbms.model.RdbmsURI;
026: import org.openrdf.sail.rdbms.model.RdbmsValue;
027: import org.openrdf.sail.rdbms.schema.Batch;
028: import org.openrdf.sail.rdbms.schema.HashBatch;
029: import org.openrdf.sail.rdbms.schema.HashTable;
030: import org.openrdf.sail.rdbms.schema.IdSequence;
031:
032: /**
033: *
034: * @author James Leigh
035: */
036: public class HashManager extends ManagerBase {
037: public static HashManager instance;
038: private Logger logger = LoggerFactory.getLogger(HashManager.class);
039: private HashTable table;
040: private Map<Long, Number> ids;
041: private AtomicInteger version = new AtomicInteger();
042: private BNodeManager bnodes;
043: private UriManager uris;
044: private LiteralManager literals;
045: private Thread lookupThread;
046: private Object working = new Object();
047: private BlockingQueue<RdbmsValue> queue;
048: private IdSequence idseq;
049: Exception exc;
050: RdbmsValue closeSignal = new RdbmsValue() {
051: private static final long serialVersionUID = -2211413309013905712L;
052:
053: public String stringValue() {
054: return null;
055: }
056: };
057:
058: public HashManager() {
059: instance = this ;
060: }
061:
062: public void setHashTable(HashTable table) {
063: this .table = table;
064: ids = new HashMap<Long, Number>(table.getBatchSize());
065: }
066:
067: public void setBNodeManager(BNodeManager bnodeTable) {
068: this .bnodes = bnodeTable;
069: }
070:
071: public void setLiteralManager(LiteralManager literalTable) {
072: this .literals = literalTable;
073: }
074:
075: public void setUriManager(UriManager uriTable) {
076: this .uris = uriTable;
077: }
078:
079: public void setIdSequence(IdSequence idseq) {
080: this .idseq = idseq;
081: }
082:
083: public void init() {
084: queue = new ArrayBlockingQueue<RdbmsValue>(table.getBatchSize());
085: lookupThread = new Thread(new Runnable() {
086: public void run() {
087: try {
088: lookupThread(working);
089: } catch (Exception e) {
090: exc = e;
091: logger.error(e.toString(), e);
092: }
093: }
094: }, "id-lookup");
095: lookupThread.start();
096: }
097:
098: @Override
099: public void close() throws SQLException {
100: try {
101: flush();
102: if (lookupThread != null) {
103: queue.put(closeSignal);
104: lookupThread.join();
105: }
106: } catch (InterruptedException e) {
107: logger.warn(e.toString(), e);
108: }
109: super .close();
110: table.close();
111: }
112:
113: public int getIdVersion() {
114: return version.intValue();
115: }
116:
117: public void optimize() throws SQLException {
118: table.optimize();
119: }
120:
121: public boolean removedStatements(int count, String condition)
122: throws SQLException {
123: if (table.expungeRemovedStatements(count, condition)) {
124: version.addAndGet(1);
125: return true;
126: }
127: return false;
128: }
129:
130: public void lookupId(RdbmsValue value) throws InterruptedException {
131: queue.put(value);
132: }
133:
134: public void assignId(RdbmsValue value, int version)
135: throws InterruptedException, SQLException {
136: List<RdbmsValue> values = new ArrayList<RdbmsValue>(
137: getChunkSize());
138: synchronized (working) {
139: throwException();
140: if (value.isExpired(version)) {
141: Map<Long, Number> map = new HashMap<Long, Number>(
142: getChunkSize());
143: values.add(value);
144: assignIds(values, map);
145: }
146: }
147: for (RdbmsValue v : values) {
148: insert(v);
149: }
150: }
151:
152: @Override
153: public void flush() throws SQLException, InterruptedException {
154: throwException();
155: List<RdbmsValue> values = new ArrayList<RdbmsValue>(
156: getChunkSize());
157: Map<Long, Number> map = new HashMap<Long, Number>(
158: getChunkSize());
159: RdbmsValue taken = queue.poll();
160: while (taken != null) {
161: values.clear();
162: values.add(taken);
163: synchronized (working) {
164: assignIds(values, map);
165: }
166: for (RdbmsValue v : values) {
167: insert(v);
168: }
169: taken = queue.poll();
170: if (taken == closeSignal) {
171: queue.add(taken);
172: taken = null;
173: }
174: }
175: super .flush();
176: }
177:
178: protected int getChunkSize() {
179: return table.getSelectChunkSize();
180: }
181:
182: @Override
183: protected void flush(Batch batch) throws SQLException {
184: super .flush(batch);
185: synchronized (working) {
186: synchronized (ids) {
187: HashBatch hb = (HashBatch) batch;
188: for (Long hash : hb.getHashes()) {
189: ids.remove(hash);
190: }
191: }
192: }
193: }
194:
195: void lookupThread(Object working) throws InterruptedException,
196: SQLException {
197: List<RdbmsValue> values = new ArrayList<RdbmsValue>(
198: getChunkSize());
199: Map<Long, Number> map = new HashMap<Long, Number>(
200: getChunkSize());
201: RdbmsValue taken = queue.take();
202: for (; taken != closeSignal; taken = queue.take()) {
203: values.clear();
204: values.add(taken);
205: synchronized (working) {
206: assignIds(values, map);
207: }
208: for (RdbmsValue v : values) {
209: insert(v);
210: }
211: }
212: }
213:
214: private void assignIds(List<RdbmsValue> values,
215: Map<Long, Number> map) throws SQLException,
216: InterruptedException {
217: while (values.size() < getChunkSize()) {
218: RdbmsValue taken = queue.poll();
219: if (taken == closeSignal) {
220: queue.add(taken);
221: break;
222: }
223: if (taken == null)
224: break;
225: values.add(taken);
226: }
227: Map<Long, Number> existing = lookup(values, map);
228: Iterator<RdbmsValue> iter = values.iterator();
229: while (iter.hasNext()) {
230: RdbmsValue value = iter.next();
231: Long hash = idseq.hashOf(value);
232: if (existing.get(hash) != null) {
233: // already in database
234: value.setInternalId(idseq.idOf(existing.get(hash)));
235: value.setVersion(getIdVersion(value));
236: iter.remove();
237: } else {
238: synchronized (ids) {
239: if (ids.containsKey(hash)) {
240: // already inserting this value
241: value.setInternalId(ids.get(hash));
242: value.setVersion(getIdVersion(value));
243: iter.remove();
244: } else {
245: // new id to be inserted
246: Number id = idseq.nextId(value);
247: value.setInternalId(id);
248: value.setVersion(getIdVersion(value));
249: ids.put(hash, id);
250: // keep on list for later insert
251: }
252: }
253: }
254: }
255: }
256:
257: private Map<Long, Number> lookup(Collection<RdbmsValue> values,
258: Map<Long, Number> map) throws SQLException {
259: assert !values.isEmpty();
260: assert values.size() <= getChunkSize();
261: map.clear();
262: for (RdbmsValue value : values) {
263: map.put(idseq.hashOf(value), null);
264: }
265: return table.load(map);
266: }
267:
268: private Integer getIdVersion(RdbmsValue value) {
269: if (value instanceof RdbmsLiteral)
270: return literals.getIdVersion();
271: if (value instanceof RdbmsURI)
272: return uris.getIdVersion();
273: assert value instanceof RdbmsBNode;
274: return bnodes.getIdVersion();
275: }
276:
277: private void insert(RdbmsValue value) throws SQLException,
278: InterruptedException {
279: Number id = value.getInternalId();
280: table.insert(id, idseq.hashOf(value));
281: if (value instanceof RdbmsLiteral) {
282: literals.insert(id, (RdbmsLiteral) value);
283: } else if (value instanceof RdbmsURI) {
284: uris.insert(id, (RdbmsURI) value);
285: } else {
286: assert value instanceof RdbmsBNode;
287: bnodes.insert(id, (RdbmsBNode) value);
288: }
289: }
290:
291: private void throwException() throws SQLException {
292: if (exc instanceof SQLException) {
293: SQLException e = (SQLException) exc;
294: exc = null;
295: throw e;
296: } else if (exc instanceof RuntimeException) {
297: RuntimeException e = (RuntimeException) exc;
298: exc = null;
299: throw e;
300: }
301: }
302:
303: }
|