001: /**
002: * com.mckoi.database.WorkerPool 12 Aug 2000
003: *
004: * Mckoi SQL Database ( http://www.mckoi.com/database )
005: * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License
009: * Version 2 as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License Version 2 for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * Version 2 along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: * Change Log:
021: *
022: *
023: */package com.mckoi.database;
024:
025: import com.mckoi.debug.DebugLogger;
026: import java.util.LinkedList;
027:
028: /**
029: * Maintains a pool of worker threads that are used to dispatch commands to
030: * a Database sub-system.
031: *
032: * @author Tobias Downer
033: */
034:
035: final class WorkerPool {
036:
037: /**
038: * The TransactionSystem that this pool is part of.
039: */
040: private TransactionSystem system;
041:
042: /**
043: * This is the maximum number of worker threads that will be created.
044: */
045: private int MAXIMUM_WORKER_THREADS = 4;
046:
047: /**
048: * This is a queue of 'WorkerThread' objects that are currently available
049: * to process commands from the service providers.
050: */
051: private LinkedList available_worker_threads;
052:
053: /**
054: * The number of worker threads that have been created in total.
055: */
056: private int worker_thread_count;
057:
058: /**
059: * A list of pending Runnable objects that are due to be executed. This is
060: * a queue of events to be run.
061: */
062: private LinkedList run_queue;
063:
064: /**
065: * If this is set to false, then no commands will be executed by the
066: * 'execute' method.
067: */
068: private boolean is_executing_commands;
069:
070: /**
071: * Constructs the worker thread pool.
072: */
073: WorkerPool(TransactionSystem system, int max_worker_threads) {
074: this .system = system;
075: MAXIMUM_WORKER_THREADS = max_worker_threads;
076:
077: is_executing_commands = false;
078:
079: // Set up the run queue
080: run_queue = new LinkedList();
081: // Set up the worker threads
082: available_worker_threads = new LinkedList();
083: worker_thread_count = 0;
084: // // Create a single worker thread and start it.
085: // ++worker_thread_count;
086: // WorkerThread wt = new WorkerThread(this);
087: // wt.start();
088:
089: }
090:
091: /**
092: * Returns a DebugLogger object that we can use to log debug messages.
093: */
094: public final DebugLogger Debug() {
095: return system.Debug();
096: }
097:
098: // ---------- Thread Pooling methods ----------
099:
100: /**
101: * This is called by a WorkerThread when it is decided that it is ready
102: * to service a new command.
103: */
104: void notifyWorkerReady(WorkerThread worker_thread) {
105: synchronized (available_worker_threads) {
106: // Add it to the queue of worker threads that are available.
107: available_worker_threads.add(worker_thread);
108:
109: // Are there any commands pending?
110: int q_len = run_queue.size();
111: if (q_len > 0) {
112: // Execute the bottom element on the queue
113: RunCommand command = (RunCommand) run_queue.remove(0);
114: execute(command.user, command.database,
115: command.runnable);
116: }
117: }
118: }
119:
120: /**
121: * This returns the first available WorkerThread object from the thread
122: * pool. If there are no available worker threads available then it returns
123: * null. This method must execute fast and must not block.
124: */
125: private WorkerThread getFirstWaitingThread() {
126: synchronized (available_worker_threads) {
127: // Is there a worker thread available?
128: int size = available_worker_threads.size();
129: if (size > 0) {
130: // Yes so remove the first element and return it.
131: WorkerThread wt = (WorkerThread) available_worker_threads
132: .remove(0);
133: return wt;
134: } else {
135: // Otherwise create a new worker thread if we can.
136: if (worker_thread_count < MAXIMUM_WORKER_THREADS) {
137: ++worker_thread_count;
138: WorkerThread wt = new WorkerThread(this );
139: wt.start();
140: // NOTE: We must _not_ return the worker thread we have just created.
141: // We must wait until the worker thread has made it self known by
142: // it calling the 'notifyWorkerReady' method.
143: }
144: return null;
145: }
146: }
147: }
148:
149: /**
150: * Executes database functions from the 'run' method of the given runnable
151: * instance on a worker thread. All database functions should go through
152: * a worker thread. If we ensure this, we can easily stop all database
153: * functions from executing. Also, we only need to have a certain number
154: * of threads active at any one time rather than a unique thread for each
155: * connection.
156: */
157: void execute(User user, DatabaseConnection database, Runnable runner) {
158: synchronized (available_worker_threads) {
159: if (is_executing_commands) {
160: WorkerThread worker = getFirstWaitingThread();
161: if (worker != null) {
162: // System.out.println("[Database] executing runner");
163: worker.execute(user, database, runner);
164: return;
165: }
166: }
167: // System.out.println("[Database] adding to run queue");
168: RunCommand command = new RunCommand(user, database, runner);
169: run_queue.add(command);
170: }
171: }
172:
173: /**
174: * Controls whether the database is allowed to execute commands or not. If
175: * this is set to true, then calls to 'execute' will make commands execute.
176: */
177: void setIsExecutingCommands(boolean status) {
178: synchronized (available_worker_threads) {
179: if (status == true) {
180: is_executing_commands = true;
181:
182: // Execute everything on the queue
183: for (int i = run_queue.size() - 1; i >= 0; --i) {
184: RunCommand command = (RunCommand) run_queue
185: .remove(i);
186: execute(command.user, command.database,
187: command.runnable);
188: }
189: } else {
190: is_executing_commands = false;
191: }
192: }
193: }
194:
195: /**
196: * Waits until all executing commands have stopped. This is best called
197: * right after a call to 'setIsExecutingCommands(false)'. If these two
198: * commands are run, the database is in a known state where no commands
199: * can be executed.
200: * <p>
201: * NOTE: This can't be called from the WorkerThread. Deadlock will
202: * result if we were allowed to do this.
203: */
204: void waitUntilAllWorkersQuiet() {
205: if (Thread.currentThread() instanceof WorkerThread) {
206: throw new Error(
207: "Can't call this method from a WorkerThread!");
208: }
209:
210: synchronized (available_worker_threads) {
211: // loop until available works = total worker thread count.
212: while (worker_thread_count != available_worker_threads
213: .size()) {
214: // Wait half a second
215: try {
216: available_worker_threads.wait(500);
217: } catch (InterruptedException e) {
218: }
219: // ISSUE: If this lasts for more than 10 minutes, one of the worker
220: // threads is likely in a state of deadlock. If this happens, we
221: // should probably find all the open worker threads and clean them
222: // up nicely.
223: }
224: }
225: }
226:
227: /**
228: * Shuts down the WorkerPool object stopping all worker threads.
229: */
230: void shutdown() {
231: synchronized (available_worker_threads) {
232: while (available_worker_threads.size() > 0) {
233: WorkerThread wt = (WorkerThread) available_worker_threads
234: .remove(0);
235: --worker_thread_count;
236: wt.shutdown();
237: }
238: }
239: }
240:
241: // ---------- Inner classes ----------
242:
243: /**
244: * Structures within the run_queue list. This stores the Runnable to
245: * run and the User that's executing the command.
246: */
247: private static final class RunCommand {
248: User user;
249: DatabaseConnection database;
250: Runnable runnable;
251:
252: public RunCommand(User user, DatabaseConnection database,
253: Runnable runnable) {
254: this.user = user;
255: this.database = database;
256: this.runnable = runnable;
257: }
258: }
259:
260: }
|