001: /*
002: * Distributed as part of c3p0 v.0.9.1.2
003: *
004: * Copyright (C) 2005 Machinery For Change, Inc.
005: *
006: * Author: Steve Waldman <swaldman@mchange.com>
007: *
008: * This library is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU Lesser General Public License version 2.1, as
010: * published by the Free Software Foundation.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public License
018: * along with this software; see the file LICENSE. If not, write to the
019: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
020: * Boston, MA 02111-1307, USA.
021: */
022:
023: package com.mchange.v2.async;
024:
025: import java.util.Collections;
026: import java.util.List;
027: import java.util.LinkedList;
028: import com.mchange.v2.log.MLevel;
029: import com.mchange.v2.log.MLog;
030: import com.mchange.v2.log.MLogger;
031: import com.mchange.v2.util.ResourceClosedException;
032:
033: public class CarefulRunnableQueue implements RunnableQueue, Queuable,
034: StrandedTaskReporting {
035: private final static MLogger logger = MLog
036: .getLogger(CarefulRunnableQueue.class);
037:
038: private List taskList = new LinkedList();
039: private TaskThread t = new TaskThread();
040:
041: private boolean shutdown_on_interrupt;
042:
043: private boolean gentle_close_requested = false;
044:
045: private List strandedTasks = null;
046:
047: public CarefulRunnableQueue(boolean daemon,
048: boolean shutdown_on_interrupt) {
049: this .shutdown_on_interrupt = shutdown_on_interrupt;
050: t.setDaemon(daemon);
051: t.start();
052: }
053:
054: public RunnableQueue asRunnableQueue() {
055: return this ;
056: }
057:
058: public synchronized void postRunnable(Runnable r) {
059: try {
060: if (gentle_close_requested)
061: throw new ResourceClosedException(
062: "Attempted to post a task to a closing "
063: + "CarefulRunnableQueue.");
064:
065: taskList.add(r);
066: this .notifyAll();
067: } catch (NullPointerException e) {
068: //e.printStackTrace();
069: if (Debug.DEBUG) {
070: if (logger.isLoggable(MLevel.FINE))
071: logger
072: .log(
073: MLevel.FINE,
074: "NullPointerException while posting Runnable.",
075: e);
076: }
077: if (taskList == null)
078: throw new ResourceClosedException(
079: "Attempted to post a task to a CarefulRunnableQueue "
080: + "which has been closed, or whose TaskThread has been "
081: + "interrupted.");
082: else
083: throw e;
084: }
085: }
086:
087: public synchronized void close(boolean skip_remaining_tasks) {
088: if (skip_remaining_tasks) {
089: t.safeStop();
090: t.interrupt();
091: } else
092: gentle_close_requested = true;
093: }
094:
095: public synchronized void close() {
096: this .close(true);
097: }
098:
099: public synchronized List getStrandedTasks() {
100: try {
101: while (gentle_close_requested && taskList != null)
102: this .wait();
103: return strandedTasks;
104: } catch (InterruptedException e) {
105: // very, very rare I think...
106: // if necessary I'll try a more complex solution, but I don't think
107: // it's worth it.
108: //e.printStackTrace();
109: if (logger.isLoggable(MLevel.WARNING))
110: logger
111: .log(
112: MLevel.WARNING,
113: Thread.currentThread()
114: + " interrupted while waiting for stranded tasks from CarefulRunnableQueue.",
115: e);
116:
117: throw new RuntimeException(
118: Thread.currentThread()
119: + " interrupted while waiting for stranded tasks from CarefulRunnableQueue.");
120: }
121: }
122:
123: private synchronized Runnable dequeueRunnable() {
124: Runnable r = (Runnable) taskList.get(0);
125: taskList.remove(0);
126: return r;
127: }
128:
129: private synchronized void awaitTask() throws InterruptedException {
130: while (taskList.size() == 0) {
131: if (gentle_close_requested) {
132: t.safeStop(); // remember t == Thread.currentThread()
133: t.interrupt();
134: }
135: this .wait();
136: }
137: }
138:
139: class TaskThread extends Thread {
140: boolean should_stop = false;
141:
142: TaskThread() {
143: super ("CarefulRunnableQueue.TaskThread");
144: }
145:
146: public synchronized void safeStop() {
147: should_stop = true;
148: }
149:
150: private synchronized boolean shouldStop() {
151: return should_stop;
152: }
153:
154: public void run() {
155: try {
156: while (!shouldStop()) {
157: try {
158: awaitTask();
159: Runnable r = dequeueRunnable();
160: try {
161: r.run();
162: } catch (Exception e) {
163: //System.err.println(this.getClass().getName() + " -- Unexpected exception in task!");
164: //e.printStackTrace();
165:
166: if (logger.isLoggable(MLevel.WARNING))
167: logger
168: .log(
169: MLevel.WARNING,
170: this .getClass()
171: .getName()
172: + " -- Unexpected exception in task!",
173: e);
174: }
175: } catch (InterruptedException e) {
176: if (shutdown_on_interrupt) {
177: CarefulRunnableQueue.this .close(false);
178: // if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED )
179: // System.err.println( this.toString() +
180: // " interrupted. Shutting down after current tasks" +
181: // " have completed." );
182: if (logger.isLoggable(MLevel.INFO))
183: logger
184: .info(this .toString()
185: + " interrupted. Shutting down after current tasks"
186: + " have completed.");
187: } else {
188: // if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED )
189: // System.err.println( this.toString() +
190: // " received interrupt. IGNORING." );
191: logger.info(this .toString()
192: + " received interrupt. IGNORING.");
193: }
194: }
195: }
196: }
197: // catch (ThreadDeath td) //DEBUG ONLY -- remove soon, swaldman 08-Jun-2003
198: // {
199: // System.err.print("c3p0-TRAVIS: ");
200: // System.err.println(this.getName() + ": Some bastard used the deprecated stop() method to kill me!!!!");
201: // td.printStackTrace();
202: // throw td;
203: // }
204: // catch (Throwable t) //DEBUG ONLY -- remove soon, swaldman 08-Jun-2003
205: // {
206: // System.err.print("c3p0-TRAVIS: ");
207: // System.err.println(this.getName() + ": Some unexpected Throwable occurred and killed me!!!!");
208: // t.printStackTrace();
209: // if (t instanceof Error)
210: // throw (Error) t;
211: // else if (t instanceof RuntimeException)
212: // throw (RuntimeException) t;
213: // else
214: // throw new InternalError( t.toString() ); //we don't expect any checked Exceptions can happen here.
215: // }
216: finally {
217: synchronized (CarefulRunnableQueue.this ) {
218: strandedTasks = Collections
219: .unmodifiableList(taskList);
220: taskList = null;
221: t = null;
222: CarefulRunnableQueue.this .notifyAll(); //if anyone is waiting for stranded tasks...
223: //System.err.print("c3p0-TRAVIS: ");
224: //System.err.println("TaskThread dead. strandedTasks: " + strandedTasks);
225: }
226: }
227: }
228: }
229: }
|