001: // serverAbstractThread.java
002: // -----------------------
003: // (C) by Michael Peter Christen; mc@anomic.de
004: // first published on http://www.yacy.net
005: // Frankfurt, Germany, 2005
006: // last major change: 14.03.2005
007: //
008: // This program is free software; you can redistribute it and/or modify
009: // it under the terms of the GNU General Public License as published by
010: // the Free Software Foundation; either version 2 of the License, or
011: // (at your option) any later version.
012: //
013: // This program is distributed in the hope that it will be useful,
014: // but WITHOUT ANY WARRANTY; without even the implied warranty of
015: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: // GNU General Public License for more details.
017: //
018: // You should have received a copy of the GNU General Public License
019: // along with this program; if not, write to the Free Software
020: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
021: //
022: // Using this software in any meaning (reading, learning, copying, compiling,
023: // running) means that you agree that the Author(s) is (are) not responsible
024: // for cost, loss of data or any harm that may be caused directly or indirectly
025: // by usage of this softare or this documentation. The usage of this software
026: // is on your own risk. The installation and usage (starting/running) of this
027: // software may allow other people or application to access your computer and
028: // any attached devices and is highly dependent on the configuration of the
029: // software which must be done by the user of the software; the author(s) is
030: // (are) also not responsible for proper configuration and usage of the
031: // software, even if provoked by documentation provided together with
032: // the software.
033: //
034: // Any changes to this file according to the GPL as documented in the file
035: // gpl.txt aside this file in the shipment you received can be done to the
036: // lines that follows this copyright notice here, but changes must not be
037: // done inside the copyright notive above. A re-distribution must contain
038: // the intact and unchanged copyright notice.
039: // Contributions and changes to the program code must be marked as such.
040:
041: /*
042: an Implementation of a serverRunnable must only extend this class and impement
043: the methods:
044: open(),
045: job() and
046: close()
047: */
048:
049: package de.anomic.server;
050:
051: import java.nio.channels.ClosedByInterruptException;
052:
053: import de.anomic.server.logging.serverLog;
054:
055: public abstract class serverAbstractThread extends Thread implements
056: serverThread {
057:
058: private long startup = 0, intermission = 0, idlePause = 0,
059: busyPause = 0, blockPause = 0;
060: private boolean running = true;
061: private serverLog log = null;
062: private long idletime = 0, busytime = 0, memprereq = 0, memuse = 0;
063: private String shortDescr = "", longDescr = "";
064: private String monitorURL = null;
065: private long threadBlockTimestamp = System.currentTimeMillis();
066: private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0;
067: private Object syncObject = null;
068: private boolean intermissionObedient = true;
069:
070: protected final void announceThreadBlockApply() {
071: // shall only be used, if a thread blocks for an important reason
072: // like a socket connect and must renew the timestamp to correct
073: // statistics
074: this .threadBlockTimestamp = System.currentTimeMillis();
075: }
076:
077: protected final void announceThreadBlockRelease() {
078: // shall only be used, if a thread blocks for an important reason
079: // like a socket connect and must renew the timestamp to correct
080: // statistics
081: long this BlockTime = (System.currentTimeMillis() - this .threadBlockTimestamp);
082: this .blockPause += this BlockTime;
083: this .busytime -= this BlockTime;
084: }
085:
086: protected final void announceMoreExecTime(long millis) {
087: this .busytime += millis;
088: }
089:
090: protected final void announceMoreSleepTime(long millis) {
091: this .idletime += millis;
092: }
093:
094: public final void setDescription(String shortText, String longText,
095: String monitorURL) {
096: // sets a visible description string
097: this .shortDescr = shortText;
098: this .longDescr = longText;
099: this .monitorURL = monitorURL;
100: }
101:
102: public final void setStartupSleep(long milliseconds) {
103: // sets a sleep time before execution of the job-loop
104: startup = milliseconds;
105: }
106:
107: public final long setIdleSleep(long milliseconds) {
108: // sets a sleep time for pauses between two jobs
109: idlePause = milliseconds;
110: return milliseconds;
111: }
112:
113: public final long setBusySleep(long milliseconds) {
114: // sets a sleep time for pauses between two jobs
115: busyPause = milliseconds;
116: return milliseconds;
117: }
118:
119: public void setMemPreReqisite(long freeBytes) {
120: // sets minimum required amount of memory for the job execution
121: memprereq = freeBytes;
122: }
123:
124: public void setObeyIntermission(boolean obey) {
125: // defines if the thread should obey the intermission command
126: intermissionObedient = obey;
127: }
128:
129: public final String getShortDescription() {
130: return this .shortDescr;
131: }
132:
133: public final String getLongDescription() {
134: return this .longDescr;
135: }
136:
137: public String getMonitorURL() {
138: return this .monitorURL;
139: }
140:
141: public final long getIdleCycles() {
142: // returns the total number of cycles of job execution with idle-result
143: return this .idleCycles;
144: }
145:
146: public final long getBusyCycles() {
147: // returns the total number of cycles of job execution with busy-result
148: return this .busyCycles;
149: }
150:
151: public long getOutOfMemoryCycles() {
152: // returns the total number of cycles where
153: // a job execution was omitted because of memory shortage
154: return this .outofmemoryCycles;
155: }
156:
157: public final long getBlockTime() {
158: // returns the total time that this thread has been blocked so far
159: return this .blockPause;
160: }
161:
162: public final long getSleepTime() {
163: // returns the total time that this thread has slept so far
164: return this .idletime;
165: }
166:
167: public final long getExecTime() {
168: // returns the total time that this thread has worked so far
169: return this .busytime;
170: }
171:
172: public long getMemoryUse() {
173: // returns the sum of all memory usage differences before and after one busy job
174: return memuse;
175: }
176:
177: public final void setLog(serverLog log) {
178: // defines a log where process states can be written to
179: this .log = log;
180: }
181:
182: public boolean shutdownInProgress() {
183: return !this .running || Thread.currentThread().isInterrupted();
184: }
185:
186: public void intermission(long pause) {
187: if (pause == Long.MAX_VALUE)
188: this .intermission = Long.MAX_VALUE;
189: else
190: this .intermission = System.currentTimeMillis() + pause;
191: }
192:
193: public void terminate(boolean waitFor) {
194: // after calling this method, the thread shall terminate
195: this .running = false;
196:
197: // interrupting the thread
198: this .interrupt();
199:
200: // wait for termination
201: if (waitFor) {
202: // Busy waiting removed: while (this.isAlive()) try {this.sleep(100);} catch (InterruptedException e) {break;}
203: try {
204: this .join(3000);
205: } catch (InterruptedException e) {
206: return;
207: }
208: }
209:
210: // If we reach this point, the process is closed
211: }
212:
213: /*
214: private final void logError(String text) {
215: if (log == null) serverLog.logSevere("THREAD-CONTROL", text);
216: else log.logSevere(text);
217: }
218: */
219:
220: private final void logError(String text, Throwable thrown) {
221: if (log == null)
222: serverLog.logSevere("THREAD-CONTROL", text, thrown);
223: else
224: log.logSevere(text, thrown);
225: }
226:
227: private void logSystem(String text) {
228: if (log == null)
229: serverLog.logConfig("THREAD-CONTROL", text);
230: else
231: log.logConfig(text);
232: }
233:
234: /*
235: private void logSystem(String text, Throwable thrown) {
236: if (log == null) serverLog.logConfig("THREAD-CONTROL", text, thrown);
237: else log.logConfig(text,thrown);
238: }
239: */
240:
241: public void jobExceptionHandler(Exception e) {
242: if (!(e instanceof ClosedByInterruptException)) {
243: // default handler for job exceptions. shall be overridden for own handler
244: logError(
245: "thread '" + this .getName() + "': " + e.toString(),
246: e);
247: }
248: }
249:
250: public void run() {
251: if (startup > 0) {
252: // do a startup-delay
253: logSystem("thread '" + this .getName()
254: + "' deployed, delaying start-up.");
255: ratz(startup);
256: if (!(running))
257: return;
258: }
259: this .open();
260: if (log != null) {
261: if (startup > 0)
262: logSystem("thread '"
263: + this .getName()
264: + "' delayed, "
265: + ((this .busyPause < 0) ? "starting now job."
266: : "starting now loop."));
267: else
268: logSystem("thread '"
269: + this .getName()
270: + "' deployed, "
271: + ((this .busyPause < 0) ? "starting job."
272: : "starting loop."));
273: }
274: long timestamp;
275: long memstamp0, memstamp1;
276: boolean isBusy;
277: //Runtime rt = Runtime.getRuntime();
278:
279: while (running) {
280: if ((this .intermissionObedient) && (this .intermission > 0)
281: && (this .intermission != Long.MAX_VALUE)) {
282: long itime = this .intermission
283: - System.currentTimeMillis();
284: if (itime > 0) {
285: if (itime > this .idlePause)
286: itime = this .idlePause;
287: logSystem("thread '" + this .getName()
288: + "' breaks for intermission: "
289: + (itime / 1000) + " seconds");
290: ratz(itime);
291: }
292: this .intermission = 0;
293: }
294:
295: if (this .intermission == Long.MAX_VALUE) {
296: // omit Job, paused
297: logSystem("thread '" + this .getName() + "' paused");
298: timestamp = System.currentTimeMillis();
299: ratz(this .idlePause);
300: idletime += System.currentTimeMillis() - timestamp;
301: //} else if ((memnow = serverMemory.available()) > memprereq) try {
302: } else if (serverMemory.request(memprereq, false))
303: try {
304: // do job
305: timestamp = System.currentTimeMillis();
306: memstamp0 = serverMemory.used();
307: isBusy = this .job();
308: // do memory and busy/idle-count/time monitoring
309: if (isBusy) {
310: memstamp1 = serverMemory.used();
311: if (memstamp1 >= memstamp0) {
312: // no GC in between. this is not shure but most probable
313: memuse += memstamp1 - memstamp0;
314: } else {
315: // GC was obviously in between. Add an average as simple heuristic
316: if (busyCycles > 0)
317: memuse += memuse / busyCycles;
318: }
319: busytime += System.currentTimeMillis()
320: - timestamp;
321: busyCycles++;
322: } else {
323: idleCycles++;
324: }
325: // interrupt loop if this is interrupted or supposed to be a one-time job
326: if ((!running) || (this .isInterrupted()))
327: break;
328: if ((this .idlePause < 0) || (this .busyPause < 0))
329: break; // for one-time jobs
330: // process scheduled pause
331: timestamp = System.currentTimeMillis();
332: ratz((isBusy) ? this .busyPause : this .idlePause);
333: idletime += System.currentTimeMillis() - timestamp;
334: } catch (Exception e) {
335: // handle exceptions: thread must not die on any unexpected exceptions
336: // if the exception is too bad it should call terminate()
337: this .jobExceptionHandler(e);
338: busyCycles++;
339: }
340: else {
341: log.logWarning("Thread '" + this .getName()
342: + "' runs short memory cycle. Free mem: "
343: + (serverMemory.available() / 1024)
344: + " KB, needed: " + (memprereq / 1024) + " KB");
345: // omit job, not enough memory
346: // process scheduled pause
347: timestamp = System.currentTimeMillis();
348: // do a clean-up
349: this .freemem();
350: // sleep a while
351: ratz(this .idlePause);
352: idletime += System.currentTimeMillis() - timestamp;
353: outofmemoryCycles++;
354: }
355: }
356: this .close();
357: logSystem("thread '" + this .getName() + "' terminated.");
358: }
359:
360: private void ratz(long millis) {
361: try {
362: if (this .syncObject != null) {
363: synchronized (this .syncObject) {
364: this .syncObject.wait(millis);
365: }
366: } else {
367: Thread.sleep(millis);
368: }
369: } catch (InterruptedException e) {
370: if (this .log != null)
371: this .log.logConfig("thread '" + this .getName()
372: + "' interrupted because of shutdown.");
373: }
374: }
375:
376: public void open() {
377: } // dummy definition; should be overriden
378:
379: public void close() {
380: } // dummy definition; should be overriden
381:
382: public void setSyncObject(Object sync) {
383: this .syncObject = sync;
384: }
385:
386: public Object getSyncObject() {
387: return this .syncObject;
388: }
389:
390: public void notifyThread() {
391: if (this .syncObject != null) {
392: synchronized (this .syncObject) {
393: if (this .log != null)
394: this .log
395: .logFine("thread '"
396: + this .getName()
397: + "' has received a notification from thread '"
398: + Thread.currentThread().getName()
399: + "'.");
400: this.syncObject.notifyAll();
401: }
402: }
403: }
404: }
|