001: /*
002: * Distributed as part of c3p0 v.0.9.1.2
003: *
004: * Copyright (C) 2005 Machinery For Change, Inc.
005: *
006: * Author: Steve Waldman <swaldman@mchange.com>
007: *
008: * This library is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU Lesser General Public License version 2.1, as
010: * published by the Free Software Foundation.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public License
018: * along with this software; see the file LICENSE. If not, write to the
019: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
020: * Boston, MA 02111-1307, USA.
021: */
022:
023: package com.mchange.v2.async;
024:
025: import java.util.*;
026: import com.mchange.v2.log.*;
027: import com.mchange.v2.util.ResourceClosedException;
028:
029: public class ThreadPerTaskAsynchronousRunner implements
030: AsynchronousRunner {
031: final static int PRESUME_DEADLOCKED_MULTIPLE = 3; //after three times the interrupt period, we presume deadlock
032:
033: final static MLogger logger = MLog
034: .getLogger(ThreadPerTaskAsynchronousRunner.class);
035:
036: //MT: unchanged post-ctor
037: final int max_task_threads;
038: final long interrupt_task_delay;
039:
040: //MT: protected by this' lock
041: LinkedList queue = new LinkedList();
042: ArrayList running = new ArrayList(); //as a Collection -- duplicate-accepting-ness is important, order is not
043: ArrayList deadlockSnapshot = null;
044: boolean still_open = true;
045:
046: //MT: thread-safe and not reassigned post-ctor
047: Thread dispatchThread = new DispatchThread();
048: Timer interruptAndDeadlockTimer;
049:
050: public ThreadPerTaskAsynchronousRunner(int max_task_threads) {
051: this (max_task_threads, 0);
052: }
053:
054: public ThreadPerTaskAsynchronousRunner(int max_task_threads,
055: long interrupt_task_delay) {
056: this .max_task_threads = max_task_threads;
057: this .interrupt_task_delay = interrupt_task_delay;
058: if (hasIdTimer()) {
059: interruptAndDeadlockTimer = new Timer(true);
060: TimerTask deadlockChecker = new TimerTask() {
061: public void run() {
062: checkForDeadlock();
063: }
064: };
065: long delay = interrupt_task_delay
066: * PRESUME_DEADLOCKED_MULTIPLE;
067: interruptAndDeadlockTimer.schedule(deadlockChecker, delay,
068: delay);
069: }
070:
071: dispatchThread.start();
072: }
073:
074: private boolean hasIdTimer() {
075: return (interrupt_task_delay > 0);
076: }
077:
078: public synchronized void postRunnable(Runnable r) {
079: if (still_open) {
080: queue.add(r);
081: this .notifyAll();
082: } else
083: throw new ResourceClosedException(
084: "Attempted to use a ThreadPerTaskAsynchronousRunner in a closed or broken state.");
085:
086: }
087:
088: public void close() {
089: close(true);
090: }
091:
092: public synchronized void close(boolean skip_remaining_tasks) {
093: if (still_open) {
094: this .still_open = false;
095: if (skip_remaining_tasks) {
096: queue.clear();
097: for (Iterator ii = running.iterator(); ii.hasNext();)
098: ((Thread) ii.next()).interrupt();
099: closeThreadResources();
100: }
101: }
102: }
103:
104: public synchronized int getRunningCount() {
105: return running.size();
106: }
107:
108: public synchronized Collection getRunningTasks() {
109: return (Collection) running.clone();
110: }
111:
112: public synchronized int getWaitingCount() {
113: return queue.size();
114: }
115:
116: public synchronized Collection getWaitingTasks() {
117: return (Collection) queue.clone();
118: }
119:
120: public synchronized boolean isClosed() {
121: return !still_open;
122: }
123:
124: public synchronized boolean isDoneAndGone() {
125: return (!dispatchThread.isAlive() && running.isEmpty() && interruptAndDeadlockTimer == null);
126: }
127:
128: private synchronized void acknowledgeComplete(TaskThread tt) {
129: if (!tt.isCompleted()) {
130: running.remove(tt);
131: tt.markCompleted();
132: ThreadPerTaskAsynchronousRunner.this .notifyAll();
133:
134: if (!still_open && queue.isEmpty() && running.isEmpty())
135: closeThreadResources();
136: }
137: }
138:
139: private synchronized void checkForDeadlock() {
140: if (deadlockSnapshot == null) {
141: if (running.size() == max_task_threads)
142: deadlockSnapshot = (ArrayList) running.clone();
143: } else if (running.size() < max_task_threads)
144: deadlockSnapshot = null;
145: else if (deadlockSnapshot.equals(running)) //deadlock!
146: {
147: if (logger.isLoggable(MLevel.WARNING)) {
148: StringBuffer warningMsg = new StringBuffer(1024);
149: warningMsg.append("APPARENT DEADLOCK! (");
150: warningMsg.append(this );
151: warningMsg
152: .append(") Deadlocked threads (unresponsive to interrupt()) are being set aside as hopeless and up to ");
153: warningMsg.append(max_task_threads);
154: warningMsg
155: .append(" may now be spawned for new tasks. If tasks continue to deadlock, you may run out of memory. Deadlocked task list: ");
156: for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i) {
157: if (i != 0)
158: warningMsg.append(", ");
159: warningMsg.append(((TaskThread) deadlockSnapshot
160: .get(i)).getTask());
161: }
162:
163: logger.log(MLevel.WARNING, warningMsg.toString());
164: }
165:
166: // note "complete" here means from the perspective of the async runner, as complete
167: // as it will ever be, since the task is presumed hopelessly hung
168: for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i)
169: acknowledgeComplete((TaskThread) deadlockSnapshot
170: .get(i));
171: deadlockSnapshot = null;
172: } else
173: deadlockSnapshot = (ArrayList) running.clone();
174: }
175:
176: private void closeThreadResources() {
177: if (interruptAndDeadlockTimer != null) {
178: interruptAndDeadlockTimer.cancel();
179: interruptAndDeadlockTimer = null;
180: }
181: dispatchThread.interrupt();
182: }
183:
184: class DispatchThread extends Thread {
185: DispatchThread() {
186: super ("Dispatch-Thread-for-"
187: + ThreadPerTaskAsynchronousRunner.this );
188: }
189:
190: public void run() {
191: synchronized (ThreadPerTaskAsynchronousRunner.this ) {
192: try {
193: while (true) {
194: while (queue.isEmpty()
195: || running.size() == max_task_threads)
196: ThreadPerTaskAsynchronousRunner.this .wait();
197:
198: Runnable next = (Runnable) queue.remove(0);
199: TaskThread doer = new TaskThread(next);
200: doer.start();
201: running.add(doer);
202: }
203: } catch (InterruptedException e) {
204: if (still_open) //we're not closed...
205: {
206: if (logger.isLoggable(MLevel.WARNING))
207: logger
208: .log(
209: MLevel.WARNING,
210: this .getName()
211: + " unexpectedly interrupted! Shutting down!");
212: close(false);
213: }
214: }
215: }
216: }
217: }
218:
219: class TaskThread extends Thread {
220: //MT: post-ctor constant
221: Runnable r;
222:
223: //MT: protected by this' lock
224: boolean completed = false;
225:
226: TaskThread(Runnable r) {
227: super ("Task-Thread-for-"
228: + ThreadPerTaskAsynchronousRunner.this );
229: this .r = r;
230: }
231:
232: Runnable getTask() {
233: return r;
234: }
235:
236: synchronized void markCompleted() {
237: completed = true;
238: }
239:
240: synchronized boolean isCompleted() {
241: return completed;
242: }
243:
244: public void run() {
245: try {
246: if (hasIdTimer()) {
247: TimerTask interruptTask = new TimerTask() {
248: public void run() {
249: TaskThread.this.interrupt();
250: }
251: };
252: interruptAndDeadlockTimer.schedule(interruptTask,
253: interrupt_task_delay);
254: }
255: r.run();
256: } finally {
257: acknowledgeComplete(this);
258: }
259: }
260: }
261: }
|