001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.openejb.core.timer;
017:
018: import org.apache.openejb.core.BaseContext;
019: import org.apache.openejb.core.CoreDeploymentInfo;
020: import org.apache.openejb.core.ThreadContext;
021: import org.apache.openejb.RpcContainer;
022: import org.apache.openejb.OpenEJBException;
023: import org.apache.openejb.DeploymentInfo;
024: import org.apache.openejb.util.LogCategory;
025: import org.apache.openejb.util.Logger;
026: import org.apache.openejb.loader.SystemInstance;
027:
028: import javax.ejb.EJBException;
029: import javax.ejb.Timer;
030: import javax.transaction.Status;
031: import javax.transaction.TransactionManager;
032: import java.io.Serializable;
033: import java.util.ArrayList;
034: import java.util.Collection;
035: import java.util.Date;
036: import java.util.Iterator;
037: import java.util.TimerTask;
038: import java.util.concurrent.Executor;
039: import java.util.concurrent.Executors;
040: import java.lang.reflect.Method;
041:
042: public class EjbTimerServiceImpl implements EjbTimerService {
043: private static final Logger log = Logger.getInstance(
044: LogCategory.TIMER, "org.apache.openejb.util.resources");
045:
046: private final TransactionManager transactionManager;
047: private final DeploymentInfo deployment;
048: private final boolean transacted;
049: private final int retryAttempts;
050:
051: private final TimerStore timerStore;
052: private final Executor threadPool;
053:
054: private java.util.Timer timer;
055:
056: public EjbTimerServiceImpl(DeploymentInfo deployment) {
057: this (deployment, getDefaultTransactionManager(),
058: getDefaultExecutor(), new MemoryTimerStore(
059: getDefaultTransactionManager()), 1);
060: }
061:
062: public static Executor getDefaultExecutor() {
063: Executor executor = SystemInstance.get().getComponent(
064: Executor.class);
065: if (executor == null) {
066: executor = Executors.newFixedThreadPool(10);
067: SystemInstance.get().setComponent(Executor.class, executor);
068: }
069: return executor;
070: }
071:
072: public static TransactionManager getDefaultTransactionManager() {
073: return SystemInstance.get().getComponent(
074: TransactionManager.class);
075: }
076:
077: public EjbTimerServiceImpl(DeploymentInfo deployment,
078: TransactionManager transactionManager, Executor threadPool,
079: TimerStore timerStore, int retryAttempts) {
080: if (deployment.getEjbTimeout() == null)
081: throw new IllegalArgumentException(
082: "Ejb does not have an ejbTimeout method "
083: + deployment.getDeploymentID());
084:
085: this .deployment = deployment;
086: this .transactionManager = transactionManager;
087: this .threadPool = threadPool;
088: this .timerStore = timerStore;
089: byte txAttribute = deployment
090: .getTransactionAttribute(deployment.getEjbTimeout());
091: this .transacted = txAttribute == CoreDeploymentInfo.TX_REQUIRED
092: || txAttribute == CoreDeploymentInfo.TX_REQUIRES_NEW;
093: this .retryAttempts = retryAttempts;
094:
095: }
096:
097: public void start() throws TimerStoreException {
098: // load saved timers
099: Collection timerDatas = timerStore.loadTimers(this ,
100: (String) deployment.getDeploymentID());
101:
102: // create a new java.util.Timer
103: timer = new java.util.Timer(true);
104:
105: // schedule the saved timers
106: for (Iterator iterator = timerDatas.iterator(); iterator
107: .hasNext();) {
108: TimerData timerData = (TimerData) iterator.next();
109:
110: // schedule the timer with the java.util.Timer
111: schedule(timerData);
112: }
113: }
114:
115: public void stop() {
116: // stop all timers
117: for (Iterator iterator = timerStore.getTimers(
118: (String) deployment.getDeploymentID()).iterator(); iterator
119: .hasNext();) {
120: TimerData timerData = (TimerData) iterator.next();
121: timerData.stop();
122: }
123:
124: // stop the java.util.Timer
125: if (timer != null) {
126: timer.cancel();
127: timer = null;
128: }
129: }
130:
131: public TransactionManager getTransactionManager() {
132: return transactionManager;
133: }
134:
135: /**
136: * Called from TimerData and start when a timer should be scheduled with the java.util.Timer.
137: * @param timerData the timer to schedule
138: */
139: public void schedule(TimerData timerData) {
140: if (timer == null)
141: throw new IllegalStateException("Timer is stopped");
142:
143: try {
144: EjbTimeoutTimerTask timerTask = new EjbTimeoutTimerTask(
145: timerData);
146: timerData.setTimerTask(timerTask);
147: if (timerData.isOneTime()) {
148: timer.schedule(timerTask, timerData.getExpiration());
149: } else {
150: timer.scheduleAtFixedRate(timerTask, timerData
151: .getExpiration(), timerData
152: .getIntervalDuration());
153: }
154: } catch (Exception e) {
155: log.warning("Could not schedule timer " + e.getMessage()
156: + " at (now) " + System.currentTimeMillis()
157: + " for " + timerData.getExpiration().getTime());
158: }
159: }
160:
161: /**
162: * Call back from TimerData and ejbTimeout when a timer has been cancelled (or is complete) and should be removed from stores.
163: * @param timerData the timer that was cancelled
164: */
165: public void cancelled(TimerData timerData) {
166: // make sure it was removed from the strore
167: timerStore.removeTimer(timerData.getId());
168: }
169:
170: /**
171: * Returns a timerData to the TimerStore, if a cancel() is rolled back.
172: * @param timerData the timer to be returned to the timer store
173: */
174: public void addTimerData(TimerData timerData) {
175: try {
176: timerStore.addTimerData(timerData);
177: } catch (Exception e) {
178: log.warning("Could not add timer " + e.getMessage()
179: + " at (now) " + System.currentTimeMillis()
180: + " for " + timerData.getExpiration().getTime());
181: }
182: }
183:
184: public Timer getTimer(long timerId) {
185: TimerData timerData = timerStore.getTimer((String) deployment
186: .getDeploymentID(), timerId);
187: if (timerData != null) {
188: return timerData.getTimer();
189: } else {
190: return null;
191: }
192: }
193:
194: public Collection<Timer> getTimers(Object primaryKey)
195: throws IllegalStateException {
196: checkState();
197:
198: Collection<Timer> timers = new ArrayList<Timer>();
199: for (Iterator iterator = timerStore.getTimers(
200: (String) deployment.getDeploymentID()).iterator(); iterator
201: .hasNext();) {
202: TimerData timerData = (TimerData) iterator.next();
203: Timer timer = timerData.getTimer();
204: timers.add(timer);
205: }
206: return timers;
207: }
208:
209: public Timer createTimer(Object primaryKey, long duration,
210: Serializable info) throws IllegalArgumentException,
211: IllegalStateException, EJBException {
212: if (duration < 0)
213: throw new IllegalArgumentException("duration is negative: "
214: + duration);
215: checkState();
216:
217: Date time = new Date(System.currentTimeMillis() + duration);
218: try {
219: TimerData timerData = createTimerData(primaryKey, time, 0,
220: info);
221: return timerData.getTimer();
222: } catch (TimerStoreException e) {
223: throw new EJBException(e);
224: }
225: }
226:
227: public Timer createTimer(Object primaryKey, long initialDuration,
228: long intervalDuration, Serializable info)
229: throws IllegalArgumentException, IllegalStateException,
230: EJBException {
231: if (initialDuration < 0)
232: throw new IllegalArgumentException(
233: "initialDuration is negative: " + initialDuration);
234: if (intervalDuration < 0)
235: throw new IllegalArgumentException(
236: "intervalDuration is negative: " + intervalDuration);
237: checkState();
238:
239: Date time = new Date(System.currentTimeMillis()
240: + initialDuration);
241: try {
242: TimerData timerData = createTimerData(primaryKey, time,
243: intervalDuration, info);
244: return timerData.getTimer();
245: } catch (TimerStoreException e) {
246: throw new EJBException(e);
247: }
248: }
249:
250: public Timer createTimer(Object primaryKey, Date expiration,
251: Serializable info) throws IllegalArgumentException,
252: IllegalStateException, EJBException {
253: if (expiration == null)
254: throw new IllegalArgumentException("expiration is null");
255: if (expiration.getTime() < 0)
256: throw new IllegalArgumentException(
257: "expiration is negative: " + expiration.getTime());
258: checkState();
259:
260: try {
261: TimerData timerData = createTimerData(primaryKey,
262: expiration, 0, info);
263: return timerData.getTimer();
264: } catch (TimerStoreException e) {
265: throw new EJBException(e);
266: }
267: }
268:
269: public Timer createTimer(Object primaryKey, Date initialExpiration,
270: long intervalDuration, Serializable info)
271: throws IllegalArgumentException, IllegalStateException,
272: EJBException {
273: if (initialExpiration == null)
274: throw new IllegalArgumentException(
275: "initialExpiration is null");
276: if (initialExpiration.getTime() < 0)
277: throw new IllegalArgumentException(
278: "initialExpiration is negative: "
279: + initialExpiration.getTime());
280: if (intervalDuration < 0)
281: throw new IllegalArgumentException(
282: "intervalDuration is negative: " + intervalDuration);
283: checkState();
284:
285: try {
286: TimerData timerData = createTimerData(primaryKey,
287: initialExpiration, intervalDuration, info);
288: return timerData.getTimer();
289: } catch (TimerStoreException e) {
290: throw new EJBException(e);
291: }
292: }
293:
294: private TimerData createTimerData(Object primaryKey,
295: Date expiration, long intervalDuration, Object info)
296: throws TimerStoreException {
297: TimerData timerData = timerStore.createTimer(this ,
298: (String) deployment.getDeploymentID(), primaryKey,
299: info, expiration, intervalDuration);
300:
301: // mark this as a new timer... when the transaction completes it will schedule the timer
302: timerData.newTimer();
303:
304: return timerData;
305: }
306:
307: /**
308: * Insure that timer methods can be invoked for the current operation on this Context.
309: */
310: private void checkState() throws IllegalStateException {
311: if (!BaseContext.isTimerMethodAllowed()) {
312: throw new IllegalStateException(
313: "TimerService method not permitted for current operation "
314: + ThreadContext.getThreadContext()
315: .getCurrentOperation().name());
316: }
317: }
318:
319: /**
320: * This method calls the ejbTimeout method and starts a transaction if the timeout is transacted.
321: *
322: * This method will retry failed ejbTimeout calls until retryAttempts is exceeded.
323: *
324: * @param timerData the timer to call.
325: */
326: private void ejbTimeout(TimerData timerData) {
327: try {
328: Timer timer = getTimer(timerData.getId());
329: if (timer == null) {
330: return;
331: }
332:
333: for (int tries = 0; tries < (1 + retryAttempts); tries++) {
334: // if transacted, begin the transaction
335: if (transacted) {
336: try {
337: transactionManager.begin();
338: } catch (Exception e) {
339: log
340: .warning(
341: "Exception occured while starting container transaction",
342: e);
343: return;
344: }
345: }
346:
347: // call the timeout method
348: try {
349: RpcContainer container = (RpcContainer) deployment
350: .getContainer();
351: Method ejbTimeout = deployment.getEjbTimeout();
352: container.invoke(deployment.getDeploymentID(),
353: ejbTimeout.getDeclaringClass(), ejbTimeout,
354: new Object[] { timer }, timerData
355: .getPrimaryKey());
356: } catch (RuntimeException e) {
357: // exception from a timer does not necessairly mean failure
358: log.warning("RuntimeException from ejbTimeout on "
359: + deployment.getDeploymentID(), e);
360: } catch (OpenEJBException e) {
361: log.warning("Exception from ejbTimeout on "
362: + deployment.getDeploymentID(), e);
363: } finally {
364: try {
365: if (!transacted
366: || transactionManager.getStatus() == Status.STATUS_ACTIVE) {
367: // clean up the timer store
368: if (timerData.isOneTime()) {
369: timerStore.removeTimer(timerData
370: .getId());
371: } else {
372: timerData.nextTime();
373: timerStore
374: .updateIntervalTimer(timerData);
375: }
376:
377: // commit the tx
378: if (transacted) {
379: transactionManager.commit();
380: }
381:
382: // all is cool
383: //noinspection ReturnInsideFinallyBlock
384: return;
385: } else {
386: // tx was marked rollback, so roll it back
387: if (transacted) {
388: transactionManager.rollback();
389: }
390: }
391: } catch (Exception e) {
392: log
393: .warning(
394: "Exception occured while completing container transaction",
395: e);
396: }
397: }
398: }
399: log.warning("Failed to execute ejbTimeout on "
400: + timerData.getDeploymentId()
401: + " successfully within " + retryAttempts
402: + " attempts");
403: } catch (RuntimeException e) {
404: log
405: .warning(
406: "RuntimeException occured while calling ejbTimeout",
407: e);
408: throw e;
409: } catch (Error e) {
410: log.warning("Error occured while calling ejbTimeout", e);
411: throw e;
412: } finally {
413: // if this is a single action timer, mark it as cancelled
414: if (timerData.isOneTime()) {
415: cancelled(timerData);
416: }
417: }
418: }
419:
420: /**
421: * The timer task registered with the java.util.Timer. The run method of this class
422: * simply adds an execution of the ejbTimeout method to the thread pool. It is
423: * important to use the thread pool, since the java.util.Timer is single threaded.
424: */
425: private class EjbTimeoutTimerTask extends TimerTask {
426: private final TimerData timerData;
427:
428: public EjbTimeoutTimerTask(TimerData timerData) {
429: this .timerData = timerData;
430: }
431:
432: public void run() {
433: threadPool.execute(new Runnable() {
434: public void run() {
435: ejbTimeout(timerData);
436: }
437: });
438: }
439: }
440: }
|