001: /*
002: * Copyright 2007 The Kuali Foundation
003: *
004: * Licensed under the Educational Community License, Version 1.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package edu.iu.uis.eden.messaging;
017:
018: import org.apache.log4j.Logger;
019: import org.kuali.bus.services.KSBServiceLocator;
020: import org.kuali.rice.RiceConstants;
021: import org.springframework.transaction.TransactionStatus;
022: import org.springframework.transaction.support.TransactionCallback;
023:
024: /**
025: * Fetches messages from the db. Marks as 'R'. Gives messages to ThreadPool for execution
026: *
027: * @author Kuali Rice Team (kuali-rice@googlegroups.com)
028: *
029: */
030: public class MessageFetcher implements Runnable {
031:
032: private static final Logger LOG = Logger
033: .getLogger(MessageFetcher.class);
034:
035: private Integer maxMessages;
036: private Long routeQueueId;
037:
038: public MessageFetcher(Integer maxMessages) {
039: this .maxMessages = maxMessages;
040: }
041:
042: public MessageFetcher(Long routeQueueId) {
043: this .routeQueueId = routeQueueId;
044: }
045:
046: public void run() {
047: try {
048: requeueDocument();
049: requeueMessages();
050: } catch (Throwable t) {
051: LOG.error("Failed to fetch messages.", t);
052: }
053: }
054:
055: private void requeueMessages() {
056: if (this .routeQueueId == null) {
057: try {
058: for (final PersistedMessage message : getRouteQueueService()
059: .getNextDocuments(maxMessages)) {
060: markEnrouteAndSaveMessage(message);
061: executeMessage(message);
062: }
063: } catch (Throwable t) {
064: LOG
065: .error(
066: "Failed to fetch or process some messages during requeueMessages",
067: t);
068: }
069: }
070: }
071:
072: private void requeueDocument() {
073: try {
074: if (this .routeQueueId != null) {
075: PersistedMessage message = getRouteQueueService()
076: .findByRouteQueueId(this .routeQueueId);
077: message
078: .setQueueStatus(RiceConstants.ROUTE_QUEUE_ROUTING);
079: getRouteQueueService().save(message);
080: executeMessage(message);
081: }
082: } catch (Throwable t) {
083: LOG
084: .error(
085: "Failed to fetch or process some messages during requeueDocument",
086: t);
087: }
088: }
089:
090: private void executeMessage(PersistedMessage message) {
091: try {
092: KSBServiceLocator.getThreadPool().execute(
093: new MessageServiceInvoker(message));
094: } catch (Throwable t) {
095: LOG.error("Failed to place message " + message
096: + " in thread pool for execution", t);
097: }
098: }
099:
100: private void markEnrouteAndSaveMessage(
101: final PersistedMessage message) {
102: try {
103: KSBServiceLocator.getTransactionTemplate().execute(
104: new TransactionCallback() {
105: public Object doInTransaction(
106: TransactionStatus status) {
107: message
108: .setQueueStatus(RiceConstants.ROUTE_QUEUE_ROUTING);
109: getRouteQueueService().save(message);
110: return null;
111: }
112: });
113: } catch (Throwable t) {
114: LOG.error("Caught error attempting to mark message "
115: + message + " as R", t);
116: }
117: }
118:
119: private MessageQueueService getRouteQueueService() {
120: return KSBServiceLocator.getRouteQueueService();
121: }
122:
123: }
|