001: /*
002: * Copyright 2005-2006 The Kuali Foundation.
003: *
004: *
005: * Licensed under the Educational Community License, Version 1.0 (the "License"); you may not use this file except in
006: * compliance with the License. You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
011: * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
012: * language governing permissions and limitations under the License.
013: */
014: package edu.iu.uis.eden.messaging.web;
015:
016: import java.io.IOException;
017: import java.sql.Timestamp;
018: import java.util.ArrayList;
019: import java.util.Calendar;
020: import java.util.Collections;
021: import java.util.Comparator;
022: import java.util.Date;
023: import java.util.HashMap;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.Map;
027:
028: import javax.servlet.ServletException;
029: import javax.servlet.http.HttpServletRequest;
030: import javax.servlet.http.HttpServletResponse;
031: import javax.xml.namespace.QName;
032:
033: import org.apache.commons.collections.comparators.ComparableComparator;
034: import org.apache.commons.lang.StringUtils;
035: import org.apache.commons.lang.math.NumberUtils;
036: import org.apache.struts.action.ActionForm;
037: import org.apache.struts.action.ActionForward;
038: import org.apache.struts.action.ActionMapping;
039: import org.apache.struts.action.ActionMessage;
040: import org.apache.struts.action.ActionMessages;
041: import org.kuali.bus.services.KSBServiceLocator;
042: import org.kuali.rice.RiceConstants;
043: import org.kuali.rice.core.Core;
044: import org.kuali.rice.util.RiceUtilities;
045:
046: import edu.iu.uis.eden.EdenConstants;
047: import edu.iu.uis.eden.KEWServiceLocator;
048: import edu.iu.uis.eden.WorkflowServiceErrorException;
049: import edu.iu.uis.eden.messaging.AsynchronousCall;
050: import edu.iu.uis.eden.messaging.MessageFetcher;
051: import edu.iu.uis.eden.messaging.MessageQueueService;
052: import edu.iu.uis.eden.messaging.MessageServiceInvoker;
053: import edu.iu.uis.eden.messaging.PersistedMessage;
054: import edu.iu.uis.eden.messaging.RemoteResourceServiceLocator;
055: import edu.iu.uis.eden.messaging.ServiceInfo;
056: import edu.iu.uis.eden.messaging.callforwarding.ForwardedCallHandler;
057: import edu.iu.uis.eden.messaging.resourceloading.KSBResourceLoaderFactory;
058: import edu.iu.uis.eden.web.WorkflowAction;
059:
060: /**
061: * Struts action for interacting with the queue of messages.
062: *
063: * @author Kuali Rice Team (kuali-rice@googlegroups.com)
064: */
065: public class MessageQueueAction extends WorkflowAction {
066:
067: private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
068: .getLogger(MessageQueueAction.class);
069:
070: public ActionForward start(ActionMapping mapping, ActionForm form,
071: HttpServletRequest request, HttpServletResponse response)
072: throws IOException, ServletException {
073: return mapping.findForward("report");
074: }
075:
076: public ActionForward save(ActionMapping mapping, ActionForm form,
077: HttpServletRequest request, HttpServletResponse response)
078: throws Exception {
079: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
080: save(routeQueueForm);
081:
082: Long routeQueueId = routeQueueForm.getMessageQueueFromForm()
083: .getRouteQueueId();
084: ActionMessages messages = new ActionMessages();
085: messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
086: "routequeue.RouteQueueService.saved"));
087: saveMessages(request, messages);
088:
089: routeQueueForm.setMessageId(null);
090: routeQueueForm.setMessageQueueFromDatabase(null);
091: routeQueueForm.setMessageQueueFromForm(null);
092: routeQueueForm.setShowEdit("yes");
093: routeQueueForm.setMethodToCall("");
094: establishRequiredState(request, form);
095: routeQueueForm.setMessageId(routeQueueId);
096: routeQueueForm.setMessageQueueFromForm(routeQueueForm
097: .getMessageQueueFromDatabase());
098: routeQueueForm.setNewQueueDate(routeQueueForm
099: .getExistingQueueDate());
100: routeQueueForm.getMessageQueueFromForm()
101: .setMethodCall(
102: unwrapPayload(routeQueueForm
103: .getMessageQueueFromForm()));
104: return mapping.findForward("report");
105: }
106:
107: public ActionForward saveAndResubmit(ActionMapping mapping,
108: ActionForm form, HttpServletRequest request,
109: HttpServletResponse response) throws Exception {
110: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
111: PersistedMessage message = save(routeQueueForm);
112: KSBServiceLocator.getThreadPool().execute(
113: new MessageServiceInvoker(message));
114:
115: ActionMessages messages = new ActionMessages();
116: messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
117: "routequeue.RouteQueueService.queued"));
118: saveMessages(request, messages);
119:
120: routeQueueForm.setMessageId(null);
121: routeQueueForm.setMessageQueueFromDatabase(null);
122: routeQueueForm.setMessageQueueFromForm(null);
123: routeQueueForm.setShowEdit("yes");
124: routeQueueForm.setMethodToCall("");
125: establishRequiredState(request, form);
126: routeQueueForm.setMessageId(message.getRouteQueueId());
127: routeQueueForm.setMessageQueueFromForm(message);
128: routeQueueForm.setNewQueueDate(routeQueueForm
129: .getExistingQueueDate());
130: routeQueueForm.getMessageQueueFromForm().setMethodCall(
131: unwrapPayload(message));
132: return mapping.findForward("report");
133: }
134:
135: public ActionForward saveAndForward(ActionMapping mapping,
136: ActionForm form, HttpServletRequest request,
137: HttpServletResponse response) throws Exception {
138: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
139: PersistedMessage message = save(routeQueueForm);
140: ForwardedCallHandler adminService = getAdminServiceToForwardTo(
141: message, routeQueueForm);
142: AsynchronousCall methodCall = message.getPayload()
143: .getMethodCall();
144: message.setMethodCall(methodCall);
145: adminService.handleCall(message);
146: KSBServiceLocator.getRouteQueueService().delete(message);
147:
148: ActionMessages messages = new ActionMessages();
149: messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
150: "routequeue.RouteQueueService.queued"));
151: saveMessages(request, messages);
152:
153: routeQueueForm.setMessageId(null);
154: routeQueueForm.setMessageQueueFromDatabase(null);
155: routeQueueForm.setMessageQueueFromForm(null);
156: routeQueueForm.setShowEdit("yes");
157: routeQueueForm.setMethodToCall("");
158: establishRequiredState(request, form);
159: routeQueueForm.setMessageId(message.getRouteQueueId());
160: routeQueueForm.setMessageQueueFromForm(message);
161: routeQueueForm.setNewQueueDate(routeQueueForm
162: .getExistingQueueDate());
163: routeQueueForm.getMessageQueueFromForm().setMethodCall(
164: unwrapPayload(message));
165: return mapping.findForward("report");
166: }
167:
168: private PersistedMessage save(MessageQueueForm routeQueueForm) {
169: Long routeQueueId = routeQueueForm.getMessageQueueFromForm()
170: .getRouteQueueId();
171: if ((routeQueueId == null) || (routeQueueId.longValue() <= 0)) {
172: throw new IllegalArgumentException(
173: "Invalid routeQueueId passed in. Cannot save");
174: }
175: // save the message
176: PersistedMessage existingMessage = KSBServiceLocator
177: .getRouteQueueService()
178: .findByRouteQueueId(routeQueueId);
179: PersistedMessage message = routeQueueForm
180: .getMessageQueueFromForm();
181: // copy the new values over
182: if (existingMessage == null) {
183: // TODO better error processing
184: throw new WorkflowServiceErrorException(
185: "Could locate the existing message, it may have already been processed.");
186: }
187: existingMessage.setQueuePriority(message.getQueuePriority());
188: existingMessage.setIpNumber(message.getIpNumber());
189: existingMessage.setLockVerNbr(message.getLockVerNbr());
190: existingMessage.setMessageEntity(message.getMessageEntity());
191: existingMessage.setMethodName(message.getMethodName());
192: existingMessage.setQueueStatus(message.getQueueStatus());
193: existingMessage.setRetryCount(message.getRetryCount());
194: existingMessage.setServiceName(message.getServiceName());
195: existingMessage.setValue1(message.getValue1());
196: existingMessage.setValue2(message.getValue2());
197: KSBServiceLocator.getRouteQueueService().save(existingMessage);
198: return existingMessage;
199: }
200:
201: private ForwardedCallHandler getAdminServiceToForwardTo(
202: PersistedMessage message, MessageQueueForm form) {
203: String ip = form.getIpAddress();
204: List<ServiceInfo> services = KSBServiceLocator
205: .getIPTableService().fetchAll();
206: for (ServiceInfo service : services) {
207: if (service.getQname().getLocalPart().equals(
208: QName.valueOf(message.getServiceName())
209: .getLocalPart()
210: + "-forwardHandler")
211: && service.getServerIp().equals(ip)) {
212: // retrieve a reference to the remote service
213: RemoteResourceServiceLocator remoteResourceLocator = KSBResourceLoaderFactory
214: .getRemoteResourceLocator();
215: ForwardedCallHandler handler = (ForwardedCallHandler) remoteResourceLocator
216: .getService(service.getQname(), service
217: .getEndpointUrl());
218: if (handler != null) {
219: return handler;
220: } else {
221: LOG
222: .warn("Failed to find forwarded call handler for service: "
223: + service.getQname().toString()
224: + " and endpoint URL: "
225: + service.getEndpointUrl());
226: }
227: }
228: }
229: throw new WorkflowServiceErrorException(
230: "Could not locate the BusAdminService for ip " + ip
231: + " in order to forward the message.");
232: }
233:
234: /**
235: * Performs a quick ReQueue of the indicated persisted message.
236: *
237: * The net effect of this requeue is to set the Date to now, and to reset the RetryCount to zero. The payload is not
238: * modified.
239: *
240: * @param message
241: * The populated message to be requeued.
242: */
243: protected void quickRequeueMessage(PersistedMessage message) {
244: message.setQueueDate(new Timestamp(Calendar.getInstance()
245: .getTimeInMillis()));
246: message.setRetryCount(new Integer(0));
247: getRouteQueueService().save(message);
248: }
249:
250: public ActionForward quickRequeueMessage(ActionMapping mapping,
251: ActionForm form, HttpServletRequest request,
252: HttpServletResponse response) throws Exception {
253: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
254: if (routeQueueForm.getMessageQueueFromDatabase() == null) {
255: throw new IllegalArgumentException(
256: "No messageId passed in with the Request.");
257: }
258:
259: PersistedMessage message = routeQueueForm
260: .getMessageQueueFromDatabase();
261: quickRequeueMessage(message);
262: KSBServiceLocator.getThreadPool().execute(
263: new MessageServiceInvoker(message));
264:
265: ActionMessages messages = new ActionMessages();
266: messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
267: "routequeue.RouteQueueService.requeued"));
268: saveMessages(request, messages);
269:
270: routeQueueForm.setMessageQueueFromDatabase(null);
271: routeQueueForm.setMessageQueueFromForm(null);
272: routeQueueForm.setMessageId(null);
273: routeQueueForm.setMethodToCall("");
274:
275: // re-run the state method to load the full set of rows
276: establishRequiredState(request, form);
277: return mapping.findForward("report");
278: }
279:
280: public ActionForward edit(ActionMapping mapping, ActionForm form,
281: HttpServletRequest request, HttpServletResponse response)
282: throws IOException, ServletException {
283: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
284: routeQueueForm.setShowEdit("yes");
285: routeQueueForm.setMessageQueueFromForm(routeQueueForm
286: .getMessageQueueFromDatabase());
287: routeQueueForm.setNewQueueDate(routeQueueForm
288: .getExistingQueueDate());
289: routeQueueForm.getMessageQueueFromForm()
290: .setMethodCall(
291: unwrapPayload(routeQueueForm
292: .getMessageQueueFromForm()));
293: return mapping.findForward("basic");
294: }
295:
296: public ActionForward view(ActionMapping mapping, ActionForm form,
297: HttpServletRequest request, HttpServletResponse response)
298: throws Exception {
299: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
300: routeQueueForm.setShowEdit("no");
301: routeQueueForm.setMessageQueueFromForm(routeQueueForm
302: .getMessageQueueFromDatabase());
303: routeQueueForm.setNewQueueDate(routeQueueForm
304: .getExistingQueueDate());
305: AsynchronousCall messagePayload = unwrapPayload(routeQueueForm
306: .getMessageQueueFromDatabase());
307: routeQueueForm.getMessageQueueFromForm().setMethodCall(
308: messagePayload);
309: return mapping.findForward("payload");
310: }
311:
312: public ActionForward reset(ActionMapping mapping, ActionForm form,
313: HttpServletRequest request, HttpServletResponse response)
314: throws Exception {
315: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
316: if (routeQueueForm.getShowEdit().equals("yes")) {
317: routeQueueForm.setMessageQueueFromForm(routeQueueForm
318: .getMessageQueueFromDatabase());
319: }
320: return mapping.findForward("basic");
321: }
322:
323: public ActionForward clear(ActionMapping mapping, ActionForm form,
324: HttpServletRequest request, HttpServletResponse response)
325: throws Exception {
326: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
327: routeQueueForm.getMessageQueueFromForm().setQueuePriority(null);
328: routeQueueForm.getMessageQueueFromForm().setQueueStatus(null);
329: routeQueueForm.getMessageQueueFromForm().setQueueDate(null);
330: routeQueueForm.getMessageQueueFromForm()
331: .setExpirationDate(null);
332: routeQueueForm.getMessageQueueFromForm().setRetryCount(null);
333: routeQueueForm.getMessageQueueFromForm().setIpNumber(null);
334: routeQueueForm.getMessageQueueFromForm().setServiceName(null);
335: routeQueueForm.getMessageQueueFromForm().setMessageEntity(null);
336: routeQueueForm.getMessageQueueFromForm().setMethodName(null);
337: routeQueueForm.getMessageQueueFromForm().setPayload(null);
338: routeQueueForm.getMessageQueueFromForm().setMethodCall(null);
339: routeQueueForm.setExistingQueueDate(null);
340: routeQueueForm.setNewQueueDate(null);
341: return mapping.findForward("basic");
342: }
343:
344: public ActionForward delete(ActionMapping mapping, ActionForm form,
345: HttpServletRequest request, HttpServletResponse response)
346: throws Exception {
347: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
348: routeQueueForm.setMessageQueueFromForm(routeQueueForm
349: .getMessageQueueFromDatabase());
350: routeQueueForm.setMessageQueueFromDatabase(null);
351: getRouteQueueService().delete(
352: routeQueueForm.getMessageQueueFromForm());
353: ActionMessages messages = new ActionMessages();
354: messages.add(ActionMessages.GLOBAL_MESSAGE, new ActionMessage(
355: "routequeue.RouteQueueService.deleted", routeQueueForm
356: .getMessageQueueFromForm().getRouteQueueId()
357: .toString()));
358: saveMessages(request, messages);
359: routeQueueForm.setMessageId(null);
360: establishRequiredState(request, form);
361: return mapping.findForward("report");
362: }
363:
364: public ActionForward executeMessageFetcher(ActionMapping mapping,
365: ActionForm form, HttpServletRequest request,
366: HttpServletResponse response) throws Exception {
367: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
368: ActionMessages messages = new ActionMessages();
369: if (routeQueueForm.getMaxMessageFetcherMessages() == null
370: || routeQueueForm.getMaxMessageFetcherMessages() <= 0) {
371: messages
372: .add(
373: ActionMessages.GLOBAL_MESSAGE,
374: new ActionMessage(
375: "routequeue.RouteQueueService.invalidMessages",
376: routeQueueForm
377: .getMaxMessageFetcherMessages()));
378: }
379: if (!messages.isEmpty()) {
380: saveMessages(request, messages);
381: return mapping.findForward("report");
382: }
383: new MessageFetcher(routeQueueForm
384: .getMaxMessageFetcherMessages()).run();
385: return mapping.findForward("report");
386: }
387:
388: /**
389: * Sets up the expected state by retrieving the selected RouteQueue by RouteQueueId, and placing it in the
390: * ExistingRouteQueue member.
391: *
392: * Called by the super's Execute method on every request.
393: */
394: public ActionMessages establishRequiredState(
395: HttpServletRequest request, ActionForm form)
396: throws Exception {
397: MessageQueueForm routeQueueForm = (MessageQueueForm) form;
398: routeQueueForm.setMyIpAddress(RiceUtilities.getIpNumber());
399: routeQueueForm.setMyMessageEntity(Core
400: .getCurrentContextConfig().getProperty(
401: RiceConstants.MESSAGE_ENTITY));
402: routeQueueForm.setMessagePersistence(Core
403: .getCurrentContextConfig().getProperty(
404: RiceConstants.MESSAGE_PERSISTENCE));
405: routeQueueForm.setMessageDelivery(Core
406: .getCurrentContextConfig().getProperty(
407: RiceConstants.MESSAGE_DELIVERY));
408: routeQueueForm.setMessageOff(Core.getCurrentContextConfig()
409: .getProperty(RiceConstants.MESSAGING_OFF));
410: List<ServiceInfo> services = KSBServiceLocator
411: .getIPTableService().fetchAll();
412: if (routeQueueForm.getMessageId() != null) {
413: PersistedMessage rq = getRouteQueueService()
414: .findByRouteQueueId(routeQueueForm.getMessageId());
415: if (rq != null) {
416: // routeQueueForm.setExistingQueueDate(EdenConstants.getDefaultDateFormat().format(rq.getQueueDate()));
417: routeQueueForm.setExistingQueueDate(EdenConstants
418: .getDefaultDateFormat().format(new Date()));
419: routeQueueForm.setMessageQueueFromDatabase(rq);
420: // establish IP addresses where this message could safely be forwarded to
421: String serviceName = rq.getServiceName();
422: for (ServiceInfo serviceInfo : services) {
423: if (serviceInfo.getServiceName()
424: .equals(serviceName)) {
425: routeQueueForm.getIpAddresses().add(
426: new ValueLabelPair(serviceInfo
427: .getServerIp(), serviceInfo
428: .getServerIp()));
429: }
430: }
431: } else {
432: ActionMessages messages = new ActionMessages();
433: messages
434: .add(
435: ActionMessages.GLOBAL_MESSAGE,
436: new ActionMessage(
437: "messagequeue.RouteQueueService.queuedDocumentNotFound",
438: routeQueueForm.getMessageId()
439: .toString()));
440: return messages;
441: }
442: routeQueueForm.setMessageId(null);
443: } else if (!"clear".equalsIgnoreCase(request
444: .getParameter("methodToCall"))) {
445: List<PersistedMessage> queueEntries = findRouteQueues(
446: request, routeQueueForm, routeQueueForm
447: .getMaxRows() + 1);
448: if (queueEntries.size() > 0) {
449: Collections.sort(queueEntries, new Comparator() {
450: private Comparator comp = new ComparableComparator();
451:
452: public int compare(Object object1, Object object2) {
453: if (object1 == null && object2 == null) {
454: return 0;
455: } else if (object1 == null) {
456: return 1;
457: } else if (object2 == null) {
458: return -1;
459: }
460: Long id1 = ((PersistedMessage) object1)
461: .getRouteQueueId();
462: Long id2 = ((PersistedMessage) object2)
463: .getRouteQueueId();
464:
465: try {
466: return this .comp.compare(id1, id2);
467: } catch (Exception e) {
468: return 0;
469: }
470: }
471: });
472: }
473: routeQueueForm.setMessageQueueRows(queueEntries);
474: }
475: return null;
476: }
477:
478: protected List<PersistedMessage> findRouteQueues(
479: HttpServletRequest request,
480: MessageQueueForm routeQueueForm, int maxRows) {
481: List<PersistedMessage> routeQueues = new ArrayList<PersistedMessage>();
482:
483: // no filter applied
484: if (StringUtils.isBlank(routeQueueForm.getFilterApplied())) {
485: routeQueues.addAll(getRouteQueueService().findAll(maxRows));
486: }
487:
488: // one or more filters applied
489: else {
490: if (!StringUtils.isBlank(routeQueueForm
491: .getRouteQueueIdFilter())) {
492: if (!NumberUtils.isNumber(routeQueueForm
493: .getRouteQueueIdFilter())) {
494: // TODO better error handling here
495: throw new WorkflowServiceErrorException(
496: "Message Id must be a number.");
497: }
498: }
499:
500: Map<String, String> criteriaValues = new HashMap<String, String>();
501: String key = null;
502: String value = null;
503: String trimmedKey = null;
504: for (Iterator iter = request.getParameterMap().keySet()
505: .iterator(); iter.hasNext();) {
506: key = (String) iter.next();
507: if (key
508: .endsWith(EdenConstants.ROUTE_QUEUE_FILTER_SUFFIX)) {
509: value = request.getParameter(key);
510: if (StringUtils.isNotBlank(value)) {
511: trimmedKey = key
512: .substring(
513: 0,
514: key
515: .indexOf(EdenConstants.ROUTE_QUEUE_FILTER_SUFFIX));
516: criteriaValues.put(trimmedKey, value);
517: }
518: }
519: }
520: routeQueues.addAll(getRouteQueueService().findByValues(
521: criteriaValues, maxRows));
522: }
523: return routeQueues;
524: }
525:
526: private MessageQueueService getRouteQueueService() {
527: return (MessageQueueService) KEWServiceLocator
528: .getService(KEWServiceLocator.ROUTE_QUEUE_SRV);
529: }
530:
531: /**
532: * Extracts the payload from a PersistedMessage, attempts to convert it to the expected AsynchronousCall type, and
533: * returns it.
534: *
535: * Throws an IllegalArgumentException if the decoded payload isnt of the expected type.
536: *
537: * @param message
538: * The populated PersistedMessage object to extract the payload from.
539: * @return Returns the payload if one is present and it can be deserialized, otherwise returns null.
540: */
541: protected AsynchronousCall unwrapPayload(PersistedMessage message) {
542: String encodedPayload = message.getPayload().getPayload();
543: if (StringUtils.isBlank(encodedPayload)) {
544: return null;
545: }
546: Object decodedPayload = null;
547: if (encodedPayload != null) {
548: decodedPayload = KSBServiceLocator.getMessageHelper()
549: .deserializeObject(encodedPayload);
550: }
551: // fail fast if its not the expected type of AsynchronousCall
552: if ((decodedPayload != null)
553: && !(decodedPayload instanceof AsynchronousCall)) {
554: throw new IllegalArgumentException(
555: "PersistedMessage payload was not of the expected class. "
556: + "Expected was ["
557: + AsynchronousCall.class.getName()
558: + "], actual was: ["
559: + decodedPayload.getClass().getName() + "]");
560: }
561: return (AsynchronousCall) decodedPayload;
562: }
563:
564: }
|