001: /*
002: * Copyright 2005-2006 The Kuali Foundation.
003: *
004: *
005: * Licensed under the Educational Community License, Version 1.0 (the "License"); you may not use this file except in
006: * compliance with the License. You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
011: * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
012: * language governing permissions and limitations under the License.
013: */
014: package edu.iu.uis.eden.messaging.exceptionhandling;
015:
016: import java.sql.Timestamp;
017:
018: import org.apache.commons.lang.StringUtils;
019: import org.apache.log4j.Logger;
020: import org.kuali.bus.services.KSBServiceLocator;
021: import org.kuali.rice.RiceConstants;
022: import org.kuali.rice.core.Core;
023: import org.quartz.JobDataMap;
024: import org.quartz.JobDetail;
025: import org.quartz.Scheduler;
026: import org.quartz.SimpleTrigger;
027: import org.quartz.Trigger;
028:
029: import edu.iu.uis.eden.messaging.PersistedMessage;
030: import edu.iu.uis.eden.messaging.ServiceInfo;
031: import edu.iu.uis.eden.messaging.quartz.MessageServiceExecutorJob;
032: import edu.iu.uis.eden.messaging.quartz.MessageServiceExecutorJobListener;
033:
034: /**
035: * Default implementation of the {@link MessageExceptionHandler} which handles exceptions thrown from message processing.
036: *
037: * @author Kuali Rice Team (kuali-rice@googlegroups.com)
038: */
039: public class DefaultMessageExceptionHandler implements
040: MessageExceptionHandler {
041:
042: private static final Logger LOG = Logger
043: .getLogger(DefaultMessageExceptionHandler.class);
044:
045: private static final long DEFAULT_TIME_INCREMENT = 60 * 60 * 1000;
046:
047: private static final int DEFAULT_MAX_RETRIES = 7;
048:
049: public void handleException(Throwable throwable,
050: PersistedMessage message, Object service) {
051: LOG.error("Exception caught processing message "
052: + message.getRouteQueueId(), throwable);
053: try {
054: if (isInException(message)) {
055: placeInException(throwable, message);
056: } else {
057: requeue(throwable, message);
058: }
059: } catch (Throwable t) {
060: LOG
061: .error(
062: "Caught Exception trying to put message in exception routing!!! Returning without notifying callbacks.",
063: t);
064: }
065: }
066:
067: public boolean isInException(PersistedMessage message) {
068: ServiceInfo serviceInfo = message.getMethodCall()
069: .getServiceInfo();
070:
071: if (getImmediateExceptionRouting()) {
072: return true;
073: }
074:
075: Integer globalMaxRetryAttempts = getGlobalMaxRetryAttempts();
076: if (globalMaxRetryAttempts != null) {
077: LOG
078: .info("Global Max Retry has been set, so is overriding other max retry attempts.");
079: LOG.info("Global Max Retry count = "
080: + globalMaxRetryAttempts + ".");
081: return (message.getRetryCount().intValue() >= globalMaxRetryAttempts
082: .intValue());
083: }
084:
085: if (serviceInfo.getServiceDefinition().getRetryAttempts() > 0) {
086: LOG
087: .info("Message set for retry exception handling. Message retry count = "
088: + message.getRetryCount());
089: if (message.getRetryCount() >= serviceInfo
090: .getServiceDefinition().getRetryAttempts()) {
091: return true;
092: }
093: } else if (serviceInfo.getServiceDefinition().getMillisToLive() > 0) {
094: LOG
095: .info("Message set for time to live exception handling. Message expiration date = "
096: + message.getExpirationDate().getTime());
097: if (System.currentTimeMillis() > message
098: .getExpirationDate().getTime()) {
099: return true;
100: }
101: } else if (message.getRetryCount() >= this
102: .getMaxRetryAttempts()) {
103: LOG
104: .info("Message set for default exception handling. Comparing retry count = "
105: + message.getRetryCount()
106: + " against default max count.");
107: return true;
108: }
109: return false;
110: }
111:
112: protected void requeue(Throwable throwable, PersistedMessage message)
113: throws Exception {
114: Integer retryCount = message.getRetryCount();
115: message.setQueueStatus(RiceConstants.ROUTE_QUEUE_QUEUED);
116: long addMilliseconds = Math.round(getTimeIncrement()
117: * Math.pow(2, retryCount));
118: Timestamp currentTime = message.getQueueDate();
119: Timestamp newTime = new Timestamp(currentTime.getTime()
120: + addMilliseconds);
121: message.setQueueStatus(RiceConstants.ROUTE_QUEUE_QUEUED);
122: message.setRetryCount(new Integer(retryCount + 1));
123: message.setQueueDate(newTime);
124: scheduleExecution(throwable, message);
125: }
126:
127: protected void placeInException(Throwable throwable,
128: PersistedMessage message) {
129: message.setQueueStatus(RiceConstants.ROUTE_QUEUE_EXCEPTION);
130: message.setQueueDate(new Timestamp(System.currentTimeMillis()));
131: KSBServiceLocator.getExceptionRoutingService()
132: .placeInExceptionRouting(throwable, message);
133: }
134:
135: protected void scheduleExecution(Throwable throwable,
136: PersistedMessage message) throws Exception {
137: KSBServiceLocator.getRouteQueueService().delete(message);
138: Scheduler scheduler = KSBServiceLocator.getScheduler();
139: JobDataMap jobData = new JobDataMap();
140: jobData.put(MessageServiceExecutorJob.MESSAGE_KEY, message);
141: JobDetail jobDetail = new JobDetail("Exception_Message_Job "
142: + Math.random(), "Exception Messaging",
143: MessageServiceExecutorJob.class);
144: jobDetail.setJobDataMap(jobData);
145: jobDetail
146: .addJobListener(MessageServiceExecutorJobListener.NAME);
147: Trigger trigger = new SimpleTrigger(
148: "Exception_Message_Trigger " + Math.random(),
149: "Exception Messaging", message.getQueueDate());
150: trigger.setJobDataMap(jobData);//1.6 bug required or derby will choke
151: scheduler.scheduleJob(jobDetail, trigger);
152: }
153:
154: public Integer getMaxRetryAttempts() {
155: try {
156: return new Integer(
157: Core
158: .getCurrentContextConfig()
159: .getProperty(
160: RiceConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY));
161: } catch (NumberFormatException e) {
162: LOG
163: .error("Constant '"
164: + RiceConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_KEY
165: + "' is not a number and is being "
166: + "used as a default for exception messages. "
167: + DEFAULT_MAX_RETRIES
168: + " will be used as a retry limit until this number is fixed");
169: return DEFAULT_MAX_RETRIES;
170: }
171: }
172:
173: protected Integer getGlobalMaxRetryAttempts() {
174: String globalMax = Core
175: .getCurrentContextConfig()
176: .getProperty(
177: RiceConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_OVERRIDE_KEY);
178: if (StringUtils.isBlank(globalMax)) {
179: return null;
180: }
181: try {
182: Integer globalMaxRetries = new Integer(globalMax);
183: if (globalMaxRetries >= 0) {
184: return globalMaxRetries;
185: }
186: } catch (NumberFormatException e) {
187: LOG
188: .error("Constant '"
189: + RiceConstants.ROUTE_QUEUE_MAX_RETRY_ATTEMPTS_OVERRIDE_KEY
190: + "' is not a number and is being "
191: + "used as a default for exception messages. "
192: + DEFAULT_MAX_RETRIES
193: + " will be used as a retry limit until this number is fixed");
194: }
195: return null;
196: }
197:
198: public Long getTimeIncrement() {
199: try {
200: return new Long(Core.getCurrentContextConfig().getProperty(
201: RiceConstants.ROUTE_QUEUE_TIME_INCREMENT_KEY));
202: } catch (NumberFormatException e) {
203: LOG
204: .error("Constant '"
205: + RiceConstants.ROUTE_QUEUE_TIME_INCREMENT_KEY
206: + "' is not a number and will not be used "
207: + "as the default time increment for exception routing. Default of "
208: + DEFAULT_TIME_INCREMENT + " will be used.");
209: return DEFAULT_TIME_INCREMENT;
210: }
211: }
212:
213: public Boolean getImmediateExceptionRouting() {
214: return new Boolean(Core.getCurrentContextConfig().getProperty(
215: RiceConstants.IMMEDIATE_EXCEPTION_ROUTING));
216: }
217: }
|