001: package org.objectweb.celtix.bus.workqueue;
002:
003: import java.util.concurrent.ArrayBlockingQueue;
004: import java.util.concurrent.RejectedExecutionException;
005: import java.util.concurrent.ThreadPoolExecutor;
006: import java.util.concurrent.TimeUnit;
007: import java.util.logging.Level;
008: import java.util.logging.Logger;
009:
010: import org.objectweb.celtix.common.logging.LogUtils;
011: import org.objectweb.celtix.workqueue.AutomaticWorkQueue;
012:
013: public class AutomaticWorkQueueImpl extends ThreadPoolExecutor
014: implements AutomaticWorkQueue {
015:
016: static final int DEFAULT_MAX_QUEUE_SIZE = 128;
017: private static final Logger LOG = LogUtils
018: .getL7dLogger(AutomaticWorkQueueImpl.class);
019:
020: int maxQueueSize;
021:
022: AutomaticWorkQueueImpl(int mqs, int initialThreads,
023: int highWaterMark, int lowWaterMark, long dequeueTimeout) {
024:
025: super (
026: -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark,
027: -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark,
028: TimeUnit.MILLISECONDS.toMillis(dequeueTimeout),
029: TimeUnit.MILLISECONDS,
030: mqs == -1 ? new ArrayBlockingQueue<Runnable>(
031: DEFAULT_MAX_QUEUE_SIZE)
032: : new ArrayBlockingQueue<Runnable>(mqs));
033:
034: maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs;
035: lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE
036: : lowWaterMark;
037: highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE
038: : highWaterMark;
039:
040: StringBuffer buf = new StringBuffer();
041: buf.append("Constructing automatic work queue with:\n");
042: buf.append("max queue size: " + maxQueueSize + "\n");
043: buf.append("initialThreads: " + initialThreads + "\n");
044: buf.append("lowWaterMark: " + lowWaterMark + "\n");
045: buf.append("highWaterMark: " + highWaterMark + "\n");
046: LOG.fine(buf.toString());
047:
048: if (initialThreads > highWaterMark) {
049: initialThreads = highWaterMark;
050: }
051:
052: // as we cannot prestart more core than corePoolSize initial threads, we temporarily
053: // change the corePoolSize to the number of initial threads
054: // this is important as otherwise these threads will be created only when the queue has filled up,
055: // potentially causing problems with starting up under heavy load
056: if (initialThreads < Integer.MAX_VALUE && initialThreads > 0) {
057: setCorePoolSize(initialThreads);
058: int started = prestartAllCoreThreads();
059: if (started < initialThreads) {
060: LOG.log(Level.WARNING, "THREAD_START_FAILURE_MSG",
061: new Object[] { started, initialThreads });
062: }
063: setCorePoolSize(lowWaterMark);
064: }
065: }
066:
067: public String toString() {
068: StringBuffer buf = new StringBuffer();
069: buf.append(super .toString());
070: buf.append(" [queue size: ");
071: buf.append(getSize());
072: buf.append(", max size: ");
073: buf.append(maxQueueSize);
074: buf.append(", threads: ");
075: buf.append(getPoolSize());
076: buf.append(", active threads: ");
077: buf.append(getActiveCount());
078: buf.append(", low water mark: ");
079: buf.append(getLowWaterMark());
080: buf.append(", high water mark: ");
081: buf.append(getHighWaterMark());
082: buf.append("]");
083: return buf.toString();
084: }
085:
086: // WorkQueue interface
087:
088: /* (non-Javadoc)
089: * @see org.objectweb.celtix.workqueue.WorkQueue#execute(java.lang.Runnable, long)
090: */
091: public void execute(Runnable work, long timeout) {
092: try {
093: execute(work);
094: } catch (RejectedExecutionException ree) {
095: try {
096: getQueue().offer(work, timeout, TimeUnit.MILLISECONDS);
097: } catch (InterruptedException ie) {
098: throw new RejectedExecutionException(ie);
099: }
100: }
101: }
102:
103: /* (non-Javadoc)
104: * @see org.objectweb.celtix.workqueue.WorkQueue#schedule(java.lang.Runnable, long)
105: */
106: public void schedule(final Runnable work, final long delay) {
107: // temporary implementation, replace with shared long-lived scheduler
108: // task
109: execute(new Runnable() {
110: public void run() {
111: try {
112: Thread.sleep(delay);
113: } catch (InterruptedException ie) {
114: // ignore
115: }
116: work.run();
117: }
118: });
119: }
120:
121: // AutomaticWorkQueue interface
122:
123: public void shutdown(boolean processRemainingWorkItems) {
124: if (!processRemainingWorkItems) {
125: getQueue().clear();
126: }
127: shutdown();
128: }
129:
130: /**
131: * Gets the maximum size (capacity) of the backing queue.
132: * @return the maximum size (capacity) of the backing queue.
133: */
134: long getMaxSize() {
135: return maxQueueSize;
136: }
137:
138: /**
139: * Gets the current size of the backing queue.
140: * @return the current size of the backing queue.
141: */
142: public long getSize() {
143: return getQueue().size();
144: }
145:
146: public boolean isEmpty() {
147: return getQueue().size() == 0;
148: }
149:
150: boolean isFull() {
151: return getQueue().remainingCapacity() == 0;
152: }
153:
154: int getHighWaterMark() {
155: int hwm = getMaximumPoolSize();
156: return hwm == Integer.MAX_VALUE ? -1 : hwm;
157: }
158:
159: int getLowWaterMark() {
160: int lwm = getCorePoolSize();
161: return lwm == Integer.MAX_VALUE ? -1 : lwm;
162: }
163:
164: void setHighWaterMark(int hwm) {
165: setMaximumPoolSize(hwm < 0 ? Integer.MAX_VALUE : hwm);
166: }
167:
168: void setLowWaterMark(int lwm) {
169: setCorePoolSize(lwm < 0 ? 0 : lwm);
170: }
171: }
|