001: /* ====================================================================
002: * The Jcorporate Apache Style Software License, Version 1.2 05-07-2002
003: *
004: * Copyright (c) 1995-2002 Jcorporate Ltd. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * 2. Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in
015: * the documentation and/or other materials provided with the
016: * distribution.
017: *
018: * 3. The end-user documentation included with the redistribution,
019: * if any, must include the following acknowledgment:
020: * "This product includes software developed by Jcorporate Ltd.
021: * (http://www.jcorporate.com/)."
022: * Alternately, this acknowledgment may appear in the software itself,
023: * if and wherever such third-party acknowledgments normally appear.
024: *
025: * 4. "Jcorporate" and product names such as "Expresso" must
026: * not be used to endorse or promote products derived from this
027: * software without prior written permission. For written permission,
028: * please contact info@jcorporate.com.
029: *
030: * 5. Products derived from this software may not be called "Expresso",
031: * or other Jcorporate product names; nor may "Expresso" or other
032: * Jcorporate product names appear in their name, without prior
033: * written permission of Jcorporate Ltd.
034: *
035: * 6. No product derived from this software may compete in the same
036: * market space, i.e. framework, without prior written permission
037: * of Jcorporate Ltd. For written permission, please contact
038: * partners@jcorporate.com.
039: *
040: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
041: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
042: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
043: * DISCLAIMED. IN NO EVENT SHALL JCORPORATE LTD OR ITS CONTRIBUTORS
044: * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
045: * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
046: * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
047: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
048: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
049: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
050: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
051: * SUCH DAMAGE.
052: * ====================================================================
053: *
054: * This software consists of voluntary contributions made by many
055: * individuals on behalf of the Jcorporate Ltd. Contributions back
056: * to the project(s) are encouraged when you make modifications.
057: * Please send them to support@jcorporate.com. For more information
058: * on Jcorporate Ltd. and its products, please see
059: * <http://www.jcorporate.com/>.
060: *
061: * Portions of this software are based upon other open source
062: * products and are subject to their respective licenses.
063: */
064:
065: package com.jcorporate.expresso.services.asyncprocess;
066:
067: import com.jcorporate.expresso.core.db.DBException;
068: import com.jcorporate.expresso.core.misc.StringUtil;
069: import com.jcorporate.expresso.core.registry.MutableRequestRegistry;
070: import com.jcorporate.expresso.core.registry.RequestRegistry;
071: import com.jcorporate.expresso.core.security.User;
072: import com.jcorporate.expresso.services.dbobj.Setup;
073: import org.apache.log4j.Logger;
074:
075: import java.util.HashMap;
076: import java.util.LinkedList;
077: import java.util.Map;
078:
079: /**
080: * This is a default implementation of the AsyncProcessor. It has specific
081: * claim timeout, specific threads and queue size, and a particular order of
082: * dealing with things. A different kind of implementation might, for example,
083: * dispatch the requests to another machine for processing and return the result.
084: * <p/>
085: * This asynchronous processor relies on the set up tables for the following parameters.
086: * parameters are missing from the set up table then the default values will be used instead.
087: * These parameters are:
088: * <ul>
089: * <li><b>AsyncClaimTimeout</b>: when the job is completed there is always the chance
090: * that it will never be claimed by the client a for example if they close their
091: * browser window. AsyncClaimTimeout specifies in milliseconds how long to wait
092: * before discarding
093: * a completed job. The default value is 30 minutes.</li>
094: * <li><b>AsyncNumThreads</b>: This value specifies how many threads
095: * the asynchronous processor has available to process waiting processes.
096: * the default value for this is 10 threads.</li>
097: * <li><b>AsyncQueueSize</b>: When all threads are occupied with running processes
098: * this parameter specifies how he processes to hold in a waiting queue
099: * before throwing a queue full exception. The default value for this
100: * is 20.</li>
101: * </ul>
102: * </p>
103: *
104: * @author Michael Rimov
105: * @version $Revision: 1.12 $ on $Date: 2004/11/17 20:48:17 $
106: */
107: public class DefaultAsyncProcessor implements AsyncProcessor {
108:
109: public static final String DEFAULT_PROCESSSOR_PROPERTIES_FILE = "DefaultProcessor.properties";
110: private int nextIssuedTicket = 0;
111:
112: private LinkedList waitQueue = new LinkedList();
113:
114: private Map allProcesses = new HashMap();
115:
116: private ProcessThread processThreads[];
117:
118: private ThreadGroup processThreadGroup = new ThreadGroup(
119: "AsyncProcess Threads");
120:
121: long claimTimeout = 30000;
122:
123: int numThreads = 10;
124:
125: int queueSize = 20;
126:
127: private static final Logger log = Logger
128: .getLogger(DefaultAsyncProcessor.class);
129:
130: public DefaultAsyncProcessor() {
131: try {
132: // props.load(this.getClass()
133: // .getResourceAsStream("DefaultProcessor.properties"))
134: claimTimeout = Long.parseLong((StringUtil.notNull(
135: Setup.getValue("default", "AsyncClaimTimeout"))
136: .length() != 0) ? Setup.getValue("default",
137: "AsyncClaimTimeout") : "30000");
138: // claimTimeout = Long.parseLong(
139: // props.getProperty("claimTimeout","30000").trim());
140:
141: numThreads = Integer.parseInt((StringUtil.notNull(
142: Setup.getValue("default", "AsyncNumThreads"))
143: .length() != 0) ? Setup.getValue("default",
144: "AsyncNumThreads") : "10");
145: // numThreads = Integer.parseInt(props.getProperty("numThreads","10").trim());
146:
147: queueSize = Integer.parseInt((StringUtil.notNull(
148: Setup.getValue("default", "AsyncQueueSize"))
149: .length() != 0) ? Setup.getValue("default",
150: "AsyncQueueSize") : "20");
151:
152: // queueSize = Integer.parseInt(props.getProperty("queueSize","20").trim());
153: } catch (DBException ex) {
154: log.error("Error loading properties", ex);
155: } catch (NumberFormatException ex) {
156: log
157: .error("Error parsing setup values for Async processor. "
158: + "Has this DB gone through initial setup? Using default "
159: + "values instead.");
160: }
161:
162: processThreads = new ProcessThread[numThreads];
163: for (int i = 0; i < numThreads; i++) {
164: processThreads[i] = new ProcessThread(this ,
165: processThreadGroup, "Processor" + i);
166: }
167:
168: if (log.isInfoEnabled()) {
169: log.info("Starting processing threads");
170: }
171:
172: for (int i = 0; i < numThreads; i++) {
173: processThreads[i].start();
174: }
175: }
176:
177: /**
178: * Destroys all threads;
179: */
180: public synchronized void destroy() {
181: for (int i = 0; i < numThreads; i++) {
182: Thread aThread = processThreads[i];
183: if (aThread != null) {
184: aThread.interrupt();
185: }
186: }
187:
188: for (int i = 0; i < numThreads; i++) {
189: Thread aThread = processThreads[i];
190: if (aThread != null) {
191: try {
192: if (aThread != null) {
193: aThread.join(1000);
194: }
195: } catch (InterruptedException ex) {
196: log
197: .info(
198: "Interrupted while waiting for process thread",
199: ex);
200: }
201:
202: if (aThread.isAlive()) {
203: log
204: .warn("After waiting a second, the process thread: "
205: + aThread.getName()
206: + " has still not exited");
207: }
208: }
209: }
210:
211: processThreads = null;
212: if (processThreadGroup != null) {
213: processThreadGroup.destroy();
214: }
215: }
216:
217: /**
218: * Add an asynchronous process ot the queue
219: *
220: * @param newProcess the AsyncProcess implementation to run
221: * @return AsyncTicket object for use in reeming the process' status.
222: * @throws QueueFullException if there are too many jobs waiting.
223: */
224: public synchronized AsyncTicket addToQueue(AsyncProcess newProcess)
225: throws QueueFullException {
226: if (processThreads == null) {
227: throw new IllegalStateException(
228: "Async Processor has already shut down");
229: }
230:
231: nextIssuedTicket++;
232: DefaultTicket ticket = new DefaultTicket(nextIssuedTicket);
233: ProcessWrapper wrapper = new ProcessWrapper(newProcess, ticket);
234: wrapper.getResult().setStatusCode(
235: AsyncProcessResult.STATUS_PENDING);
236: DefaultTicket theTicket = new DefaultTicket(nextIssuedTicket);
237:
238: synchronized (allProcesses) {
239: allProcesses.put(theTicket, wrapper);
240: }
241:
242: synchronized (waitQueue) {
243: if (waitQueue.size() > queueSize) {
244: throw new QueueFullException("Queue already has "
245: + waitQueue.size()
246: + " processes waiting. Cannot continue");
247: }
248:
249: waitQueue.addLast(wrapper);
250: waitQueue.notify();
251: }
252: return theTicket;
253: }
254:
255: /**
256: * Similar to addToQueue(AsyncProcess), but it tries to wait for the specified
257: * period of time before returning to see if the process completes during
258: * that time.
259: *
260: * @param newProcess The AsyncProcess to add to the queue
261: * @param waitTimeout the number of milliseconds to wait to see if the process
262: * completes in that time.
263: * @return AsyncTicket instance.
264: */
265:
266: public AsyncTicket addToQueue(AsyncProcess newProcess,
267: long waitTimeout) throws QueueFullException {
268:
269: AsyncTicket ticket = addToQueue(newProcess);
270: synchronized (newProcess) {
271: try {
272: newProcess.wait(waitTimeout);
273: } catch (InterruptedException ex) {
274: log.info("Interrupted while waiting for process", ex);
275: return ticket;
276: }
277: }
278:
279: synchronized (allProcesses) {
280: ProcessWrapper wrapper = this .getProcessWrapper(ticket);
281: if (wrapper == null) {
282: return null;
283: }
284:
285: if (wrapper.getResult().getStatusCode() == AsyncProcessResult.STATUS_COMPLETE) {
286: this .getProcessResult(ticket);
287: return new DefaultTicket(-1);
288: }
289: }
290:
291: return ticket;
292: }
293:
294: /**
295: * Private wrapper.... MAKE SURE you synchronize all Processes before
296: * calling.
297: *
298: * @param ticketId the Async ticket to redeem
299: * @return ProcessWrapper or null if it doesn't exist.
300: */
301: private ProcessWrapper getProcessWrapper(AsyncTicket ticketId) {
302: ProcessWrapper wrapper = (ProcessWrapper) allProcesses
303: .get(ticketId);
304: if (wrapper == null) {
305: return null;
306: }
307:
308: return wrapper;
309: }
310:
311: /**
312: * Retrieve the queue. This function purposefully has package access.
313: *
314: * @return java.util.LinkedList
315: */
316: LinkedList getQueue() {
317: return waitQueue;
318: }
319:
320: /**
321: * Clean out any processes that have been completed over the timeout interval
322: * and nobody has claimed it.
323: */
324: // private synchronized void cleanProcesses() {
325: // long curTime = System.currentTimeMillis();
326: // synchronized(allProcesses) {
327: // for (Iterator i = allProcesses.values().iterator(); i.hasNext();) {
328: // ProcessWrapper oneProcess = (ProcessWrapper)i.next();
329: // if (oneProcess.getCompletedTime() + claimTimeout > curTime) {
330: // allProcesses.remove(oneProcess.getObjectId());
331: // }
332: // }
333: // }
334: // }
335:
336: /**
337: * Wrapper object for the process queue
338: *
339: * @author Michael Rimov
340: * @version $Revision: 1.12 $ on $Date: 2004/11/17 20:48:17 $
341: */
342: class ProcessWrapper {
343: private AsyncProcess wrappedObject;
344:
345: DefaultTicket objectId;
346:
347: DefaultProcessResult result;
348:
349: long queueTime;
350:
351: long startTime;
352:
353: long completedTime;
354:
355: private String defaultDataContext;
356:
357: private User defaultUser;
358:
359: public ProcessWrapper(AsyncProcess objectToWrap,
360: DefaultTicket newObjectId) {
361: queueTime = System.currentTimeMillis();
362: wrappedObject = objectToWrap;
363: objectId = newObjectId;
364: result = new DefaultProcessResult();
365: result.setOriginalProcess(objectToWrap);
366:
367: //This object is created in one thread and run in another
368: //So we propagate the thread capabilities.
369: defaultDataContext = RequestRegistry.getDataContext();
370: defaultUser = RequestRegistry.getUser();
371: }
372:
373: public AsyncProcess getWrappedObject() {
374: return wrappedObject;
375: }
376:
377: public DefaultProcessResult getResult() {
378: return result;
379: }
380:
381: /**
382: * Do the actual processing. Any exceptions are caught and saved
383: * in the process object itself and status fault is set.
384: */
385: public void process() {
386: //Set the context for this thread to be that of the
387: //spawning thread.
388: new MutableRequestRegistry(defaultDataContext, defaultUser);
389:
390: startTime = System.currentTimeMillis();
391: this .getResult().setStatusCode(
392: AsyncProcessResult.STATUS_RUNNING);
393: try {
394: wrappedObject.process();
395: } catch (Throwable ex) {
396: this .getResult().setException(ex);
397: this .getResult().setStatusCode(
398: AsyncProcessResult.STATUS_FAULT);
399: }
400: completedTime = System.currentTimeMillis();
401: this .getResult().setStatusCode(
402: AsyncProcessResult.STATUS_COMPLETE);
403:
404: //
405: //If somebody is waiting on the object, notify them so that
406: //they can continue.
407: //
408: synchronized (wrappedObject) {
409: wrappedObject.notify();
410: }
411:
412: //
413: //Most jobs tend to require quite a bit of memory. Schedule a GC
414: //as soon as the system can to clear the cruft left behind by
415: //the job.
416: //
417: System.gc();
418: }
419:
420: public long getQueueTime() {
421: return queueTime;
422: }
423:
424: public long getCompletedTime() {
425: return completedTime;
426: }
427:
428: public DefaultTicket getObjectId() {
429: return objectId;
430: }
431:
432: }
433:
434: /**
435: * Retrieve the result of the process
436: *
437: * @param ticketId the ticket id of the process
438: * @return java.lang.Object
439: */
440: public AsyncProcessResult getProcessResult(AsyncTicket ticketId) {
441: synchronized (allProcesses) {
442: ProcessWrapper wrapper = getProcessWrapper(ticketId);
443:
444: if (wrapper == null) {
445: log.warn("Error getting process wrapper for ticket: "
446: + ticketId.toString());
447: return null;
448: }
449:
450: if (wrapper.getResult().getStatusCode() == AsyncProcessResult.STATUS_COMPLETE) {
451: allProcesses.remove(ticketId);
452: }
453:
454: return wrapper.getResult();
455: }
456: }
457:
458: /**
459: * Retrieve the result of the process waiting up to a specified time for the
460: * process to complete
461: *
462: * @param ticketId the ticket ID
463: * @param waitTimeout the time in ms to wait for the process to complete before
464: * returning.
465: * @return AsyncProcessResult
466: */
467: public AsyncProcessResult getProcessResult(AsyncTicket ticketId,
468: long waitTimeout) {
469: ProcessWrapper wrapper;
470:
471: synchronized (allProcesses) {
472: wrapper = getProcessWrapper(ticketId);
473:
474: if (wrapper == null) {
475: log.warn("Didn't get a process warpper for ticket: "
476: + ticketId.toString());
477: return null;
478: }
479:
480: if (wrapper.getResult().getStatusCode() == AsyncProcessResult.STATUS_COMPLETE) {
481: allProcesses.remove(ticketId);
482: return wrapper.getResult();
483: }
484: }
485:
486: AsyncProcess process = wrapper.getWrappedObject();
487: synchronized (process) {
488: try {
489: process.wait(waitTimeout);
490: } catch (InterruptedException ex) {
491: log.info("Interrupted while waiting for process", ex);
492: }
493: }
494:
495: if (wrapper.getResult().getStatusCode() == AsyncProcessResult.STATUS_COMPLETE) {
496: allProcesses.remove(ticketId);
497: }
498:
499: return wrapper.getResult();
500: }
501:
502: }
|