001: /*
002: * hammurapi-rules @mesopotamia.version@
003: * Hammurapi rules engine.
004: * Copyright (C) 2005 Hammurapi Group
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * URL: http://http://www.hammurapi.biz
021: * e-Mail: support@hammurapi.biz
022: */
023: package biz.hammurapi.dispatch;
024:
025: import java.util.Collection;
026: import java.util.LinkedList;
027:
028: import biz.hammurapi.util.Worker;
029:
030: /**
031: * This class puts results to a queue, which is processed by a worker.
032: * An internal thread is used to push jobs to worker or execute jobs if worker is null.
033: * @author Pavel Vlasov
034: * @revision $Revision$
035: */
036: public class QueuingDispatcher extends Dispatcher {
037:
038: private Thread queueProcessor;
039:
040: /**
041: * Constructor
042: * @param targets
043: * @param worker Worker to delegate dispatching jobs to. Can be null.
044: */
045: public QueuingDispatcher(Collection targets, final Worker worker) {
046: super (targets);
047:
048: queueProcessor = new Thread() {
049: public void run() {
050: while (true) {
051: Runnable job = null;
052: synchronized (queue) {
053: while (!shutdown && queue.isEmpty()) {
054: try {
055: queue.wait();
056: } catch (InterruptedException e) {
057: shutdown = true;
058: queue.clear();
059: e.printStackTrace();
060: return;
061: }
062: }
063:
064: if (shutdown && queue.isEmpty()) {
065: return;
066: }
067:
068: job = (Runnable) queue.removeFirst();
069: }
070:
071: // Try to post job to worker, execute the job if worker is null or doesn't accept the job.
072: if (worker == null || !worker.post(job)) {
073: job.run();
074: }
075: }
076: }
077: };
078:
079: queueProcessor.setName("[Dispatch Queue Processor] "
080: + queueProcessor.getName());
081: queueProcessor.start();
082: }
083:
084: protected LinkedList queue = new LinkedList();
085: private int[] jobCounter = { 0 };
086:
087: /**
088: * Allows subclasses to post non-dispatching jobs to the queue.
089: * @param job
090: */
091: protected void postJobToQueue(final Runnable foreignJob) {
092: synchronized (queue) {
093: Runnable job = new Runnable() {
094:
095: // Increment job counter.
096: {
097: synchronized (jobCounter) {
098: ++jobCounter[0];
099: }
100:
101: }
102:
103: /**
104: * dispatch result and decrement job counter.
105: */
106: public void run() {
107: try {
108: foreignJob.run();
109: } finally {
110: synchronized (jobCounter) {
111: --jobCounter[0];
112: if (jobCounter[0] == 0) {
113: jobCounter.notifyAll();
114: }
115: }
116: }
117: }
118:
119: public String toString() {
120: return "[foreign job wrapper] " + foreignJob;
121: }
122: };
123:
124: // Add job to queue and notify posting thread.
125: queue.add(job);
126: queue.notifyAll();
127: }
128: }
129:
130: protected class DispatchJob implements Runnable {
131: private Object payload;
132:
133: public Object getPayload() {
134: return payload;
135: }
136:
137: public DispatchJob(Object payload) {
138: this .payload = payload;
139:
140: synchronized (jobCounter) {
141: ++jobCounter[0];
142: }
143: }
144:
145: public void run() {
146: try {
147: QueuingDispatcher.super .dispatch(payload);
148: } finally {
149: done();
150: }
151: }
152:
153: public void done() {
154: synchronized (jobCounter) {
155: --jobCounter[0];
156: if (jobCounter[0] == 0) {
157: jobCounter.notifyAll();
158: }
159: }
160: }
161:
162: public String toString() {
163: return "[dispatch job] " + payload;
164: }
165:
166: }
167:
168: /**
169: * Puts argument to dispatching queue.
170: * The queue is processed by an internal thread, which posts dispatching jobs to worker or
171: * processes them itself if worker is null.
172: */
173: public void dispatch(final Object arg) {
174: if (arg != null) {
175: if (shutdown) {
176: throw new IllegalStateException(
177: "Dispatcher is shut down");
178: }
179: synchronized (queue) {
180: // Add job to queue and notify posting thread.
181: queue.add(new DispatchJob(arg));
182: queue.notifyAll();
183: }
184: }
185: }
186:
187: /**
188: * Blocks until all jobs are processed.
189: * @throws InterruptedException
190: */
191: public void join() throws InterruptedException {
192: synchronized (jobCounter) {
193: while (jobCounter[0] != 0) {
194: jobCounter.wait();
195: }
196: }
197: }
198:
199: private boolean shutdown = false;
200:
201: /**
202: * Processes all jobs and stops queue processing thread.
203: * @throws InterruptedException
204: *
205: */
206: public void stop() throws InterruptedException {
207: shutdown = true;
208: //join();
209:
210: // Wakes up processing thread if queue is empty.
211: synchronized (queue) {
212: if (queue.isEmpty()) {
213: queue.notifyAll();
214: }
215: }
216: queueProcessor.join();
217:
218: }
219: }
|