001: /**
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 1999 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: JWorkManager.java 7137 2005-07-28 12:00:42Z durieuxp $
023: * --------------------------------------------------------------------------
024: */package org.objectweb.jonas_lib;
025:
026: import java.util.LinkedList;
027:
028: import javax.resource.spi.work.ExecutionContext;
029: import javax.resource.spi.work.Work;
030: import javax.resource.spi.work.WorkCompletedException;
031: import javax.resource.spi.work.WorkEvent;
032: import javax.resource.spi.work.WorkException;
033: import javax.resource.spi.work.WorkListener;
034: import javax.resource.spi.work.WorkManager;
035: import javax.resource.spi.work.WorkRejectedException;
036: import javax.transaction.NotSupportedException;
037: import javax.transaction.SystemException;
038: import javax.transaction.xa.Xid;
039:
040: import org.objectweb.jonas.common.Log;
041: import org.objectweb.jotm.Current;
042: import org.objectweb.transaction.jta.TransactionManager;
043: import org.objectweb.util.monolog.api.BasicLevel;
044: import org.objectweb.util.monolog.api.Logger;
045:
046: /**
047: * Jonas Implementation of the Resource Work Manager
048: * @author durieuxp
049: */
050: public class JWorkManager implements WorkManager {
051:
052: protected LinkedList workList = new LinkedList();
053:
054: protected static int poolnumber = 0;
055: protected static int threadnumber = 0;
056:
057: protected int maxpoolsz;
058: protected int minpoolsz;
059: protected int poolsz; // current size of thread pool
060: protected int freeThreads; // free threads ready to work
061: protected long waitingTime; // in millisec
062:
063: protected boolean valid = true; // set to false when WorkManager is removed.
064:
065: protected static final long FEW_MORE_SECONDS = 3000;
066:
067: private static Logger logger = null;
068:
069: private TransactionManager tm;
070:
071: /**
072: * Constructor
073: * @param threadwait max time in seconds a thread will wait
074: */
075: public JWorkManager(int minsz, int maxsz, TransactionManager tm,
076: long threadwait) {
077: minpoolsz = minsz;
078: maxpoolsz = maxsz;
079: waitingTime = threadwait * 1000L;
080: this .tm = tm;
081: poolnumber++;
082: logger = Log.getLogger(Log.JONAS_WORK_MGR_PREFIX);
083: if (logger.isLoggable(BasicLevel.DEBUG)) {
084: logger.log(BasicLevel.DEBUG, "thread pool #" + poolnumber);
085: logger.log(BasicLevel.DEBUG, "minpoolsz = " + minsz
086: + " maxpoolsz = " + maxsz);
087: }
088: for (poolsz = 0; poolsz < minsz; poolsz++) {
089: WorkThread st = new WorkThread(this , threadnumber++,
090: poolnumber);
091: st.start();
092: }
093: }
094:
095: // --------------------------------------------------------------------------
096: // Management
097: // --------------------------------------------------------------------------
098:
099: /**
100: * @return current pool size
101: */
102: public int getCurrentPoolSize() {
103: return poolsz;
104: }
105:
106: /**
107: * @return min pool size
108: */
109: public int getMinPoolSize() {
110: return minpoolsz;
111: }
112:
113: /**
114: * @return max pool size
115: */
116: public int getMaxPoolSize() {
117: return maxpoolsz;
118: }
119:
120: /**
121: * Set the min pool size
122: * @param minsz
123: */
124: public void setMinPoolSize(int minsz) {
125: minpoolsz = minsz;
126: }
127:
128: /**
129: * Set the max pool size
130: * @param maxsz
131: */
132: public void setMaxPoolSize(int maxsz) {
133: maxpoolsz = maxsz;
134: }
135:
136: // --------------------------------------------------------------------------
137: // WorkManager implementation
138: // --------------------------------------------------------------------------
139:
140: /**
141: * Accepts a Work instance for processing.
142: * This call blocks until the Work instance completes execution.
143: * There is no guarantee on when the accepted Work instance would start execution ie.,
144: * there is no time constraint to start execution.
145: * @param work The unit of work to be done. Could be long or short-lived.
146: * @throws WorkRejectedException a Work instance has been rejected from further processing.
147: * @throws WorkCompletedException a Work instance has completed execution with an exception.
148: * @throws WorkException
149: */
150: public void doWork(Work work) throws WorkException {
151: doMyWork(work, INDEFINITE, null, null, 0);
152: }
153:
154: /**
155: * Accepts a Work instance for processing. This call blocks until the Work
156: * instance completes execution.
157: * @param work The unit of work to be done. Could be long or short-lived.
158: * @param timeout a time duration (in milliseconds) within which the
159: * execution of the Work instance must start. Otherwise, the Work
160: * instance is rejected with a WorkRejectedException set to an
161: * appropriate error code (WorkRejectedException.TIMED_OUT).
162: * @param ectx an object containing the execution context with which the
163: * submitted Work instance must be executed.
164: * @param listener an object which would be notified when the various Work
165: * processing events (work accepted, work rejected, work started,
166: * work completed) occur.
167: * @throws WorkRejectedException a Work instance has been rejected from
168: * further processing.
169: * @throws WorkCompletedException a Work instance has completed execution
170: * with an exception.
171: * @throws WorkException
172: */
173: public void doWork(Work work, long timeout, ExecutionContext ectx,
174: WorkListener listener) throws WorkException {
175: if (logger.isLoggable(BasicLevel.DEBUG)) {
176: logger.log(BasicLevel.DEBUG, "");
177: }
178: if (listener != null) {
179: listener.workAccepted(new WorkEvent(this ,
180: WorkEvent.WORK_ACCEPTED, work, null));
181: }
182: doMyWork(work, timeout, ectx, listener, System
183: .currentTimeMillis());
184: }
185:
186: /**
187: * Accepts a Work instance for processing. This call blocks until the Work
188: * instance starts execution but not until its completion. There is no
189: * guarantee on when the accepted Work instance would start execution ie.,
190: * there is no time constraint to start execution.
191: * @param work The unit of work to be done. Could be long or short-lived.
192: * @return the time elapsed (in milliseconds) from Work acceptance until
193: * start of execution. Note, this does not offer real-time
194: * guarantees. It is valid to return -1, if the actual start delay
195: * duration is unknown.
196: * @throws WorkRejectedException a Work instance has been rejected from
197: * further processing.
198: * @throws WorkException
199: */
200: public long startWork(Work work) throws WorkException {
201: return startWork(work, INDEFINITE, null, null);
202: }
203:
204: /**
205: * Accepts a Work instance for processing. This call blocks until the Work
206: * instance starts execution but not until its completion. There is no
207: * guarantee on when the accepted Work instance would start execution ie.,
208: * there is no time constraint to start execution.
209: * @param work The unit of work to be done. Could be long or short-lived.
210: * @param timeout a time duration (in milliseconds) within which the
211: * execution of the Work instance must start. Otherwise, the Work
212: * instance is rejected with a WorkRejectedException set to an
213: * appropriate error code (WorkRejectedException.TIMED_OUT).
214: * @param ectx an object containing the execution context with which the
215: * submitted Work instance must be executed.
216: * @param listener an object which would be notified when the various Work
217: * processing events (work accepted, work rejected, work started,
218: * work completed) occur.
219: * @return the time elapsed (in milliseconds) from Work acceptance until
220: * start of execution. Note, this does not offer real-time
221: * guarantees. It is valid to return -1, if the actual start delay
222: * duration is unknown.
223: * @throws WorkRejectedException a Work instance has been rejected from
224: * further processing.
225: * @throws WorkException
226: */
227: public long startWork(Work work, long timeout,
228: ExecutionContext ectx, WorkListener listener)
229: throws WorkException {
230: if (logger.isLoggable(BasicLevel.DEBUG)) {
231: logger.log(BasicLevel.DEBUG, "");
232: }
233: JWork mywork = new JWork(work, timeout, ectx, listener);
234: if (listener != null) {
235: listener.workAccepted(new WorkEvent(this ,
236: WorkEvent.WORK_ACCEPTED, work, null));
237: }
238: long starttime = System.currentTimeMillis();
239: long duration = 0;
240: synchronized (workList) {
241: workList.add(mywork);
242: if (poolsz < maxpoolsz && workList.size() > freeThreads) {
243: // We need one more thread.
244: poolsz++;
245: WorkThread st = new WorkThread(this , threadnumber++,
246: poolnumber);
247: st.start();
248: } else {
249: workList.notify();
250: }
251: }
252: // Wait until my work is started.
253: boolean started = false;
254: synchronized (mywork) {
255: if (!mywork.isStarted()) {
256: try {
257: // No need to wait after timeout is elapsed
258: long waittime = waitingTime;
259: if (timeout < waittime) {
260: waittime = timeout + FEW_MORE_SECONDS;
261: }
262: mywork.wait(waittime);
263: } catch (InterruptedException e) {
264: throw new WorkRejectedException("Interrupted");
265: }
266: }
267: started = mywork.isStarted();
268: }
269: duration = System.currentTimeMillis() - starttime;
270: if (!started) {
271: synchronized (workList) {
272: // Remove the work in the list
273: if (!workList.remove(mywork)) {
274: if (logger.isLoggable(BasicLevel.DEBUG)) {
275: logger.log(BasicLevel.DEBUG,
276: "cannot remove work");
277: }
278: }
279: throw new WorkRejectedException(
280: WorkException.START_TIMED_OUT);
281: }
282: }
283: return duration;
284: }
285:
286: /**
287: * Accepts a Work instance for processing. This call does not block and
288: * returns immediately once a Work instance has been accepted for
289: * processing. There is no guarantee on when the submitted Work instance
290: * would start execution ie., there is no time constraint to start
291: * execution.
292: * @param work The unit of work to be done. Could be long or short-lived.
293: * @param timeout a time duration (in milliseconds) within which the
294: * execution of the Work instance must start. Otherwise, the Work
295: * instance is rejected with a WorkRejectedException set to an
296: * appropriate error code (WorkRejectedException.TIMED_OUT).
297: * @param ectx an object containing the execution context with which the
298: * submitted Work instance must be executed.
299: * @param listener an object which would be notified when the various Work
300: * processing events (work accepted, work rejected, work started,
301: * work completed) occur.
302: * @throws WorkRejectedException a Work instance has been rejected from
303: * further processing.
304: * @throws WorkException
305: */
306: public void scheduleWork(Work work) throws WorkException {
307: scheduleWork(work, INDEFINITE, null, null);
308: }
309:
310: /**
311: * Accepts a Work instance for processing. This call does not block and
312: * returns immediately once a Work instance has been accepted for
313: * processing. There is no guarantee on when the submitted Work instance
314: * would start execution ie., there is no time constraint to start
315: * execution.
316: * @param work The unit of work to be done. Could be long or short-lived.
317: * @param timeout a time duration (in milliseconds) within which the
318: * execution of the Work instance must start. Otherwise, the Work
319: * instance is rejected with a WorkRejectedException set to an
320: * appropriate error code (WorkRejectedException.TIMED_OUT).
321: * @param ectx an object containing the execution context with which the
322: * submitted Work instance must be executed.
323: * @param listener an object which would be notified when the various Work
324: * processing events (work accepted, work rejected, work started,
325: * work completed) occur.
326: * @throws WorkRejectedException a Work instance has been rejected from
327: * further processing.
328: * @throws WorkException
329: */
330: public void scheduleWork(Work work, long timeout,
331: ExecutionContext ectx, WorkListener listener)
332: throws WorkException {
333: if (logger.isLoggable(BasicLevel.DEBUG)) {
334: logger.log(BasicLevel.DEBUG, "");
335: }
336: JWork mywork = new JWork(work, timeout, ectx, listener);
337: if (listener != null) {
338: listener.workAccepted(new WorkEvent(this ,
339: WorkEvent.WORK_ACCEPTED, work, null));
340: }
341: synchronized (workList) {
342: workList.add(mywork);
343: if (poolsz < maxpoolsz && workList.size() > freeThreads) {
344: // We need one more thread.
345: poolsz++;
346: WorkThread st = new WorkThread(this , threadnumber++,
347: poolnumber);
348: st.start();
349: } else {
350: // Just wake up a thread waiting for work.
351: workList.notify();
352: }
353: }
354: }
355:
356: /**
357: * Internal method doing the work.
358: */
359: private void doMyWork(Work work, long timeout,
360: ExecutionContext ectx, WorkListener listener,
361: long creationTime) throws WorkException {
362: if (logger.isLoggable(BasicLevel.DEBUG)) {
363: logger.log(BasicLevel.DEBUG, "timeout=" + timeout);
364: }
365: // Notify the listener that the work is started or rejected by timeout.
366: if (listener != null) {
367: long duration = System.currentTimeMillis() - creationTime;
368: if (duration > timeout) {
369: // This can occur only in case of scheduleWork
370: logger.log(BasicLevel.WARN, "REJECTED: duration="
371: + duration);
372: listener.workRejected(new WorkEvent(this ,
373: WorkEvent.WORK_REJECTED, work, null));
374: return;
375: }
376: listener.workStarted(new WorkEvent(this ,
377: WorkEvent.WORK_STARTED, work, null));
378: }
379:
380: // Setup ExecutionContext
381: // TODO: Check this if doWork (same thread)
382: Xid xid = null;
383: if (ectx != null) {
384: xid = ectx.getXid();
385: if (xid != null) {
386: long txtimeout = ectx.getTransactionTimeout();
387: try {
388: if (txtimeout != WorkManager.UNKNOWN) {
389: ((Current) tm).begin(xid, txtimeout);
390: } else {
391: ((Current) tm).begin(xid);
392: }
393: } catch (NotSupportedException e) {
394: throw new WorkException(
395: "Error starting a new transaction", e);
396: } catch (SystemException e) {
397: throw new WorkException(
398: "Error starting a new transaction", e);
399: }
400: }
401: }
402:
403: try {
404: work.run();
405: // Notify the listener that the work is completed.
406: if (listener != null) {
407: listener.workCompleted(new WorkEvent(this ,
408: WorkEvent.WORK_COMPLETED, work, null));
409: }
410: } catch (Exception e) {
411: if (listener != null) {
412: listener.workCompleted(new WorkEvent(this ,
413: WorkEvent.WORK_COMPLETED, work, null));
414: }
415: throw new WorkCompletedException(e);
416: } finally {
417: if (xid != null) {
418: ((Current) tm).clearThreadTx();
419: }
420: }
421: }
422:
423: /**
424: * Get the next JWork object to be run.
425: * @return next JWork object to be run, or null if thread must end.
426: */
427: public void nextWork() throws WorkException, InterruptedException {
428: JWork run = null;
429: boolean haswait = false;
430: synchronized (workList) {
431: while (workList.isEmpty()) {
432: if ((haswait && freeThreads > minpoolsz) || !valid) {
433: poolsz--;
434: throw new InterruptedException("Thread ending");
435: }
436: try {
437: freeThreads++;
438: if (logger.isLoggable(BasicLevel.DEBUG)) {
439: logger.log(BasicLevel.DEBUG, "waiting");
440: }
441: workList.wait(waitingTime);
442: if (logger.isLoggable(BasicLevel.DEBUG)) {
443: logger.log(BasicLevel.DEBUG, "notified");
444: }
445: freeThreads--;
446: haswait = true;
447: } catch (InterruptedException e) {
448: freeThreads--;
449: poolsz--;
450: throw e;
451: }
452: }
453: run = (JWork) workList.removeFirst();
454: // In case startWork() was called
455: synchronized (run) {
456: if (logger.isLoggable(BasicLevel.DEBUG)) {
457: logger.log(BasicLevel.DEBUG, "start new work");
458: }
459: run.setStarted();
460: run.notify();
461: }
462: }
463: doMyWork(run.getWork(), run.getTimeout(), run
464: .getExecutionContext(), run.getWorkListener(), run
465: .getCreationTime());
466: }
467:
468: /**
469: * Remove this WorkManager : Stop all threads
470: */
471: public synchronized void stopThreads() {
472: if (logger.isLoggable(BasicLevel.DEBUG)) {
473: logger.log(BasicLevel.DEBUG, "");
474: }
475: valid = false;
476: notifyAll();
477: poolnumber--;
478: }
479:
480: class JWork {
481: private Work work;
482: private long timeout;
483: private ExecutionContext ectx;
484: private WorkListener listener;
485: private long creationTime;
486: private boolean started = false;
487:
488: public JWork(Work work, long timeout, ExecutionContext ectx,
489: WorkListener listener) {
490: this .work = work;
491: this .timeout = timeout;
492: this .ectx = ectx;
493: this .listener = listener;
494: creationTime = System.currentTimeMillis();
495: if (logger.isLoggable(BasicLevel.DEBUG)) {
496: logger.log(BasicLevel.DEBUG, "timeout=" + timeout);
497: }
498: }
499:
500: public Work getWork() {
501: return work;
502: }
503:
504: public long getTimeout() {
505: return timeout;
506: }
507:
508: public ExecutionContext getExecutionContext() {
509: return ectx;
510: }
511:
512: public WorkListener getWorkListener() {
513: return listener;
514: }
515:
516: public long getCreationTime() {
517: return creationTime;
518: }
519:
520: public boolean isStarted() {
521: return started;
522: }
523:
524: public void setStarted() {
525: if (logger.isLoggable(BasicLevel.DEBUG)) {
526: logger.log(BasicLevel.DEBUG, "");
527: }
528: started = true;
529: }
530: }
531:
532: /**
533: * Thread executing works for the work manager.
534: */
535: class WorkThread extends Thread {
536:
537: private JWorkManager mgr;
538: private int number;
539:
540: /**
541: * Constructor
542: * @param m The WorkManager
543: * @param num thread number
544: * @param wm workManager number
545: */
546: WorkThread(JWorkManager m, int num, int wm) {
547: mgr = m;
548: number = num;
549: setName("WorkThread-" + wm + "/" + num);
550: }
551:
552: public void run() {
553: if (logger.isLoggable(BasicLevel.DEBUG)) {
554: logger.log(BasicLevel.DEBUG, "running");
555: }
556: while (true) {
557: try {
558: mgr.nextWork();
559: } catch (InterruptedException e) {
560: if (logger.isLoggable(BasicLevel.DEBUG)) {
561: logger.log(BasicLevel.DEBUG, "Exiting: ", e);
562: }
563: return;
564: } catch (WorkException e) {
565: logger.log(BasicLevel.ERROR,
566: "Exception during work run: ", e);
567: }
568: }
569: }
570:
571: }
572:
573: }
|