001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.managers.base;
007:
008: import java.lang.reflect.InvocationHandler;
009: import java.lang.reflect.Method;
010: import java.lang.reflect.Proxy;
011: import java.sql.SQLException;
012: import java.util.concurrent.BlockingQueue;
013:
014: import org.slf4j.Logger;
015: import org.slf4j.LoggerFactory;
016:
017: import org.openrdf.sail.rdbms.managers.helpers.BatchBlockingQueue;
018: import org.openrdf.sail.rdbms.schema.Batch;
019:
020: public abstract class ManagerBase {
021: public static int BATCH_SIZE = 8 * 1024;
022:
023: public static int MIN_QUEUE = 128;
024:
025: public static int MAX_QUEUE = 96 * 1024;
026:
027: Exception exc;
028:
029: private Logger logger = LoggerFactory.getLogger(ManagerBase.class);
030:
031: public final BlockingQueue<Batch> queue = new BatchBlockingQueue(
032: MAX_QUEUE);
033:
034: private final Object working = new Object();
035:
036: private Thread thread;
037:
038: private int count;
039:
040: @SuppressWarnings("unchecked")
041: public BlockingQueue<Batch> getQueue() {
042: ClassLoader cl = getClass().getClassLoader();
043: Class<?>[] classes = new Class[] { BlockingQueue.class };
044: InvocationHandler h = new InvocationHandler() {
045:
046: public Object invoke(Object proxy, Method method,
047: Object[] args) throws Throwable {
048: Object result = method.invoke(queue, args);
049: checkQueueSize();
050: return result;
051: }
052: };
053: Object proxy = Proxy.newProxyInstance(cl, classes, h);
054: return (BlockingQueue<Batch>) proxy;
055: }
056:
057: public void close() throws SQLException {
058: try {
059: flush();
060: if (thread != null) {
061: queue.put(Batch.CLOSED_SIGNAL);
062: thread.join();
063: }
064: } catch (InterruptedException e) {
065: logger.warn(e.toString(), e);
066: }
067: throwException();
068: }
069:
070: public void flush() throws SQLException, InterruptedException {
071: throwException();
072: synchronized (working) {
073: throwException();
074: for (Batch b = queue.poll(); isFlushable(b); b = queue
075: .poll()) {
076: flush(b);
077: }
078: count = 0;
079: }
080: }
081:
082: public void clear() {
083: queue.clear();
084: }
085:
086: protected void optimize() throws SQLException {
087: // allow subclasses to optimise table
088: }
089:
090: void checkQueueSize() {
091: if (++count >= MIN_QUEUE && thread == null) {
092: String name = getClass().getSimpleName() + "-flusher";
093: thread = new Thread(new Runnable() {
094:
095: public void run() {
096: try {
097: insertThread(working);
098: } catch (Exception e) {
099: exc = e;
100: logger.error(e.toString(), e);
101: }
102: }
103: }, name);
104: thread.start();
105: }
106: }
107:
108: protected void flush(Batch batch) throws SQLException {
109: batch.flush();
110: }
111:
112: void insertThread(Object working) throws SQLException,
113: InterruptedException {
114: String name = Thread.currentThread().getName();
115: logger.debug("Starting helper thread {}", name);
116: int notReadyCount = 0;
117: for (Batch b = queue.take(); isFlushable(b); b = queue.take()) {
118: if (b.isReady() || queue.size() <= notReadyCount) {
119: synchronized (working) {
120: flush(b);
121: }
122: optimize();
123: notReadyCount = 0;
124: } else {
125: queue.add(b);
126: notReadyCount++;
127: }
128: }
129: logger.debug("Closing helper thread {}", name);
130: }
131:
132: private boolean isFlushable(Batch batch) {
133: return batch != null && batch != Batch.CLOSED_SIGNAL;
134: }
135:
136: private void throwException() throws SQLException {
137: if (exc instanceof SQLException) {
138: SQLException e = (SQLException) exc;
139: exc = null;
140: throw e;
141: } else if (exc instanceof RuntimeException) {
142: RuntimeException e = (RuntimeException) exc;
143: exc = null;
144: throw e;
145: }
146: }
147:
148: }
|