001: //The contents of this file are subject to the Mozilla Public License Version 1.1
002: //(the "License"); you may not use this file except in compliance with the
003: //License. You may obtain a copy of the License at http://www.mozilla.org/MPL/
004: //
005: //Software distributed under the License is distributed on an "AS IS" basis,
006: //WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
007: //for the specific language governing rights and
008: //limitations under the License.
009: //
010: //The Original Code is "The Columba Project"
011: //
012: //The Initial Developers of the Original Code are Frederik Dietz and Timo Stich.
013: //Portions created by Frederik Dietz and Timo Stich are Copyright (C) 2003.
014: //
015: //All Rights Reserved.
016: package org.columba.core.command;
017:
018: import java.util.ArrayList;
019: import java.util.List;
020: import java.util.logging.Logger;
021:
022: import org.columba.api.command.ICommand;
023: import org.columba.core.base.Mutex;
024: import org.columba.core.gui.exception.ExceptionHandler;
025:
026: /**
027: * Scheduler for background threads
028: * <p>
029: * DefaultProcessor keeps a pool of {@link Worker}instances, which are assigned
030: * to {@link Command}, when executed.
031: *
032: * @author tstich
033: */
034: public class CommandProcessor implements Runnable {
035: /** JDK 1.4+ logging framework logger, used for logging. */
036: private static final Logger LOG = Logger
037: .getLogger("org.columba.api.command"); //$NON-NLS-1$
038:
039: public final static int MAX_WORKERS = 5;
040:
041: List<OperationItem> operationQueue;
042:
043: List<Worker> worker;
044:
045: private Mutex oneMutex;
046:
047: private int timeStamp;
048:
049: private boolean stopped = false;
050:
051: private static CommandProcessor instance = new CommandProcessor();
052:
053: public CommandProcessor() {
054: this (true);
055: }
056:
057: public static CommandProcessor getInstance() {
058: return instance;
059: }
060:
061: /**
062: * Constructs a DefaultProcessor.
063: */
064: public CommandProcessor(boolean start) {
065: operationQueue = new ArrayList<OperationItem>(10);
066:
067: worker = new ArrayList<Worker>(MAX_WORKERS);
068:
069: // Create the workers
070: for (int i = 0; i < MAX_WORKERS; i++) {
071: Worker w = new Worker(this );
072: w.addExceptionListener(new ExceptionHandler());
073: worker.add(w);
074:
075: }
076:
077: oneMutex = new Mutex();
078:
079: timeStamp = 0;
080:
081: if (start)
082: new Thread(this ).start();
083: }
084:
085: /**
086: * Add a Command to the Queue. Calls {@link #addOp(Command, int)}with
087: * Command.FIRST_EXECUTION.
088: *
089: * @param op
090: * the command to add
091: */
092: public void addOp(final Command op) {
093: addOp(op, Command.FIRST_EXECUTION);
094: }
095:
096: /**
097: * Adds a Command to the queue.
098: *
099: * @param op
100: * the command
101: * @param operationMode
102: * the mode in wich the command should be processed
103: */
104: public void addOp(final Command op, final int operationMode) {
105: try {
106: oneMutex.lock();
107:
108: LOG.finest("Command " + op.toString() + " added"); //$NON-NLS-1$ //$NON-NLS-2$
109:
110: int p = operationQueue.size() - 1;
111: OperationItem nextOp;
112:
113: // Sort in with respect to priority and synchronize:
114: // Commands with higher priority will be processed
115: // before commands with lower priority.
116: // If there is a command that is of type synchronize
117: // don't put this command in front.
118: while (p != -1) {
119: nextOp = operationQueue.get(p);
120:
121: if ((nextOp.getOperation().getPriority() < op
122: .getPriority())
123: && !nextOp.getOperation().isSynchronize()) {
124: p--;
125: } else {
126: break;
127: }
128: }
129:
130: operationQueue.add(p + 1, new OperationItem(op,
131: operationMode));
132: } finally {
133: oneMutex.release();
134: }
135:
136: wakeUp();
137: }
138:
139: /**
140: * Checks if the command can be processed. This is true if all
141: * references are not blocked.
142: *
143: * @param opItem
144: * the internal command structure
145: * @return true if the operation will not be blocked
146: */
147: private boolean canBeProcessed(final OperationItem opItem) {
148: return opItem.getOperation().canBeProcessed();
149: }
150:
151: /**
152: * Get the next Operation from the queue.
153: *
154: * @return the next non-blocking operation or null if none found.
155: */
156: private OperationItem nextOpItem() {
157: OperationItem nextOp = null;
158:
159: for (int i = 0; i < operationQueue.size() && nextOp == null; i++) {
160: nextOp = operationQueue.get(i);
161:
162: if ((i != 0) && (nextOp.getOperation().isSynchronize())) {
163: nextOp = null;
164:
165: // We have to process this command first
166: // -> break here!
167: break;
168: }
169:
170: try {
171: if (!canBeProcessed(nextOp)) {
172: nextOp = null;
173: }
174: } catch (RuntimeException e) {
175: // Remove bogus Operation
176: operationQueue.remove(nextOp);
177: nextOp = null;
178:
179: LOG.warning("Operation failed: " + e.getMessage()); //$NON-NLS-1$
180: }
181:
182: }
183:
184: return nextOp;
185: }
186:
187: /**
188: * Called by the worker to signal that his operation has finished.
189: *
190: * @param op
191: * the command the worker has processed
192: * @param w
193: * the worker himself
194: */
195: public void operationFinished(final ICommand op, final Worker w) {
196:
197: try {
198: oneMutex.lock();
199:
200: worker.add(w);
201: } finally {
202: oneMutex.release();
203: }
204:
205: // notify that a new worker is available
206: wakeUp();
207: }
208:
209: /**
210: * Get an available Worker from the workerpool. Reserve one worker for
211: * Real-Time Priority tasks
212: *
213: * @param priority
214: * @return an available worker or null if none available.
215: */
216: Worker getWorker(int priority) {
217: Worker result = null;
218: if (worker.size() > 1) {
219: result = worker.remove(0);
220: } else if (worker.size() > 0
221: && priority >= Command.REALTIME_PRIORITY) {
222: result = worker.remove(0);
223: }
224:
225: return result;
226: }
227:
228: /**
229: * Wait until a worker is available or a new command is added.
230: */
231: private synchronized void waitForNotify() {
232: try {
233: wait();
234: } catch (InterruptedException e) {
235: e.printStackTrace();
236: }
237: }
238:
239: private synchronized void wakeUp() {
240: notifyAll();
241: }
242:
243: /**
244: * @see java.lang.Runnable#run()
245: */
246: public void run() {
247: boolean sleep;
248:
249: while (true && !stopped) {
250: sleep = startOperation();
251:
252: if (sleep) {
253: waitForNotify();
254: sleep = false;
255: }
256: }
257:
258: }
259:
260: /**
261: * @param sleep
262: * @return
263: */
264: boolean startOperation() {
265: boolean sleep = false;
266: try {
267: oneMutex.lock();
268: OperationItem _opItem;
269: Worker _worker;
270: _opItem = nextOpItem();
271: if (_opItem != null && !stopped) {
272: _worker = getWorker(_opItem.getOperation()
273: .getPriority());
274: if (_worker != null && !stopped) {
275: operationQueue.remove(_opItem);
276:
277: _worker.process(_opItem.getOperation(), _opItem
278: .getOperationMode(), timeStamp++);
279:
280: _worker.start();
281: } else {
282: sleep = true;
283: }
284: } else {
285: sleep = true;
286: }
287: } finally {
288: oneMutex.release();
289: }
290: return sleep;
291: }
292:
293: public synchronized void stop() {
294: stopped = true;
295: notify();
296: }
297:
298: }
299:
300: /**
301: * Intern represenation of the Commands.
302: *
303: * @author Timo Stich <tstich@users.sourceforge.net>
304: */
305:
306: class OperationItem {
307: private Command operation;
308:
309: private int operationMode;
310:
311: public OperationItem(Command op, int opMode) {
312: operation = op;
313: operationMode = opMode;
314: }
315:
316: public Command getOperation() {
317: return operation;
318: }
319:
320: public int getOperationMode() {
321: return operationMode;
322: }
323: }
|