001: /*-
002: * See the file LICENSE for redistribution information.
003: *
004: * Copyright (c) 2002,2008 Oracle. All rights reserved.
005: *
006: * $Id: DaemonThread.java,v 1.55.2.3 2008/01/07 15:14:18 cwl Exp $
007: */
008:
009: package com.sleepycat.je.utilint;
010:
011: import java.util.Collection;
012: import java.util.HashSet;
013: import java.util.Set;
014:
015: import com.sleepycat.je.DbInternal;
016: import com.sleepycat.je.DatabaseException;
017: import com.sleepycat.je.DeadlockException;
018: import com.sleepycat.je.ExceptionListener;
019: import com.sleepycat.je.dbi.EnvironmentImpl;
020: import com.sleepycat.je.latch.Latch;
021: import com.sleepycat.je.latch.LatchSupport;
022:
023: /**
024: * A daemon thread.
025: */
026: public abstract class DaemonThread implements DaemonRunner, Runnable {
027: private static final int JOIN_MILLIS = 10;
028: private long waitTime;
029: private Object synchronizer = new Object();
030: private Thread thread;
031: private EnvironmentImpl env;
032: protected String name;
033: protected Set workQueue;
034: protected Latch workQueueLatch;
035: protected int nWakeupRequests;
036: protected boolean stifleExceptionChatter = false;
037:
038: /* Fields shared between threads must be 'volatile'. */
039: private volatile boolean shutdownRequest = false;
040: private volatile boolean paused = false;
041:
042: /* This is not volatile because it is only an approximation. */
043: private boolean running = false;
044:
045: public DaemonThread(long waitTime, String name, EnvironmentImpl env) {
046: this .waitTime = waitTime;
047: this .name = name;
048: this .env = env;
049: workQueue = new HashSet();
050: workQueueLatch = LatchSupport.makeLatch(name + " work queue",
051: env);
052: }
053:
054: /**
055: * For testing.
056: */
057: public Thread getThread() {
058: return thread;
059: }
060:
061: /**
062: * If run is true, starts the thread if not started or unpauses it
063: * if already started; if run is false, pauses the thread if
064: * started or does nothing if not started.
065: */
066: public void runOrPause(boolean run) {
067: if (run) {
068: paused = false;
069: if (thread != null) {
070: wakeup();
071: } else {
072: thread = new Thread(this , name);
073: thread.setDaemon(true);
074: thread.start();
075: }
076: } else {
077: paused = true;
078: }
079: }
080:
081: public void requestShutdown() {
082: shutdownRequest = true;
083: }
084:
085: /**
086: * Requests shutdown and calls join() to wait for the thread to stop.
087: */
088: public void shutdown() {
089: if (thread != null) {
090: shutdownRequest = true;
091: while (thread.isAlive()) {
092: synchronized (synchronizer) {
093: synchronizer.notifyAll();
094: }
095: try {
096: thread.join(JOIN_MILLIS);
097: } catch (InterruptedException e) {
098:
099: /*
100: * Klockwork - ok
101: * Don't say anything about exceptions here.
102: */
103: }
104: }
105: thread = null;
106: }
107: }
108:
109: public String toString() {
110: StringBuffer sb = new StringBuffer();
111: sb.append("<DaemonThread name=\"").append(name).append("\"/>");
112: return sb.toString();
113: }
114:
115: public void addToQueue(Object o) throws DatabaseException {
116:
117: workQueueLatch.acquire();
118: workQueue.add(o);
119: wakeup();
120: workQueueLatch.release();
121: }
122:
123: public int getQueueSize() throws DatabaseException {
124:
125: workQueueLatch.acquire();
126: int count = workQueue.size();
127: workQueueLatch.release();
128: return count;
129: }
130:
131: /*
132: * Add an entry to the queue. Call this if the workQueueLatch is
133: * already held.
134: */
135: public void addToQueueAlreadyLatched(Collection c)
136: throws DatabaseException {
137:
138: workQueue.addAll(c);
139: }
140:
141: public void wakeup() {
142: if (!paused) {
143: synchronized (synchronizer) {
144: synchronizer.notifyAll();
145: }
146: }
147: }
148:
149: public void run() {
150: while (true) {
151: /* Check for shutdown request. */
152: if (shutdownRequest) {
153: break;
154: }
155: try {
156: workQueueLatch.acquire();
157: boolean nothingToDo = workQueue.size() == 0;
158: workQueueLatch.release();
159: if (nothingToDo) {
160: synchronized (synchronizer) {
161: if (waitTime == 0) {
162: synchronizer.wait();
163: } else {
164: synchronizer.wait(waitTime);
165: }
166: }
167: }
168:
169: /* Check for shutdown request. */
170: if (shutdownRequest) {
171: break;
172: }
173:
174: /* If paused, wait until notified. */
175: if (paused) {
176: synchronized (synchronizer) {
177: /* FindBugs whines unnecessarily here. */
178: synchronizer.wait();
179: }
180: continue;
181: }
182:
183: int numTries = 0;
184: int maxRetries = nDeadlockRetries();
185:
186: do {
187: try {
188: nWakeupRequests++;
189: running = true;
190: onWakeup();
191: break;
192: } catch (DeadlockException e) {
193: } finally {
194: running = false;
195: }
196: numTries++;
197:
198: /* Check for shutdown request. */
199: if (shutdownRequest) {
200: break;
201: }
202:
203: } while (numTries <= maxRetries);
204:
205: /* Check for shutdown request. */
206: if (shutdownRequest) {
207: break;
208: }
209: } catch (InterruptedException IE) {
210: ExceptionListener exceptionListener = env
211: .getExceptionListener();
212: if (exceptionListener != null) {
213: exceptionListener.exceptionThrown(DbInternal
214: .makeExceptionEvent(IE, name));
215: }
216:
217: if (!stifleExceptionChatter) {
218: System.err.println("Shutting down " + this
219: + " due to exception: " + IE);
220: }
221: shutdownRequest = true;
222: } catch (Exception E) {
223: ExceptionListener exceptionListener = env
224: .getExceptionListener();
225: if (exceptionListener != null) {
226: exceptionListener.exceptionThrown(DbInternal
227: .makeExceptionEvent(E, name));
228: }
229:
230: if (!stifleExceptionChatter) {
231: System.err
232: .println(this + " caught exception: " + E);
233: E.printStackTrace(System.err);
234: }
235: if (env.mayNotWrite()) {
236: if (!stifleExceptionChatter) {
237: System.err.println("Exiting");
238: }
239: shutdownRequest = true;
240: } else {
241: if (!stifleExceptionChatter) {
242: System.err.println("Continuing");
243: }
244: }
245: }
246: }
247: }
248:
249: /**
250: * Returns the number of retries to perform when Deadlock Exceptions
251: * occur.
252: */
253: protected int nDeadlockRetries() throws DatabaseException {
254:
255: return 0;
256: }
257:
258: /**
259: * onWakeup is synchronized to ensure that multiple invocations of the
260: * DaemonThread aren't made. isRunnable must be called from within
261: * onWakeup to avoid having the following sequence:
262: * Thread A: isRunnable() => true,
263: * Thread B: isRunnable() => true,
264: * Thread A: onWakeup() starts
265: * Thread B: waits for monitor on thread to call onWakeup()
266: * Thread A: onWakeup() completes rendering isRunnable() predicate false
267: * Thread B: onWakeup() starts, but isRunnable predicate is now false
268: */
269: abstract protected void onWakeup() throws DatabaseException;
270:
271: /**
272: * Returns whether shutdown has been requested. This method should be
273: * used to to terminate daemon loops.
274: */
275: protected boolean isShutdownRequested() {
276: return shutdownRequest;
277: }
278:
279: /**
280: * Returns whether the daemon is currently paused/disabled. This method
281: * should be used to to terminate daemon loops.
282: */
283: protected boolean isPaused() {
284: return paused;
285: }
286:
287: /**
288: * Returns whether the onWakeup method is currently executing. This is
289: * only an approximation and is used to avoid unnecessary wakeups.
290: */
291: public boolean isRunning() {
292: return running;
293: }
294:
295: /**
296: * For unit testing.
297: */
298: public int getNWakeupRequests() {
299: return nWakeupRequests;
300: }
301: }
|