001: /**
002: * EasyBeans
003: * Copyright (C) 2006 Bull S.A.S.
004: * Contact: easybeans@ow2.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: ResourceWorkManager.java 1970 2007-10-16 11:49:25Z benoitf $
023: * --------------------------------------------------------------------------
024: */package org.ow2.easybeans.jca.workmanager;
025:
026: import java.util.LinkedList;
027:
028: import javax.resource.spi.work.ExecutionContext;
029: import javax.resource.spi.work.Work;
030: import javax.resource.spi.work.WorkCompletedException;
031: import javax.resource.spi.work.WorkEvent;
032: import javax.resource.spi.work.WorkException;
033: import javax.resource.spi.work.WorkListener;
034: import javax.resource.spi.work.WorkManager;
035: import javax.resource.spi.work.WorkRejectedException;
036: import javax.transaction.NotSupportedException;
037: import javax.transaction.SystemException;
038: import javax.transaction.TransactionManager;
039: import javax.transaction.xa.Xid;
040:
041: import org.objectweb.jotm.Current;
042: import org.ow2.util.log.Log;
043: import org.ow2.util.log.LogFactory;
044:
045: /**
046: * Implementation of the Resource Work Manager API.
047: * @author Philippe Durieux (JOnAS)
048: * @author Florent Benoit (EasyBeans)
049: */
050: public class ResourceWorkManager implements WorkManager {
051:
052: /**
053: * MilliSeconds value.
054: */
055: private static final long MILLISECONDS = 1000L;
056:
057: /**
058: * Logger.
059: */
060: private static Log logger = LogFactory
061: .getLog(ResourceWorkManager.class);
062:
063: /**
064: * List of ResourceWork (which wrap Work object).
065: */
066: private LinkedList<ResourceWork> workList = new LinkedList<ResourceWork>();
067:
068: /**
069: * Identifier of this pool.
070: */
071: private static int poolnumber = 0;
072:
073: /**
074: * Thread number (when building ResourceWorkThread, it assigns a new thread
075: * number).
076: */
077: private static int threadnumber = 0;
078:
079: /**
080: * The maximum size of the pool.
081: */
082: private int maxpoolsz;
083:
084: /**
085: * The minimum size of the pool.
086: */
087: private int minpoolsz;
088:
089: /**
090: * The current size of thread pool.
091: */
092: private int poolsz;
093:
094: /**
095: * Threads that are ready to work.
096: */
097: private int freeThreads;
098:
099: /**
100: * The time to wait (in millisec).
101: */
102: private long waitingTime;
103:
104: /**
105: * Pool status : by default, it is not stopped.
106: */
107: private boolean stopped = false;
108:
109: /**
110: * Wait few more seconds when waiting.
111: */
112: private static final long FEW_MORE_SECONDS = 3000;
113:
114: /**
115: * TransactionManager to use.
116: */
117: private TransactionManager transactionManager;
118:
119: /**
120: * Default Constructor.
121: * @param minsz the minimum pool size
122: * @param maxsz the maximum pool size
123: * @param transactionManager the transaction manager to use.
124: * @param threadwait max time in seconds a thread will wait.
125: */
126: @SuppressWarnings("boxing")
127: public ResourceWorkManager(
128: final TransactionManager transactionManager,
129: final int minsz, final int maxsz, final long threadwait) {
130: this .minpoolsz = minsz;
131: this .maxpoolsz = maxsz;
132: this .waitingTime = threadwait * MILLISECONDS;
133: this .transactionManager = transactionManager;
134: // new identifier
135: poolnumber++;
136: if (logger.isDebugEnabled()) {
137: logger.debug("thread pool {0}", poolnumber);
138: logger.debug("minpool size = {0} and maxpool sizez = {1}",
139: minsz, maxsz);
140: }
141: // Build threads for work.
142: for (poolsz = 0; poolsz < minsz; poolsz++) {
143: ResourceWorkThread resourceWorkThread = new ResourceWorkThread(
144: this , poolnumber, threadnumber++);
145: resourceWorkThread.start();
146: }
147: }
148:
149: /**
150: * @return current pool size
151: */
152: public int getCurrentPoolSize() {
153: return poolsz;
154: }
155:
156: /**
157: * @return min pool size
158: */
159: public int getMinPoolSize() {
160: return minpoolsz;
161: }
162:
163: /**
164: * @return max pool size
165: */
166: public int getMaxPoolSize() {
167: return maxpoolsz;
168: }
169:
170: /**
171: * Sets the min pool size.
172: * @param minsz the min pool size.
173: */
174: public void setMinPoolSize(final int minsz) {
175: minpoolsz = minsz;
176: }
177:
178: /**
179: * Sets the max pool size.
180: * @param maxsz the max pool size.
181: */
182: public void setMaxPoolSize(final int maxsz) {
183: maxpoolsz = maxsz;
184: }
185:
186: /**
187: * Accepts a Work instance for processing. This call blocks until the Work
188: * instance completes execution. There is no guarantee on when the accepted
189: * Work instance would start execution ie., there is no time constraint to
190: * start execution.
191: * @param work The unit of work to be done. Could be long or short-lived.
192: * @throws WorkRejectedException a Work instance has been rejected from
193: * further processing.
194: * @throws WorkCompletedException a Work instance has completed execution
195: * with an exception.
196: * @throws WorkException if work is not done
197: */
198: public void doWork(final Work work) throws WorkRejectedException,
199: WorkCompletedException, WorkException {
200: doMyWork(work, INDEFINITE, null, null, 0);
201: }
202:
203: /**
204: * Accepts a Work instance for processing. This call blocks until the Work
205: * instance completes execution.
206: * @param work The unit of work to be done. Could be long or short-lived.
207: * @param timeout a time duration (in milliseconds) within which the
208: * execution of the Work instance must start. Otherwise, the Work
209: * instance is rejected with a WorkRejectedException set to an
210: * appropriate error code (WorkRejectedException.TIMED_OUT).
211: * @param executionContext an object containing the execution context with
212: * which the submitted Work instance must be executed.
213: * @param workListener an object which would be notified when the various
214: * Work processing events (work accepted, work rejected, work
215: * started, work completed) occur.
216: * @throws WorkRejectedException a Work instance has been rejected from
217: * further processing.
218: * @throws WorkCompletedException a Work instance has completed execution
219: * with an exception.
220: * @throws WorkException if work is not done
221: */
222: public void doWork(final Work work, final long timeout,
223: final ExecutionContext executionContext,
224: final WorkListener workListener)
225: throws WorkRejectedException, WorkCompletedException,
226: WorkException {
227: if (workListener != null) {
228: workListener.workAccepted(new WorkEvent(this ,
229: WorkEvent.WORK_ACCEPTED, work, null));
230: }
231: doMyWork(work, timeout, executionContext, workListener, System
232: .currentTimeMillis());
233: }
234:
235: /**
236: * Accepts a Work instance for processing. This call blocks until the Work
237: * instance starts execution but not until its completion. There is no
238: * guarantee on when the accepted Work instance would start execution ie.,
239: * there is no time constraint to start execution.
240: * @param work The unit of work to be done. Could be long or short-lived.
241: * @return the time elapsed (in milliseconds) from Work acceptance until
242: * start of execution. Note, this does not offer real-time
243: * guarantees. It is valid to return -1, if the actual start delay
244: * duration is unknown.
245: * @throws WorkRejectedException a Work instance has been rejected from
246: * further processing.
247: * @throws WorkException if work is not started
248: */
249: public long startWork(final Work work)
250: throws WorkRejectedException, WorkException {
251: return startWork(work, INDEFINITE, null, null);
252: }
253:
254: /**
255: * Accepts a Work instance for processing. This call blocks until the Work
256: * instance starts execution but not until its completion. There is no
257: * guarantee on when the accepted Work instance would start execution ie.,
258: * there is no time constraint to start execution.
259: * @param work The unit of work to be done. Could be long or short-lived.
260: * @param timeout a time duration (in milliseconds) within which the
261: * execution of the Work instance must start. Otherwise, the Work
262: * instance is rejected with a WorkRejectedException set to an
263: * appropriate error code (WorkRejectedException.TIMED_OUT).
264: * @param executionContext an object containing the execution context with
265: * which the submitted Work instance must be executed.
266: * @param workListener an object which would be notified when the various
267: * Work processing events (work accepted, work rejected, work
268: * started, work completed) occur.
269: * @return the time elapsed (in milliseconds) from Work acceptance until
270: * start of execution. Note, this does not offer real-time
271: * guarantees. It is valid to return -1, if the actual start delay
272: * duration is unknown.
273: * @throws WorkRejectedException a Work instance has been rejected from
274: * further processing.
275: * @throws WorkException if work is not started
276: */
277: public long startWork(final Work work, final long timeout,
278: final ExecutionContext executionContext,
279: final WorkListener workListener)
280: throws WorkRejectedException, WorkException {
281:
282: ResourceWork resourceWork = new ResourceWork(work, timeout,
283: executionContext, workListener);
284: if (workListener != null) {
285: workListener.workAccepted(new WorkEvent(this ,
286: WorkEvent.WORK_ACCEPTED, work, null));
287: }
288: long starttime = System.currentTimeMillis();
289: long duration = 0;
290: synchronized (workList) {
291: workList.add(resourceWork);
292: if (poolsz < maxpoolsz && workList.size() > freeThreads) {
293: // We need one more thread.
294: poolsz++;
295: ResourceWorkThread resourceWorkThread = new ResourceWorkThread(
296: this , threadnumber++, poolnumber);
297: resourceWorkThread.start();
298: } else {
299: workList.notify();
300: }
301: }
302: // Wait until my work is started.
303: boolean started = false;
304: synchronized (resourceWork) {
305: if (!resourceWork.isStarted()) {
306: try {
307: // No need to wait after timeout is elapsed
308: long waittime = waitingTime;
309: if (timeout < waittime) {
310: waittime = timeout + FEW_MORE_SECONDS;
311: }
312: resourceWork.wait(waittime);
313: } catch (InterruptedException e) {
314: throw new WorkRejectedException("Interrupted");
315: }
316: }
317: started = resourceWork.isStarted();
318: }
319: duration = System.currentTimeMillis() - starttime;
320: if (!started) {
321: synchronized (workList) {
322: // Remove the work in the list
323: if (!workList.remove(resourceWork)) {
324: logger.debug("Cannot remove the work");
325: }
326: throw new WorkRejectedException(
327: WorkException.START_TIMED_OUT);
328: }
329: }
330: return duration;
331: }
332:
333: /**
334: * Accepts a Work instance for processing. This call does not block and
335: * returns immediately once a Work instance has been accepted for
336: * processing. There is no guarantee on when the submitted Work instance
337: * would start execution ie., there is no time constraint to start
338: * execution.
339: * @param work The unit of work to be done. Could be long or short-lived.
340: * @throws WorkRejectedException - indicates that a Work instance has been
341: * rejected from further processing. This can occur due to internal
342: * factors.
343: * @throws WorkException if work is not scheduled.
344: */
345: public void scheduleWork(final Work work)
346: throws WorkRejectedException, WorkException {
347: scheduleWork(work, INDEFINITE, null, null);
348: }
349:
350: /**
351: * Accepts a Work instance for processing. This call does not block and
352: * returns immediately once a Work instance has been accepted for
353: * processing. There is no guarantee on when the submitted Work instance
354: * would start execution ie., there is no time constraint to start
355: * execution.
356: * @param work The unit of work to be done. Could be long or short-lived.
357: * @param timeout a time duration (in milliseconds) within which the
358: * execution of the Work instance must start. Otherwise, the Work
359: * instance is rejected with a WorkRejectedException set to an
360: * appropriate error code (WorkRejectedException.TIMED_OUT).
361: * @param executionContext an object containing the execution context with
362: * which the submitted Work instance must be executed.
363: * @param workListener an object which would be notified when the various
364: * Work processing events (work accepted, work rejected, work
365: * started, work completed) occur.
366: * @throws WorkRejectedException a Work instance has been rejected from
367: * further processing.
368: * @throws WorkException if work is not scheduled.
369: */
370: public void scheduleWork(final Work work, final long timeout,
371: final ExecutionContext executionContext,
372: final WorkListener workListener)
373: throws WorkRejectedException, WorkException {
374:
375: ResourceWork resourceWork = new ResourceWork(work, timeout,
376: executionContext, workListener);
377: if (workListener != null) {
378: workListener.workAccepted(new WorkEvent(this ,
379: WorkEvent.WORK_ACCEPTED, work, null));
380: }
381: synchronized (workList) {
382: workList.add(resourceWork);
383: if (poolsz < maxpoolsz && workList.size() > freeThreads) {
384: // We need one more thread.
385: poolsz++;
386: ResourceWorkThread resourceWorkThread = new ResourceWorkThread(
387: this , threadnumber++, poolnumber);
388: resourceWorkThread.start();
389: } else {
390: // Just wake up a thread waiting for work.
391: workList.notify();
392: }
393: }
394: }
395:
396: /**
397: * Internal method doing the work.
398: * @param work The unit of work to be done. Could be long or short-lived.
399: * @param timeout a time duration (in milliseconds) within which the
400: * execution of the Work instance must start. Otherwise, the Work
401: * instance is rejected with a WorkRejectedException set to an
402: * appropriate error code (WorkRejectedException.TIMED_OUT).
403: * @param executionContext an object containing the execution context with
404: * which the submitted Work instance must be executed.
405: * @param workListener an object which would be notified when the various
406: * Work processing events (work accepted, work rejected, work
407: * started, work completed) occur.
408: * @param creationTime the date of the creation of the work
409: * @throws WorkException if work is not performed.
410: */
411: @SuppressWarnings("boxing")
412: private void doMyWork(final Work work, final long timeout,
413: final ExecutionContext executionContext,
414: final WorkListener workListener, final long creationTime)
415: throws WorkException {
416:
417: // Notify the listener that the work is started or rejected by timeout.
418: if (workListener != null) {
419: long duration = System.currentTimeMillis() - creationTime;
420: if (duration > timeout) {
421: // This can occur only in case of scheduleWork
422: logger.warn("REJECTED: duration= {0}", duration);
423: workListener.workRejected(new WorkEvent(this ,
424: WorkEvent.WORK_REJECTED, work, null));
425: return;
426: }
427: workListener.workStarted(new WorkEvent(this ,
428: WorkEvent.WORK_STARTED, work, null));
429: }
430:
431: // Setup ExecutionContext
432: Xid xid = null;
433: if (executionContext != null) {
434: xid = executionContext.getXid();
435: if (xid != null) {
436: long txtimeout = executionContext
437: .getTransactionTimeout();
438: try {
439: if (txtimeout != WorkManager.UNKNOWN) {
440: ((Current) transactionManager).begin(xid,
441: txtimeout);
442: } else {
443: ((Current) transactionManager).begin(xid);
444: }
445: } catch (NotSupportedException e) {
446: throw new WorkException(
447: "Error starting a new transaction", e);
448: } catch (SystemException e) {
449: throw new WorkException(
450: "Error starting a new transaction", e);
451: }
452: }
453: }
454:
455: try {
456: work.run();
457: // Notify the listener that the work is completed.
458: if (workListener != null) {
459: workListener.workCompleted(new WorkEvent(this ,
460: WorkEvent.WORK_COMPLETED, work, null));
461: }
462: } catch (Exception e) {
463: if (workListener != null) {
464: workListener.workCompleted(new WorkEvent(this ,
465: WorkEvent.WORK_COMPLETED, work, null));
466: }
467: throw new WorkCompletedException(e);
468: } finally {
469: if (xid != null) {
470: ((Current) transactionManager).clearThreadTx();
471: }
472: }
473: }
474:
475: /**
476: * Do the next JWork object to be run.
477: * @throws WorkException if work is not done
478: * @throws InterruptedException if one object can't wait.
479: * @throws ResourceWorkManagerStoppedException if the manager is stopped.
480: */
481: public void nextWork() throws WorkException, InterruptedException,
482: ResourceWorkManagerStoppedException {
483: ResourceWork run = null;
484: boolean haswait = false;
485: synchronized (workList) {
486: while (workList.isEmpty()) {
487: if ((haswait && freeThreads > minpoolsz) || stopped) {
488: poolsz--;
489: throw new ResourceWorkManagerStoppedException(
490: "Manager is stopped");
491: }
492: try {
493: freeThreads++;
494: workList.wait(waitingTime);
495: freeThreads--;
496: haswait = true;
497: } catch (InterruptedException e) {
498: freeThreads--;
499: poolsz--;
500: throw e;
501: }
502: }
503: run = workList.removeFirst();
504: // In case startWork() was called
505: synchronized (run) {
506: logger.debug("Starting a new work");
507: run.setStarted();
508: run.notify();
509: }
510: }
511: doMyWork(run.getWork(), run.getTimeout(), run
512: .getExecutionContext(), run.getWorkListener(), run
513: .getCreationTime());
514: }
515:
516: /**
517: * Remove this WorkManager : Stop all threads.
518: */
519: public synchronized void stopThreads() {
520: stopped = true;
521: notifyAll();
522: poolnumber--;
523: }
524:
525: }
|