001: package org.objectweb.celtix.bus.workqueue;
002:
003: import java.util.logging.Logger;
004:
005: import org.objectweb.celtix.Bus;
006: import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
007: import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
008: import org.objectweb.celtix.workqueue.AutomaticWorkQueue;
009: import org.objectweb.celtix.workqueue.WorkQueueManager;
010:
011: public class WorkQueueManagerImpl implements WorkQueueManager {
012:
013: private static final Logger LOG = Logger
014: .getLogger(WorkQueueManagerImpl.class.getName());
015:
016: ThreadingModel threadingModel = ThreadingModel.MULTI_THREADED;
017: AutomaticWorkQueue autoQueue;
018: boolean inShutdown;
019: Bus bus;
020:
021: public WorkQueueManagerImpl(Bus b) {
022: bus = b;
023: }
024:
025: /*
026: * (non-Javadoc)
027: *
028: * @see org.objectweb.celtix.workqueue.WorkQueueManager#getAutomaticWorkQueue()
029: */
030: public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
031: if (autoQueue == null) {
032: autoQueue = createAutomaticWorkQueue();
033: bus.sendEvent(new ComponentCreatedEvent(this ));
034: }
035: return autoQueue;
036: }
037:
038: /*
039: * (non-Javadoc)
040: *
041: * @see org.objectweb.celtix.workqueue.WorkQueueManager#getThreadingModel()
042: */
043: public ThreadingModel getThreadingModel() {
044: return threadingModel;
045: }
046:
047: /*
048: * (non-Javadoc)
049: *
050: * @see org.objectweb.celtix.workqueue.WorkQueueManager#setThreadingModel(
051: * org.objectweb.celtix.workqueue.WorkQueueManager.ThreadingModel)
052: */
053: public void setThreadingModel(ThreadingModel model) {
054: threadingModel = model;
055: }
056:
057: /*
058: * (non-Javadoc)
059: *
060: * @see org.objectweb.celtix.workqueue.WorkQueueManager#shutdown(boolean)
061: */
062: public synchronized void shutdown(boolean processRemainingTasks) {
063: inShutdown = true;
064: if (autoQueue != null) {
065: autoQueue.shutdown(processRemainingTasks);
066: }
067:
068: //sent out remove event.
069:
070: bus.sendEvent(new ComponentRemovedEvent(this ));
071:
072: synchronized (this ) {
073: notifyAll();
074: }
075: }
076:
077: public void run() {
078: synchronized (this ) {
079: while (!inShutdown) {
080: try {
081: wait();
082: } catch (InterruptedException ex) {
083: // ignore
084: }
085: }
086: while (autoQueue != null && !autoQueue.isShutdown()) {
087: try {
088: Thread.sleep(100);
089: } catch (InterruptedException ex) {
090: // ignore
091: }
092: }
093: }
094: for (java.util.logging.Handler h : LOG.getHandlers()) {
095: h.flush();
096: }
097:
098: //sent out creation event.
099:
100: }
101:
102: private AutomaticWorkQueue createAutomaticWorkQueue() {
103:
104: // Configuration configuration = bus.getConfiguration();
105:
106: // configuration.getInteger("threadpool:initial_threads");
107: int initialThreads = 1;
108:
109: // int lwm = configuration.getInteger("threadpool:low_water_mark");
110: int lwm = 5;
111:
112: // int hwm = configuration.getInteger("threadpool:high_water_mark");
113: int hwm = 25;
114:
115: // configuration.getInteger("threadpool:max_queue_size");
116: int maxQueueSize = 10 * hwm;
117:
118: // configuration.getInteger("threadpool:dequeue_timeout");
119: long dequeueTimeout = 2 * 60 * 1000L;
120:
121: return new AutomaticWorkQueueImpl(maxQueueSize, initialThreads,
122: hwm, lwm, dequeueTimeout);
123:
124: }
125:
126: }
|