001: package prefuse.data.io.sql;
002:
003: import java.util.logging.Logger;
004:
005: import prefuse.data.Table;
006: import prefuse.data.io.DataIOException;
007: import prefuse.util.PrefuseConfig;
008: import prefuse.util.StringLib;
009: import prefuse.util.collections.CopyOnWriteArrayList;
010:
011: /**
012: * Worker thread that asynchronously handles a queue of jobs, with each job
013: * responsible for issuing a query and processing the results. Currently
014: * involves just a single thread, in the future this may be expanded to
015: * thread pool for greater concurrency.
016: *
017: * @author <a href="http://jheer.org">jeffrey heer</a>
018: * @see DatabaseDataSource
019: */
020: public class DataSourceWorker extends Thread {
021:
022: private static Logger s_logger = Logger
023: .getLogger(DataSourceWorker.class.getName());
024:
025: // TODO: in future, may want to expand this to a thread pool
026: private static DataSourceWorker s_instance;
027:
028: private static CopyOnWriteArrayList s_queue;
029:
030: /**
031: * Submit a job to the worker thread.
032: * @param e an {@link DataSourceWorker.Entry} instance that contains
033: * the parameters of the job.
034: */
035: public synchronized static void submit(Entry e) {
036: // perform lazily initialization as needed
037: if (s_queue == null)
038: s_queue = new CopyOnWriteArrayList();
039: if (s_instance == null)
040: s_instance = new DataSourceWorker();
041:
042: // queue it up
043: s_queue.add(e);
044:
045: // wake up a sleepy thread
046: synchronized (s_instance) {
047: s_instance.notify();
048: }
049: }
050:
051: // ------------------------------------------------------------------------
052:
053: /**
054: * Create a new DataSourceWorker.
055: */
056: private DataSourceWorker() {
057: super ("prefuse_DatabaseWorker");
058:
059: int priority = PrefuseConfig
060: .getInt("data.io.worker.threadPriority");
061: if (priority >= Thread.MIN_PRIORITY
062: && priority <= Thread.MAX_PRIORITY) {
063: this .setPriority(priority);
064: }
065: this .setDaemon(true);
066: this .start();
067: }
068:
069: /**
070: * @see java.lang.Runnable#run()
071: */
072: public void run() {
073: while (true) {
074: Entry e = null;
075: synchronized (s_queue) {
076: if (s_queue.size() > 0)
077: e = (Entry) s_queue.remove(0);
078: }
079:
080: if (e != null) {
081: try {
082: if (e.listener != null)
083: e.listener.preQuery(e);
084: e.ds.getData(e.table, e.query, e.keyField, e.lock);
085: if (e.listener != null)
086: e.listener.postQuery(e);
087: } catch (DataIOException dre) {
088: s_logger.warning(dre.getMessage() + "\n"
089: + StringLib.getStackTrace(dre));
090: }
091: } else {
092: // nothing to do, chill out until notified
093: try {
094: synchronized (this ) {
095: wait();
096: }
097: } catch (InterruptedException ex) {
098: }
099: }
100: }
101: }
102:
103: /**
104: * Stores the parameters of a data query and processing job.
105: * @author <a href="http://jheer.org">jeffrey heer</a>
106: */
107: public static class Entry {
108: /**
109: * Create a new Entry.
110: * @param ds the DatabaseDataSource to query
111: * @param table the Table for storing the results
112: * @param query the query to issue
113: * @param keyField the key field that should be used to identify
114: * when duplicate results occur
115: * @param lock an optional lock to synchronize on when processing
116: * data and adding it to the Table
117: * @param listener an optional callback listener that allows
118: * notifications to be issued before and after query processing
119: */
120: public Entry(DatabaseDataSource ds, Table table, String query,
121: String keyField, Object lock, Listener listener) {
122: this .ds = ds;
123: this .table = table;
124: this .query = query;
125: this .keyField = keyField;
126: this .lock = lock;
127: this .listener = listener;
128: }
129:
130: /** The DatabaseDataSource to query. */
131: DatabaseDataSource ds;
132: /** An optional callback listener that allows
133: * notifications to be issued before and after query processing. */
134: Listener listener;
135: /** The Table for storing the results. */
136: Table table;
137: /** The query to issue. */
138: String query;
139: /** The key field that should be used to identify
140: * when duplicate results occur. */
141: String keyField;
142: /** An optional lock to synchronize on when processing
143: * data and adding it to the Table. */
144: Object lock;
145: }
146:
147: /**
148: * Listener interface for receiving notifications about the status of
149: * a submitted data query and processing job.
150: * @author <a href="http://jheer.org">jeffrey heer</a>
151: */
152: public static interface Listener {
153: /**
154: * Notification that the query is about to be issued.
155: * @param job the current job being processed
156: */
157: public void preQuery(DataSourceWorker.Entry job);
158:
159: /**
160: * Notification that the query processing has just completed.
161: * @param job the current job being processed
162: */
163: public void postQuery(DataSourceWorker.Entry job);
164: }
165:
166: } // end of class DataSourceWorker
|