001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.geronimo.timer;
017:
018: import java.util.ArrayList;
019: import java.util.Collection;
020: import java.util.Collections;
021: import java.util.Date;
022: import java.util.HashMap;
023: import java.util.Iterator;
024: import java.util.Map;
025: import java.util.Timer;
026: import java.util.TimerTask;
027: import javax.transaction.RollbackException;
028: import javax.transaction.Status;
029: import javax.transaction.Synchronization;
030: import javax.transaction.SystemException;
031: import javax.transaction.Transaction;
032: import javax.transaction.TransactionManager;
033:
034: import java.util.concurrent.Executor;
035:
036: import org.apache.commons.logging.Log;
037: import org.apache.commons.logging.LogFactory;
038: import org.apache.geronimo.gbean.GBeanLifecycle;
039:
040: /**
041: *
042: *
043: * @version $Rev: 520573 $ $Date: 2007-03-20 13:48:26 -0700 (Tue, 20 Mar 2007) $
044: *
045: * */
046: public class ThreadPooledTimer implements PersistentTimer,
047: GBeanLifecycle {
048:
049: private static final Log log = LogFactory
050: .getLog(ThreadPooledTimer.class);
051:
052: private final ExecutorTaskFactory executorTaskFactory;
053: private final WorkerPersistence workerPersistence;
054: private final Executor executor;
055: private final TransactionManager transactionManager;
056:
057: private Timer delegate;
058:
059: private final Map idToWorkInfoMap = Collections
060: .synchronizedMap(new HashMap());
061:
062: //default constructor for use as reference endpoint.
063: public ThreadPooledTimer() {
064: this (null, null, null, null);
065: }
066:
067: public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory,
068: WorkerPersistence workerPersistence, Executor executor,
069: TransactionManager transactionManager) {
070: this .executorTaskFactory = executorTaskFactory;
071: this .workerPersistence = workerPersistence;
072: this .executor = executor;
073: this .transactionManager = transactionManager;
074: }
075:
076: public void doStart() throws Exception {
077: delegate = new Timer(true);
078: }
079:
080: public void doStop() {
081: if (delegate != null) {
082: delegate.cancel();
083: delegate = null;
084: }
085: }
086:
087: public void doFail() {
088: doStop();
089: }
090:
091: public WorkInfo schedule(UserTaskFactory userTaskFactory,
092: String key, Object userId, Object userInfo, long delay)
093: throws PersistenceException, RollbackException,
094: SystemException {
095: if (delay < 0) {
096: throw new IllegalArgumentException("Negative delay: "
097: + delay);
098: }
099: Date time = new Date(System.currentTimeMillis() + delay);
100: return schedule(key, userTaskFactory, userId, userInfo, time);
101: }
102:
103: public WorkInfo schedule(String key,
104: UserTaskFactory userTaskFactory, Object userId,
105: Object userInfo, Date time) throws PersistenceException,
106: RollbackException, SystemException {
107: if (time == null) {
108: throw new IllegalArgumentException("No time supplied");
109: }
110: if (time.getTime() < 0) {
111: throw new IllegalArgumentException("Negative time: "
112: + time.getTime());
113: }
114: WorkInfo worker = createWorker(key, userTaskFactory,
115: executorTaskFactory, userId, userInfo, time, null,
116: false);
117: registerSynchronization(new ScheduleSynchronization(worker
118: .getExecutorFeedingTimerTask(), time));
119: addWorkInfo(worker);
120: return worker;
121: }
122:
123: public WorkInfo schedule(String key,
124: UserTaskFactory userTaskFactory, Object userInfo,
125: long delay, long period, Object userId)
126: throws PersistenceException, RollbackException,
127: SystemException {
128: if (delay < 0) {
129: throw new IllegalArgumentException("Negative delay: "
130: + delay);
131: }
132: if (period < 0) {
133: throw new IllegalArgumentException("Negative period: "
134: + period);
135: }
136: Date time = new Date(System.currentTimeMillis() + delay);
137: return schedule(key, userTaskFactory, userId, userInfo, time,
138: period);
139: }
140:
141: public WorkInfo schedule(String key,
142: UserTaskFactory userTaskFactory, Object userId,
143: Object userInfo, Date firstTime, long period)
144: throws PersistenceException, RollbackException,
145: SystemException {
146: if (firstTime == null) {
147: throw new IllegalArgumentException("No time supplied");
148: }
149: if (firstTime.getTime() < 0) {
150: throw new IllegalArgumentException("Negative time: "
151: + firstTime.getTime());
152: }
153: if (period < 0) {
154: throw new IllegalArgumentException("Negative period: "
155: + period);
156: }
157: WorkInfo worker = createWorker(key, userTaskFactory,
158: executorTaskFactory, userId, userInfo, firstTime,
159: new Long(period), false);
160: registerSynchronization(new ScheduleRepeatedSynchronization(
161: worker.getExecutorFeedingTimerTask(), firstTime, period));
162: addWorkInfo(worker);
163: return worker;
164: }
165:
166: public WorkInfo scheduleAtFixedRate(String key,
167: UserTaskFactory userTaskFactory, Object userId,
168: Object userInfo, long delay, long period)
169: throws PersistenceException, RollbackException,
170: SystemException {
171: if (delay < 0) {
172: throw new IllegalArgumentException("Negative delay: "
173: + delay);
174: }
175: if (period < 0) {
176: throw new IllegalArgumentException("Negative period: "
177: + period);
178: }
179: Date time = new Date(System.currentTimeMillis() + delay);
180: return scheduleAtFixedRate(key, userTaskFactory, userId,
181: userInfo, time, period);
182: }
183:
184: public WorkInfo scheduleAtFixedRate(String key,
185: UserTaskFactory userTaskFactory, Object userId,
186: Object userInfo, Date firstTime, long period)
187: throws PersistenceException, RollbackException,
188: SystemException {
189: if (firstTime == null) {
190: throw new IllegalArgumentException("No time supplied");
191: }
192: if (firstTime.getTime() < 0) {
193: throw new IllegalArgumentException("Negative time: "
194: + firstTime.getTime());
195: }
196: if (period < 0) {
197: throw new IllegalArgumentException("Negative period: "
198: + period);
199: }
200: WorkInfo worker = createWorker(key, userTaskFactory,
201: executorTaskFactory, userId, userInfo, firstTime,
202: new Long(period), true);
203: registerSynchronization(new ScheduleAtFixedRateSynchronization(
204: worker.getExecutorFeedingTimerTask(), firstTime, period));
205: addWorkInfo(worker);
206: return worker;
207: }
208:
209: public Collection playback(String key,
210: UserTaskFactory userTaskFactory)
211: throws PersistenceException {
212: PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
213: workerPersistence.playback(key, playback);
214: return playback.getWorkInfos();
215: }
216:
217: public Collection getIdsByKey(String key, Object userId)
218: throws PersistenceException {
219: return workerPersistence.getIdsByKey(key, userId);
220: }
221:
222: public WorkInfo getWorkInfo(Long id) {
223: return (WorkInfo) idToWorkInfoMap.get(id);
224: }
225:
226: /**
227: * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
228: * affecting persisted timer data.
229: * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
230: */
231: public void cancelTimerTasks(Collection ids) {
232: for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
233: Long idLong = (Long) iterator.next();
234: WorkInfo workInfo = getWorkInfo(idLong);
235: if (workInfo != null) {
236: TimerTask timerTask = workInfo
237: .getExecutorFeedingTimerTask();
238: timerTask.cancel();
239: }
240: }
241: }
242:
243: void addWorkInfo(WorkInfo worker) {
244: idToWorkInfoMap.put(new Long(worker.getId()), worker);
245: }
246:
247: void removeWorkInfo(WorkInfo workInfo) {
248: idToWorkInfoMap.remove(new Long(workInfo.getId()));
249: }
250:
251: void workPerformed(WorkInfo workInfo) throws PersistenceException {
252: if (workInfo.isOneTime()) {
253: workerPersistence.cancel(workInfo.getId());
254: } else if (workInfo.getAtFixedRate()) {
255: workerPersistence.fixedRateWorkPerformed(workInfo.getId());
256: workInfo.nextTime();
257: } else {
258: workInfo.nextInterval();
259: workerPersistence.intervalWorkPerformed(workInfo.getId(),
260: workInfo.getPeriod().longValue());
261: }
262: }
263:
264: Timer getTimer() {
265: if (delegate == null) {
266: throw new IllegalStateException("Timer is stopped");
267: }
268: return delegate;
269: }
270:
271: WorkerPersistence getWorkerPersistence() {
272: return workerPersistence;
273: }
274:
275: Executor getExecutor() {
276: return executor;
277: }
278:
279: private WorkInfo createWorker(String key,
280: UserTaskFactory userTaskFactory,
281: ExecutorTaskFactory executorTaskFactory, Object userId,
282: Object userInfo, Date time, Long period, boolean atFixedRate)
283: throws PersistenceException {
284: if (time == null) {
285: throw new IllegalArgumentException("Null initial time");
286: }
287: WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time,
288: period, atFixedRate);
289: //save and assign id
290: workerPersistence.save(workInfo);
291:
292: Runnable userTask = userTaskFactory.newTask(workInfo.getId());
293: ExecutorTask executorTask = executorTaskFactory
294: .createExecutorTask(userTask, workInfo, this );
295: ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(
296: workInfo, this );
297: workInfo.initialize(worker, executorTask);
298: return workInfo;
299: }
300:
301: void registerSynchronization(Synchronization sync)
302: throws RollbackException, SystemException {
303: Transaction transaction = transactionManager.getTransaction();
304: int status = transaction == null ? Status.STATUS_NO_TRANSACTION
305: : transaction.getStatus();
306:
307: if (transaction != null && status == Status.STATUS_ACTIVE
308: || status == Status.STATUS_MARKED_ROLLBACK) {
309: transaction.registerSynchronization(sync);
310: } else {
311: sync.beforeCompletion();
312: sync.afterCompletion(Status.STATUS_COMMITTED);
313: }
314: }
315:
316: private class ScheduleSynchronization implements Synchronization {
317:
318: private final ExecutorFeedingTimerTask worker;
319: private final Date time;
320:
321: public ScheduleSynchronization(ExecutorFeedingTimerTask worker,
322: Date time) {
323: this .worker = worker;
324: this .time = time;
325: }
326:
327: public void beforeCompletion() {
328: }
329:
330: public void afterCompletion(int status) {
331: if (status == Status.STATUS_COMMITTED) {
332: if (worker.isCancelled()) {
333: log
334: .trace("Worker is already cancelled, not scheduling");
335: return;
336: }
337: try {
338: getTimer().schedule(worker, time);
339: } catch (IllegalStateException e) {
340: //TODO consider again if catching this exception is appropriate
341: log.warn("Couldn't schedule worker "
342: + e.getMessage() + "at (now) "
343: + System.currentTimeMillis() + " for "
344: + time.getTime());
345: }
346: }
347: }
348: }
349:
350: private class ScheduleRepeatedSynchronization implements
351: Synchronization {
352:
353: private final ExecutorFeedingTimerTask worker;
354: private final Date time;
355: private final long period;
356:
357: public ScheduleRepeatedSynchronization(
358: ExecutorFeedingTimerTask worker, Date time, long period) {
359: this .worker = worker;
360: this .time = time;
361: this .period = period;
362: }
363:
364: public void beforeCompletion() {
365: }
366:
367: public void afterCompletion(int status) {
368: if (status == Status.STATUS_COMMITTED) {
369: if (worker.isCancelled()) {
370: log
371: .trace("Worker is already cancelled, not scheduling/period");
372: return;
373: }
374: try {
375: getTimer().schedule(worker, time, period);
376: } catch (Exception e) {
377: log.warn("Couldn't schedule/period worker "
378: + e.getMessage() + "at (now) "
379: + System.currentTimeMillis() + " for "
380: + time.getTime());
381: }
382: }
383: }
384: }
385:
386: private class ScheduleAtFixedRateSynchronization implements
387: Synchronization {
388:
389: private final ExecutorFeedingTimerTask worker;
390: private final Date time;
391: private final long period;
392:
393: public ScheduleAtFixedRateSynchronization(
394: ExecutorFeedingTimerTask worker, Date time, long period) {
395: this .worker = worker;
396: this .time = time;
397: this .period = period;
398: }
399:
400: public void beforeCompletion() {
401: }
402:
403: public void afterCompletion(int status) {
404: if (status == Status.STATUS_COMMITTED) {
405: if (worker.isCancelled()) {
406: log
407: .trace("Worker is already cancelled, not scheduleAtFixedRate");
408: return;
409: }
410: try {
411: getTimer()
412: .scheduleAtFixedRate(worker, time, period);
413: } catch (Exception e) {
414: log.warn("Couldn't scheduleAtFixedRate worker "
415: + e.getMessage() + "at (now) "
416: + System.currentTimeMillis() + " for "
417: + time.getTime());
418: }
419: }
420: }
421: }
422:
423: private class PlaybackImpl implements Playback {
424:
425: private final UserTaskFactory userTaskFactory;
426:
427: private final Collection workInfos = new ArrayList();
428:
429: public PlaybackImpl(UserTaskFactory userTaskFactory) {
430: this .userTaskFactory = userTaskFactory;
431: }
432:
433: public void schedule(WorkInfo workInfo) {
434: Runnable userTask = userTaskFactory.newTask(workInfo
435: .getId());
436: ExecutorTask executorTask = executorTaskFactory
437: .createExecutorTask(userTask, workInfo,
438: ThreadPooledTimer.this );
439: ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(
440: workInfo, ThreadPooledTimer.this );
441: workInfo.initialize(worker, executorTask);
442: if (workInfo.getPeriod() == null) {
443: getTimer().schedule(worker, workInfo.getTime());
444: } else if (!workInfo.getAtFixedRate()) {
445: getTimer().schedule(worker, workInfo.getTime(),
446: workInfo.getPeriod().longValue());
447: } else {
448: getTimer().scheduleAtFixedRate(worker,
449: workInfo.getTime(),
450: workInfo.getPeriod().longValue());
451: }
452: addWorkInfo(workInfo);
453: workInfos.add(workInfo);
454: }
455:
456: public Collection getWorkInfos() {
457: return workInfos;
458: }
459:
460: }
461:
462: }
|