001: /*
002: * Copyright 2005-2006 The Kuali Foundation.
003: *
004: *
005: * Licensed under the Educational Community License, Version 1.0 (the "License"); you may not use this file except in
006: * compliance with the License. You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
011: * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
012: * language governing permissions and limitations under the License.
013: */
014: package edu.iu.uis.eden.messaging;
015:
016: import java.io.Serializable;
017: import java.lang.reflect.Method;
018: import java.util.List;
019:
020: import javax.xml.namespace.QName;
021:
022: import org.apache.log4j.Logger;
023: import org.kuali.bus.services.KSBServiceLocator;
024: import org.kuali.rice.core.Core;
025: import org.kuali.rice.resourceloader.GlobalResourceLoader;
026: import org.springframework.transaction.TransactionStatus;
027: import org.springframework.transaction.support.TransactionCallback;
028:
029: import edu.iu.uis.eden.messaging.callforwarding.ForwardedCallHandler;
030: import edu.iu.uis.eden.messaging.resourceloading.KSBResourceLoaderFactory;
031:
032: /**
033: * Handles invocation of a {@link PersistedMessage}.
034: *
035: * @author Kuali Rice Team (kuali-rice@googlegroups.com)
036: */
037: public class MessageServiceInvoker implements Runnable {
038:
039: protected static final Logger LOG = Logger
040: .getLogger(MessageServiceInvoker.class);
041:
042: private PersistedMessage message;
043: private Object service;
044: private AsynchronousCall methodCall;
045:
046: public MessageServiceInvoker(PersistedMessage message) {
047: this .message = message;
048: }
049:
050: public void run() {
051: LOG.debug("calling service from persisted message "
052: + getMessage().getRouteQueueId());
053: try {
054: KSBServiceLocator.getTransactionTemplate().execute(
055: new TransactionCallback() {
056: public Object doInTransaction(
057: TransactionStatus status) {
058: Object result = null;
059: AsynchronousCall methodCall = getMessage()
060: .getPayload().getMethodCall();
061:
062: try {
063: result = invokeService(methodCall);
064: KSBServiceLocator
065: .getRouteQueueService().delete(
066: getMessage());
067: } catch (Throwable t) {
068: LOG.warn(
069: "Caught throwable making async service call "
070: + methodCall, t);
071: throw new MessageProcessingException(t);
072: } finally {
073: try {
074: notifyOnCallback(methodCall, result);
075: } catch (Exception e) {
076: LOG
077: .warn(
078: "Exception caught notifying callback",
079: e);
080: }
081: try {
082: notifyGlobalCallbacks(methodCall,
083: result);
084: } catch (Exception e) {
085: LOG
086: .warn(
087: "Exception caught notifying callback",
088: e);
089: }
090:
091: }
092: return null;
093: }
094: });
095: } catch (Throwable t) {
096: placeInExceptionRouting(t, getMethodCall(), getService());
097: }
098: }
099:
100: public void placeInExceptionRouting(Throwable t,
101: AsynchronousCall call, Object service) {
102: LOG.error("Error processing message: " + this .message, t);
103: final Throwable throwable;
104: if (t instanceof MessageProcessingException) {
105: throwable = t.getCause();
106: } else {
107: throwable = t;
108: }
109: KSBServiceLocator.getExceptionRoutingService()
110: .placeInExceptionRouting(throwable, this .message,
111: service);
112: }
113:
114: /**
115: * Invokes the AsynchronousCall represented on the methodCall on the service contained in the ServiceInfo object on
116: * the AsynchronousCall.
117: *
118: */
119: public Object invokeService(AsynchronousCall methodCall)
120: throws Exception {
121: this .methodCall = methodCall;
122: ServiceInfo serviceInfo = methodCall.getServiceInfo();
123: if (LOG.isDebugEnabled()) {
124: LOG.debug("Attempting to call service "
125: + serviceInfo.getQname());
126: }
127:
128: if (Core.getCurrentContextConfig().getStoreAndForward()
129: && !methodCall.isIgnoreStoreAndForward()) {
130: QName serviceName = serviceInfo.getQname();
131: RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory
132: .getRemoteResourceLocator();
133: QName storeAndForwardName = new QName(serviceName
134: .getNamespaceURI(), serviceName.getLocalPart()
135: + RemotedServiceRegistry.FORWARD_HANDLER_SUFFIX);
136: List<RemotedServiceHolder> forwardServices = remoteResourceLocator
137: .getAllServices(storeAndForwardName);
138: if (forwardServices.isEmpty()) {
139: LOG.warn("Could not find store and forward service "
140: + storeAndForwardName
141: + ". Defaulting to regular messaging.");
142: } else {
143: serviceInfo = forwardServices.get(0).getServiceInfo();
144: }
145: ForwardedCallHandler service = (ForwardedCallHandler) getService(serviceInfo);
146: this .message.setMethodCall(methodCall);
147: service.handleCall(this .message);
148: return null;
149: }
150:
151: Object service = getService(serviceInfo);
152: Method method = service.getClass().getMethod(
153: methodCall.getMethodName(), methodCall.getParamTypes());
154: return method.invoke(service, methodCall.getArguments());
155: }
156:
157: public Object getService(ServiceInfo serviceInfo) {
158: Object service;
159: if (serviceInfo.getServiceDefinition().getQueue()) {
160: service = getQueueService(serviceInfo);
161: } else {
162: service = getTopicService(serviceInfo);
163: }
164: return service;
165: }
166:
167: /**
168: * Get the service as a topic. This means we want to contact every service that is a part of this topic. We've
169: * grabbed all the services that are a part of this topic and we want to make sure that we get everyone of them =
170: * that is we want to circumvent loadbalancing and therefore not ask for the service by it's name but the url to get
171: * the exact service we want.
172: *
173: * @param serviceInfo
174: * @return
175: */
176: public Object getTopicService(ServiceInfo serviceInfo) {
177: // get the service locally if we have it so we don't go through any remoting
178: RemotedServiceRegistry remoteRegistry = KSBServiceLocator
179: .getServiceDeployer();
180: Object service = remoteRegistry.getService(serviceInfo
181: .getQname(), serviceInfo.getEndpointUrl());
182: if (service != null) {
183: return service;
184: }
185: RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory
186: .getRemoteResourceLocator();
187: return remoteResourceLocator.getService(serviceInfo.getQname(),
188: serviceInfo.getEndpointUrl());
189: }
190:
191: /**
192: * Because this is a queue we just need to grab one.
193: *
194: * @param serviceInfo
195: * @return
196: */
197: public Object getQueueService(ServiceInfo serviceInfo) {
198: RemotedServiceRegistry remoteRegistry = KSBServiceLocator
199: .getServiceDeployer();
200: Object service = remoteRegistry.getLocalService(serviceInfo
201: .getQname());
202: if (service != null) {
203: return service;
204: }
205: // get client to remote service if not in our local repository
206: return GlobalResourceLoader.getService(serviceInfo.getQname());
207: }
208:
209: /**
210: * Used in case the thread that dumped this work into the queue is waiting for the work to be done to continue
211: * processing.
212: *
213: * @param callback
214: */
215: public void notifyOnCallback(AsynchronousCall methodCall,
216: Object callResult) {
217: AsynchronousCallback callback = methodCall.getCallback();
218: notifyOnCallback(methodCall, callback, callResult);
219: }
220:
221: public void notifyGlobalCallbacks(AsynchronousCall methodCall,
222: Object callResult) {
223: if (LOG.isDebugEnabled()) {
224: LOG.debug("Notifying global callbacks");
225: }
226: for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry
227: .getCallbacks()) {
228: notifyOnCallback(methodCall, globalCallBack, callResult);
229: }
230: }
231:
232: public void notifyOnCallback(AsynchronousCall methodCall,
233: AsynchronousCallback callback, Object callResult) {
234: if (callback != null) {
235: try {
236: synchronized (callback) {
237: if (LOG.isDebugEnabled()) {
238: LOG.debug("Notifying callback " + callback
239: + " with callResult " + callResult);
240: }
241: callback.notifyAll();
242: if (callResult instanceof Serializable
243: || callResult == null) {
244: callback.callback((Serializable) callResult,
245: methodCall);
246: } else {
247: // may never happen
248: LOG
249: .warn("Attempted to call callback with non-serializable object.");
250: }
251: }
252: } catch (Throwable t) {
253: LOG.error("Caught throwable from callback object "
254: + callback.getClass(), t);
255: }
256: }
257: }
258:
259: public PersistedMessage getMessage() {
260: return this .message;
261: }
262:
263: public void setMessage(PersistedMessage message) {
264: this .message = message;
265: }
266:
267: public Object getService() {
268: return this .service;
269: }
270:
271: public AsynchronousCall getMethodCall() {
272: return this .methodCall;
273: }
274:
275: public void setMethodCall(AsynchronousCall methodCall) {
276: this .methodCall = methodCall;
277: }
278:
279: public void setService(Object service) {
280: this.service = service;
281: }
282: }
|