001: /*
002:
003: Derby - Class org.apache.derby.impl.services.daemon.BasicDaemon
004:
005: Licensed to the Apache Software Foundation (ASF) under one or more
006: contributor license agreements. See the NOTICE file distributed with
007: this work for additional information regarding copyright ownership.
008: The ASF licenses this file to you under the Apache License, Version 2.0
009: (the "License"); you may not use this file except in compliance with
010: the License. You may obtain a copy of the License at
011:
012: http://www.apache.org/licenses/LICENSE-2.0
013:
014: Unless required by applicable law or agreed to in writing, software
015: distributed under the License is distributed on an "AS IS" BASIS,
016: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: See the License for the specific language governing permissions and
018: limitations under the License.
019:
020: */
021:
022: package org.apache.derby.impl.services.daemon;
023:
024: import org.apache.derby.iapi.services.context.ContextService;
025: import org.apache.derby.iapi.services.context.ContextManager;
026: import org.apache.derby.iapi.services.daemon.DaemonService;
027: import org.apache.derby.iapi.services.daemon.Serviceable;
028: import org.apache.derby.iapi.services.monitor.Monitor;
029: import org.apache.derby.iapi.services.monitor.ModuleFactory;
030: import org.apache.derby.iapi.services.sanity.SanityManager;
031:
032: import org.apache.derby.iapi.error.StandardException;
033:
034: import java.util.Vector;
035: import java.util.List;
036:
037: /**
038: A BasicDaemon is a background worker thread which does asynchronous I/O and
039: general clean up. It should not be used as a general worker thread for
040: parallel execution.
041:
042: One cannot count on the order of request or count on when the daemon will
043: wake up, even with serviceNow requests. Request are not persistent and not
044: recoverable, they are all lost when the system crashes or is shutdown.
045: System shutdown, even orderly ones, do not wait for daemons to finish its
046: work or empty its queue. Furthermore, any Serviceable subscriptions,
047: including onDemandOnly, must tolerate spurious services. The BasicDaemon
048: will setup a context manager with no context on it. The Serviceable
049: object's performWork must provide useful context on the context manager to
050: do its work. The BasicDaemon will wrap performWork call with try / catch
051: block and will use the ContextManager's error handling to clean up any
052: error. The BasicDaemon will guarentee serviceNow request will not be lost
053: as long as the jbms does not crash - however, if N serviceNow requests are
054: made by the same client, it may only be serviced once, not N times.
055:
056: Many Serviceable object will subscribe to the same BasicDaemon. Their
057: performWork method should be well behaved - in other words, it should not
058: take too long or hog too many resources or deadlock with anyone else. And
059: it cannot (should not) error out.
060:
061: The BasicDaemon implementation manages the DaemonService's data structure,
062: handles subscriptions and enqueues requests, and determine the service
063: schedule for its Serviceable objects. The BasicDaemon keeps an array
064: (Vector) of Serviceable subscriptions it also keeps 2 queues for clients
065: that uses it for one time service - the 1st queue is for a serviceNow
066: enqueue request, the 2nd queue is for non serviceNow enqueue request.
067:
068: This BasicDaemon services its clients in the following order:
069: 1. any subscribed client that have made a serviceNow request that has not
070: been fulfilled
071: 2. serviceable clients on the 1st queue
072: 3. all subscribed clients that are not onDemandOnly
073: 4. serviceable clients 2nd queue
074:
075: */
076: public class BasicDaemon implements DaemonService, Runnable {
077: private int numClients; // number of clients that needs services
078:
079: private static final int OPTIMAL_QUEUE_SIZE = 100;
080:
081: private final Vector subscription;
082:
083: // the context this daemon should run with
084: protected final ContextService contextService;
085: protected final ContextManager contextMgr;
086:
087: /**
088: Queues for the work to be done.
089: These are synchronized by this object.
090: */
091: private final List highPQ; // high priority queue
092: private final List normPQ; // normal priority queue
093:
094: /**
095: which subscribed clients to service next?
096: only accessed by daemon thread
097: */
098: private int nextService;
099:
100: /*
101: ** State for the sleep/wakeup routines.
102: */
103:
104: private boolean awakened; // a wake up call has been issued
105: // MT - synchronized on this
106:
107: /**
108: true if I'm waiting, if this is false then I am running and a notify is not required.
109: */
110: private boolean waiting;
111:
112: private boolean inPause; // if true, don't do anything
113: private boolean running; // I am running now
114: private boolean stopRequested; // thread is requested to die
115: private boolean stopped; // we have stopped
116:
117: private long lastServiceTime; // when did I last wake up on a timer
118: private int earlyWakeupCount; // if I am waken up a couple of times, check
119:
120: // that lastServiceTime to make sure work
121: // scheduled on a timer gets done once in a
122: // while
123:
124: /**
125: make a BasicDaemon
126: */
127: public BasicDaemon(ContextService contextService) {
128: this .contextService = contextService;
129: this .contextMgr = contextService.newContextManager();
130:
131: subscription = new Vector(1, 1);
132: highPQ = new java.util.LinkedList();
133: normPQ = new java.util.LinkedList();
134:
135: lastServiceTime = System.currentTimeMillis();
136: }
137:
138: public int subscribe(Serviceable newClient, boolean onDemandOnly) {
139: int clientNumber;
140:
141: ServiceRecord clientRecord;
142:
143: synchronized (this ) {
144: clientNumber = numClients++;
145:
146: clientRecord = new ServiceRecord(newClient, onDemandOnly,
147: true);
148: subscription.insertElementAt(clientRecord, clientNumber);
149: }
150:
151: if (SanityManager.DEBUG) {
152: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
153: SanityManager.DEBUG(DaemonService.DaemonTrace,
154: "subscribed client # " + clientNumber + " : "
155: + clientRecord);
156: }
157:
158: return clientNumber;
159: }
160:
161: /**
162: * Removes a client from the list of subscribed clients. The call does not
163: * wait for the daemon to finish the work it is currently performing.
164: * Therefore, the client must tolerate that its <code>performWork()</code>
165: * method could be invoked even after the call to
166: * <code>unsubscribe()</code> has returned (but not more than once).
167: *
168: * @param clientNumber client identifier
169: */
170: public void unsubscribe(int clientNumber) {
171: if (clientNumber < 0 || clientNumber > subscription.size())
172: return;
173:
174: // client number is never reused. Just null out the vector entry.
175: subscription.setElementAt(null, clientNumber);
176: }
177:
178: public void serviceNow(int clientNumber) {
179: if (clientNumber < 0 || clientNumber > subscription.size())
180: return;
181:
182: ServiceRecord clientRecord = (ServiceRecord) subscription
183: .elementAt(clientNumber);
184: if (clientRecord == null)
185: return;
186:
187: clientRecord.called();
188: wakeUp();
189: }
190:
191: public boolean enqueue(Serviceable newClient, boolean serviceNow) {
192: ServiceRecord clientRecord = new ServiceRecord(newClient,
193: false, false);
194:
195: if (SanityManager.DEBUG) {
196: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
197: SanityManager.DEBUG(DaemonService.DaemonTrace,
198: "enqueing work, urgent = " + serviceNow + ":"
199: + newClient);
200: }
201:
202: List queue = serviceNow ? highPQ : normPQ;
203:
204: int highPQsize;
205: synchronized (this ) {
206: queue.add(clientRecord);
207: highPQsize = highPQ.size();
208:
209: if (SanityManager.DEBUG) {
210:
211: if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
212:
213: if (highPQsize > (OPTIMAL_QUEUE_SIZE * 2))
214: System.out
215: .println("memoryLeakTrace:BasicDaemon "
216: + highPQsize);
217: }
218: }
219: }
220:
221: if (serviceNow && !awakened)
222: wakeUp();
223:
224: if (serviceNow) {
225: return highPQsize > OPTIMAL_QUEUE_SIZE;
226: }
227: return false;
228: }
229:
230: /**
231: Get rid of all queued up Serviceable tasks.
232: */
233: public synchronized void clear() {
234: normPQ.clear();
235: highPQ.clear();
236: }
237:
238: /*
239: * class specific methods
240: */
241:
242: protected ServiceRecord nextAssignment(boolean urgent) {
243: // first goes thru the subscription list, then goes thru highPQ;
244: ServiceRecord clientRecord;
245:
246: while (nextService < subscription.size()) {
247: clientRecord = (ServiceRecord) subscription
248: .elementAt(nextService++);
249: if (clientRecord != null
250: && (clientRecord.needImmediateService() || (!urgent && clientRecord
251: .needService())))
252: return clientRecord;
253: }
254:
255: clientRecord = null;
256:
257: synchronized (this ) {
258: if (!highPQ.isEmpty())
259: clientRecord = (ServiceRecord) highPQ.remove(0);
260: }
261:
262: if (urgent || clientRecord != null) {
263: if (SanityManager.DEBUG) {
264: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
265: SanityManager
266: .DEBUG(
267: DaemonService.DaemonTrace,
268: clientRecord == null ? "No more urgent assignment "
269: : "Next urgent assignment : "
270: + clientRecord);
271: }
272:
273: return clientRecord;
274: }
275:
276: clientRecord = null;
277: synchronized (this ) {
278: if (!normPQ.isEmpty()) {
279: clientRecord = (ServiceRecord) normPQ.remove(0);
280:
281: if (SanityManager.DEBUG) {
282: if (SanityManager
283: .DEBUG_ON(DaemonService.DaemonTrace))
284: SanityManager.DEBUG(DaemonService.DaemonTrace,
285: "Next normal enqueued : "
286: + clientRecord);
287: }
288: }
289:
290: // else no more work
291: }
292:
293: if (SanityManager.DEBUG) {
294: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) {
295: if (clientRecord == null)
296: SanityManager.DEBUG(DaemonService.DaemonTrace,
297: "No more assignment");
298: }
299: }
300:
301: return clientRecord;
302: }
303:
304: protected void serviceClient(ServiceRecord clientRecord) {
305: clientRecord.serviced();
306:
307: Serviceable client = clientRecord.client;
308:
309: // client may have unsubscribed while it had items queued
310: if (client == null)
311: return;
312:
313: ContextManager cm = contextMgr;
314:
315: if (SanityManager.DEBUG) {
316: SanityManager.ASSERT(cm != null, "Context manager is null");
317: SanityManager.ASSERT(client != null, "client is null");
318: }
319:
320: try {
321: int status = client.performWork(cm);
322:
323: if (clientRecord.subscriber)
324: return;
325:
326: if (status == Serviceable.REQUEUE) {
327: List queue = client.serviceASAP() ? highPQ : normPQ;
328: synchronized (this ) {
329: queue.add(clientRecord);
330:
331: if (SanityManager.DEBUG) {
332:
333: if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
334:
335: if (queue.size() > (OPTIMAL_QUEUE_SIZE * 2))
336: System.out
337: .println("memoryLeakTrace:BasicDaemon "
338: + queue.size());
339: }
340: }
341: }
342: }
343:
344: return;
345: } catch (Throwable e) {
346: if (SanityManager.DEBUG)
347: SanityManager.showTrace(e);
348: cm.cleanupOnError(e);
349: }
350: }
351:
352: /*
353: * Runnable methods
354: */
355: public void run() {
356: contextService.setCurrentContextManager(contextMgr);
357:
358: if (SanityManager.DEBUG) {
359: if (SanityManager.DEBUG_ON(DaemonService.DaemonOff)) {
360: SanityManager
361: .DEBUG(DaemonService.DaemonTrace,
362: "DaemonOff is set in properties, background Daemon not run");
363: return;
364: }
365: SanityManager.DEBUG(DaemonService.DaemonTrace, "running");
366: }
367:
368: // infinite loop of rest and work
369: while (true) {
370: if (stopRequested())
371: break;
372:
373: // if someone wake me up, only service the urgent requests.
374: // if I wake up by my regular schedule, service all clients
375: boolean urgentOnly = rest();
376:
377: if (stopRequested())
378: break;
379:
380: if (!inPause())
381: work(urgentOnly);
382: }
383:
384: synchronized (this ) {
385: running = false;
386: stopped = true;
387: }
388: contextMgr.cleanupOnError(StandardException.normalClose());
389: contextService.resetCurrentContextManager(contextMgr);
390: }
391:
392: /*
393: * Daemon Service method
394: */
395:
396: /*
397: * pause the daemon. Wait till it is no running before it returns
398: */
399: public void pause() {
400: if (SanityManager.DEBUG) {
401: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
402: SanityManager.DEBUG(DaemonService.DaemonTrace,
403: "pausing daemon");
404: }
405:
406: synchronized (this ) {
407: inPause = true;
408: while (running) {
409: if (SanityManager.DEBUG) {
410: if (SanityManager
411: .DEBUG_ON(DaemonService.DaemonTrace))
412: SanityManager.DEBUG(DaemonService.DaemonTrace,
413: "waiting for daemon run to finish");
414: }
415:
416: try {
417: wait();
418: } catch (InterruptedException ie) {
419: // someone interrrupt us, done running
420: }
421: }
422: }
423:
424: if (SanityManager.DEBUG) {
425: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
426: SanityManager.DEBUG(DaemonService.DaemonTrace,
427: "daemon paused");
428: }
429: }
430:
431: public void resume() {
432: synchronized (this ) {
433: inPause = false;
434: }
435:
436: if (SanityManager.DEBUG) {
437: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
438: SanityManager.DEBUG(DaemonService.DaemonTrace,
439: "daemon resumed");
440: }
441: }
442:
443: /**
444: Finish what we are doing and at the next convenient moment, get rid of
445: the thread and make the daemon object goes away if possible.
446:
447: remember we are calling from another thread
448: */
449: public void stop() {
450: if (stopped) // already stopped
451: return;
452:
453: synchronized (this ) {
454: stopRequested = true;
455: notifyAll(); // get sleeper to wake up and stop ASAP
456: }
457:
458: pause(); // finish doing what we are doing first
459:
460: }
461:
462: /*
463: **Wait until the work in the high priority queue is done.
464: **Note: Used by tests only to make sure all the work
465: **assigned to the daemon is completed.
466: **/
467: public void waitUntilQueueIsEmpty() {
468: while (true) {
469: synchronized (this ) {
470: boolean noSubscriptionRequests = true;
471: for (int urgentServiced = 0; urgentServiced < subscription
472: .size(); urgentServiced++) {
473: ServiceRecord clientRecord = (ServiceRecord) subscription
474: .elementAt(urgentServiced);
475: if (clientRecord != null
476: && clientRecord.needService()) {
477: noSubscriptionRequests = false;
478: break;
479: }
480: }
481:
482: if (highPQ.isEmpty() && noSubscriptionRequests
483: && !running) {
484: return;
485: } else {
486:
487: notifyAll(); //wake up the the daemon thread
488: //wait for the raw store daemon to wakeus up
489: //when it finihes work.
490: try {
491: wait();
492: } catch (InterruptedException ie) {
493: // someone interrupt us, see what's going on
494: }
495: }
496: }
497: }
498: }
499:
500: private synchronized boolean stopRequested() {
501: return stopRequested;
502: }
503:
504: private synchronized boolean inPause() {
505: return inPause;
506: }
507:
508: /*
509: * BasicDaemon method
510: */
511: protected synchronized void wakeUp() {
512: if (!awakened) {
513: awakened = true; // I am being awakened for urgent work.
514:
515: if (waiting) {
516: notifyAll();
517: }
518: }
519: }
520:
521: /**
522: Returns true if awakened by some notification, false if wake up by timer
523: */
524: private boolean rest() {
525: if (SanityManager.DEBUG) {
526: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
527: SanityManager.DEBUG(DaemonService.DaemonTrace,
528: "going back to rest");
529: }
530:
531: boolean urgentOnly;
532: boolean checkWallClock = false;
533: synchronized (this ) {
534: try {
535: if (!awakened) {
536: waiting = true;
537: wait(DaemonService.TIMER_DELAY);
538: waiting = false;
539: }
540: } catch (InterruptedException ie) {
541: // someone interrupt us, see what's going on
542: }
543:
544: nextService = 0;
545:
546: urgentOnly = awakened;
547: if (urgentOnly) // check wall clock
548: {
549: // take a guess that each early request is services every 500ms.
550: if (earlyWakeupCount++ > (DaemonService.TIMER_DELAY / 500)) {
551: earlyWakeupCount = 0;
552: checkWallClock = true;
553: }
554: }
555: awakened = false; // reset this for next time
556: }
557:
558: if (SanityManager.DEBUG) {
559: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
560: SanityManager.DEBUG(DaemonService.DaemonTrace,
561: urgentOnly ? "someone wakes me up"
562: : "wakes up by myself");
563: }
564:
565: if (checkWallClock) {
566: long currenttime = System.currentTimeMillis();
567: if ((currenttime - lastServiceTime) > DaemonService.TIMER_DELAY) {
568: lastServiceTime = currenttime;
569: urgentOnly = false;
570:
571: if (SanityManager.DEBUG) {
572: if (SanityManager
573: .DEBUG_ON(DaemonService.DaemonTrace))
574: SanityManager.DEBUG(DaemonService.DaemonTrace,
575: "wall clock check says service all");
576: }
577: }
578: }
579:
580: return urgentOnly;
581: }
582:
583: private void work(boolean urgentOnly) {
584: if (SanityManager.DEBUG) {
585: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
586: SanityManager.DEBUG(DaemonService.DaemonTrace,
587: "going back to work");
588: }
589:
590: ServiceRecord work;
591:
592: // while I am working, all serviceNow requests that comes in now will
593: // be taken care of when we get the next Assignment.
594: int serviceCount = 0;
595:
596: int yieldFactor = 10;
597: if (urgentOnly && (highPQ.size() > OPTIMAL_QUEUE_SIZE))
598: yieldFactor = 2;
599:
600: int yieldCount = OPTIMAL_QUEUE_SIZE / yieldFactor;
601:
602: for (work = nextAssignment(urgentOnly); work != null; work = nextAssignment(urgentOnly)) {
603: if (SanityManager.DEBUG) {
604: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
605: SanityManager.DEBUG(DaemonService.DaemonTrace,
606: "servicing " + work);
607: }
608:
609: synchronized (this ) {
610: if (inPause || stopRequested)
611: break; // don't do anything more
612: running = true;
613: }
614:
615: // do work
616: try {
617: serviceClient(work);
618: serviceCount++;
619: } finally {
620: // catch run time exceptions
621: synchronized (this ) {
622: running = false;
623: notifyAll();
624: if (inPause || stopRequested)
625: break; // don't do anything more
626: }
627: }
628:
629: if (SanityManager.DEBUG) {
630: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
631: SanityManager.DEBUG(DaemonService.DaemonTrace,
632: "done " + work);
633: }
634:
635: // ensure the subscribed clients get a look in once in a while
636: // when the queues are large.
637: if ((serviceCount % (OPTIMAL_QUEUE_SIZE / 2)) == 0) {
638: nextService = 0;
639: }
640:
641: if ((serviceCount % yieldCount) == 0) {
642:
643: yield();
644: }
645:
646: if (SanityManager.DEBUG) {
647: if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
648: SanityManager.DEBUG(DaemonService.DaemonTrace,
649: "come back from yield");
650: }
651: }
652: }
653:
654: /* let everybody else run first */
655: private void yield() {
656: Thread currentThread = Thread.currentThread();
657: int oldPriority = currentThread.getPriority();
658:
659: if (oldPriority <= Thread.MIN_PRIORITY) {
660: currentThread.yield();
661: } else {
662: ModuleFactory mf = Monitor.getMonitor();
663: if (mf != null)
664: mf.setThreadPriority(Thread.MIN_PRIORITY);
665: currentThread.yield();
666: if (mf != null)
667: mf.setThreadPriority(oldPriority);
668: }
669: }
670: }
|