0001: /*
0002: * MessageService: The message service daemon
0003: * Copyright (C) 2007 Rift IT Contracting
0004: *
0005: * This library is free software; you can redistribute it and/or
0006: * modify it under the terms of the GNU Lesser General Public
0007: * License as published by the Free Software Foundation; either
0008: * version 2.1 of the License, or (at your option) any later version.
0009: *
0010: * This library is distributed in the hope that it will be useful,
0011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0013: * Lesser General Public License for more details.
0014: *
0015: * You should have received a copy of the GNU Lesser General Public
0016: * License along with this library; if not, write to the Free Software
0017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
0018: *
0019: * MessageProcessor.java
0020: */
0021:
0022: // package path
0023: package com.rift.coad.daemon.messageservice;
0024:
0025: // java imports
0026: import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
0027: import java.lang.reflect.Method;
0028: import java.util.ArrayList;
0029: import java.util.Date;
0030: import java.util.HashSet;
0031: import java.util.List;
0032: import java.util.Set;
0033: import java.util.StringTokenizer;
0034: import javax.naming.Context;
0035: import javax.naming.InitialContext;
0036:
0037: // logging import
0038: import org.apache.log4j.Logger;
0039:
0040: // coadunation imports
0041: import com.rift.coad.daemon.messageservice.message.MessageImpl;
0042: import com.rift.coad.daemon.messageservice.message.RPCMessageImpl;
0043: import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
0044: import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
0045: import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue;
0046: import com.rift.coad.daemon.servicebroker.ServiceBroker;
0047: import com.rift.coad.lib.common.RandomGuid;
0048: import com.rift.coad.lib.configuration.Configuration;
0049: import com.rift.coad.lib.configuration.ConfigurationFactory;
0050: import com.rift.coad.lib.deployment.DeploymentMonitor;
0051: import com.rift.coad.lib.naming.NamingDirector;
0052: import com.rift.coad.lib.naming.NamingConstants;
0053: import com.rift.coad.lib.interceptor.InterceptorWrapper;
0054: import com.rift.coad.lib.interceptor.credentials.Session;
0055: import com.rift.coad.lib.thread.pool.Task;
0056: import com.rift.coad.lib.thread.pool.ThreadPoolManager;
0057: import com.rift.coad.lib.thread.pool.PoolException;
0058: import com.rift.coad.lib.deployment.BeanInfo;
0059: import com.rift.coad.lib.deployment.bean.BeanConnector;
0060: import com.rift.coad.lib.deployment.bean.BeanManager;
0061: import com.rift.coad.lib.deployment.jmxbean.JMXBeanConnector;
0062: import com.rift.coad.lib.deployment.jmxbean.JMXBeanManager;
0063: import com.rift.coad.util.connection.ConnectionManager;
0064: import com.rift.coad.util.transaction.UserTransactionWrapper;
0065:
0066: /**
0067: * This object is responsible for processing the message that are sent to the
0068: * message service.
0069: *
0070: * @author Brett Chaldecott
0071: */
0072: public class MessageProcessor extends InterceptorWrapper implements
0073: Task {
0074:
0075: // class constants
0076: private final static long BACK_OFF_PERIOD = 1000;
0077: private final static String RETRY_DELAY = "retry_delay";
0078: private final static long DEFAULT_RETRY_DELAY = 60000;
0079: private final static String PARENT_INSTANCE = "../";
0080:
0081: // the logger reference
0082: protected static Logger log = Logger
0083: .getLogger(MessageProcessor.class.getName());
0084:
0085: // private member variables
0086: private Context context = null;
0087: private UserTransactionWrapper utw = null;
0088: private MessageProcessInfo messageProcessInfo = null;
0089: private NamingDirector namingDirector = null;
0090: private long delay = 0;
0091:
0092: /**
0093: * Creates a new instance of MessageProcessor
0094: *
0095: * @exception Exception
0096: */
0097: public MessageProcessor() throws Exception {
0098: try {
0099: context = new InitialContext();
0100: utw = new UserTransactionWrapper();
0101: namingDirector = NamingDirector.getInstance();
0102: Configuration config = ConfigurationFactory.getInstance()
0103: .getConfig(MessageProcessor.class);
0104: delay = config.getLong(RETRY_DELAY, DEFAULT_RETRY_DELAY);
0105: } catch (Exception ex) {
0106: log.error("Failed init the message processor : "
0107: + ex.getMessage(), ex);
0108: throw new Exception("Failed init the message processor : "
0109: + ex.getMessage(), ex);
0110: }
0111: }
0112:
0113: /**
0114: * The implementation of the process method used by the coadunation
0115: * threading pool.
0116: *
0117: * @param poolManager The reference to the thread pool manager.
0118: * @exception Exception
0119: */
0120: public void process(ThreadPoolManager poolManager) throws Exception {
0121: DeploymentMonitor.getInstance().waitUntilInitDeployComplete();
0122: if (DeploymentMonitor.getInstance().isTerminated()) {
0123: return;
0124: }
0125: boolean foundMessage = getMessageManager();
0126: poolManager.releaseThread();
0127: if (foundMessage) {
0128: processMessage();
0129: }
0130: }
0131:
0132: /**
0133: * This method retrieves the next message from the message queue manager
0134: * for processing.
0135: *
0136: * @return True if a message was found false if not.
0137: */
0138: private boolean getMessageManager() {
0139: try {
0140: Date currentTime = new Date();
0141: Date delayTime = new Date();
0142: messageProcessInfo = MessageQueueManager.getInstance()
0143: .getNextMessage(currentTime);
0144: if (messageProcessInfo != null) {
0145: return true;
0146: }
0147: long difference = delayTime.getTime()
0148: - currentTime.getTime();
0149: if ((delayTime.getTime() == currentTime.getTime())
0150: || (difference > BACK_OFF_PERIOD)
0151: || (difference < 0)) {
0152: ProcessMonitor.getInstance().monitor(BACK_OFF_PERIOD);
0153: } else {
0154: ProcessMonitor.getInstance().monitor(difference);
0155: }
0156: } catch (Exception ex) {
0157: log.error("Failed to retrieve a message : "
0158: + ex.getMessage(), ex);
0159: }
0160: return false;
0161: }
0162:
0163: /**
0164: * This method is called to process a message.
0165: */
0166: private void processMessage() {
0167: Message message = null;
0168: try {
0169: message = getMessage();
0170: if (message.getState() == Message.UNDELIVERED) {
0171: processUndelivered(message);
0172: } else if (message.getState() == Message.DELIVERED) {
0173: processDelivered(message);
0174: } else if (message.getState() == Message.UNDELIVERABLE) {
0175: processUndeliverable(message);
0176: }
0177: } catch (Exception ex) {
0178: log.error("Failed to process the message : "
0179: + ex.getMessage(), ex);
0180: if (message == null) {
0181: pushMessage(messageProcessInfo);
0182: } else {
0183: pushMessage(message, messageProcessInfo);
0184: }
0185: }
0186: }
0187:
0188: /**
0189: * This method returns the message contained within.
0190: *
0191: * @return Message The message.
0192: * @exception MessageServiceException
0193: */
0194: private Message getMessage() throws MessageServiceException {
0195: Message message = null;
0196: try {
0197: utw.begin();
0198: MessageManager messageManager = messageProcessInfo
0199: .getMessageManager();
0200: message = messageManager.getMessage();
0201: utw.commit();
0202: } catch (Exception ex) {
0203: log.error("Failed to retrieve the message : "
0204: + ex.getMessage(), ex);
0205: throw new MessageServiceException(
0206: "Failed to retrieve the message : "
0207: + ex.getMessage(), ex);
0208: } finally {
0209: utw.release();
0210: }
0211:
0212: return message;
0213: }
0214:
0215: /**
0216: * This method will process the message.
0217: *
0218: * @param message The message to process.
0219: * @exception MessageServiceException
0220: */
0221: private void processUndelivered(Message message)
0222: throws MessageServiceException {
0223: try {
0224: if (message.getMessageType() == Message.POINT_TO_POINT) {
0225: if (checkIfTargetLocal(message)
0226: && checkIfMessageInQueue(message)) {
0227: deliverMessage(message);
0228: }
0229: } else if (message.getMessageType() == Message.POINT_TO_SERVICE) {
0230: if (checkIfServiceLocal(message)
0231: && checkIfMessageInQueue(message)) {
0232: deliverMessage(message);
0233: }
0234: } else if (message.getMessageType() == Message.POINT_TO_MULTI_SERVICE) {
0235: if (!namingDirector.isPrimary()) {
0236: deliverToParent(message);
0237: } else {
0238: cloneMessageForServices(message);
0239: }
0240: }
0241: } catch (Exception ex) {
0242: log.error("Failed to process the undelivered message : "
0243: + ex.getMessage(), ex);
0244: throw new MessageServiceException(
0245: "Failed to process the undelivered message : "
0246: + ex.getMessage(), ex);
0247: }
0248: }
0249:
0250: /**
0251: * This method will process the message.
0252: *
0253: * @param message The message to process.
0254: * @exception MessageServiceException
0255: */
0256: private void processDelivered(Message message)
0257: throws MessageServiceException {
0258: try {
0259: if (checkIfReplyLocal(message)
0260: && checkIfReplyMessageInQueue(message)) {
0261: deliverReplyMessage(message);
0262: }
0263: } catch (Exception ex) {
0264: log.error("Failed to process the undelivered message : "
0265: + ex.getMessage(), ex);
0266: throw new MessageServiceException(
0267: "Failed to process the undelivered message : "
0268: + ex.getMessage(), ex);
0269: }
0270: }
0271:
0272: /**
0273: * This method will process the undeliverable message.
0274: *
0275: * @param message The message to process.
0276: * @exception MessageServiceException
0277: */
0278: private void processUndeliverable(Message message)
0279: throws MessageServiceException {
0280: try {
0281: if (checkIfReplyLocal(message)) {
0282: utw.begin();
0283: if (NamedQueueManagerImpl.getInstance()
0284: .checkForNamedQueue(
0285: MessageQueueManager.DEAD_LETTER, true)) {
0286: log.info("Assign message to dead letter queue.");
0287: messageProcessInfo.getMessageQueue().removeMessage(
0288: message.getMessageId());
0289: ((MessageManagerImpl) messageProcessInfo
0290: .getMessageManager())
0291: .assignToQueue(MessageQueueManager.DEAD_LETTER);
0292: NamedMemoryQueue.getInstance(
0293: MessageQueueManager.DEAD_LETTER)
0294: .addMessage(
0295: messageProcessInfo
0296: .getMessageManager());
0297: log
0298: .info("Added the value to the dead letter queue");
0299: } else {
0300: log
0301: .error("Failed to add to the dead letter queue.");
0302: }
0303: utw.commit();
0304: }
0305: } catch (Exception ex) {
0306: log.error("Failed to process the Undeliverable message : "
0307: + ex.getMessage(), ex);
0308: throw new MessageServiceException(
0309: "Failed to process the Undeliverable message : "
0310: + ex.getMessage(), ex);
0311: } finally {
0312: utw.release();
0313: }
0314: }
0315:
0316: /**
0317: * This method pushes the message back onto the queue from which it was
0318: * retrieved.
0319: *
0320: * @param messageProcessInfo The processing information for this thread.
0321: */
0322: private void pushMessage(MessageProcessInfo messageProcessInfo) {
0323: try {
0324: messageProcessInfo.getMessageQueue().pushBackMessage(
0325: messageProcessInfo.getMessageManager());
0326: ProcessMonitor.getInstance().notifyProcessor();
0327: } catch (Exception ex) {
0328: log.error(
0329: "Failed to push the message manager onto a queue : "
0330: + ex.getMessage(), ex);
0331: }
0332: }
0333:
0334: /**
0335: * This method pushes the message back onto the queue from which it was
0336: * retrieved.
0337: *
0338: * @param message The message to push back.
0339: * @param messageProcessInfo The processing information for this thread.
0340: */
0341: private void pushMessage(Message message,
0342: MessageProcessInfo messageProcessInfo) {
0343: try {
0344: try {
0345: utw.begin();
0346: Date nextDate = new Date();
0347: nextDate.setTime(nextDate.getTime() + delay);
0348: ((MessageImpl) message).setNextProcessDate(nextDate);
0349: messageProcessInfo.getMessageManager().updateMessage(
0350: message);
0351: utw.commit();
0352: } catch (Exception ex) {
0353: log.error("Failed to process the message : "
0354: + ex.getMessage(), ex);
0355: } finally {
0356: utw.release();
0357: }
0358: messageProcessInfo.getMessageQueue().pushBackMessage(
0359: messageProcessInfo.getMessageManager());
0360: ProcessMonitor.getInstance().notifyProcessor();
0361: } catch (Exception ex) {
0362: log.error(
0363: "Failed to push the message manager onto a queue : "
0364: + ex.getMessage(), ex);
0365: }
0366: }
0367:
0368: /**
0369: * This method checks if the target this message is going to is local to
0370: * this Coadunation Instance.
0371: *
0372: * @return TRUE if local, FALSE if not.
0373: * @param message The message to perform the test on.
0374: */
0375: private boolean checkIfTargetLocal(Message message)
0376: throws MessageServiceException {
0377: try {
0378: String target = message.getTarget();
0379: if (target == null) {
0380: message.addError(Message.ERROR,
0381: "There is no target for this message");
0382: initUndeliverableProcess(message);
0383: return false;
0384: }
0385:
0386: // check if this fall withing this part of the tree or below
0387: String jndiBase = NamingDirector.getInstance()
0388: .getJNDIBase()
0389: + "/";
0390: String parentUrl = NamingDirector.getInstance()
0391: .getPrimaryJNDIUrl();
0392: String instanceURL = NamingDirector.getInstance()
0393: .getInstanceId()
0394: + "/";
0395: int pos = target.indexOf(jndiBase);
0396: int instancePos = target.indexOf(instanceURL);
0397: if ((((target.indexOf(parentUrl)) != -1) && (target
0398: .indexOf(jndiBase) == -1))
0399: || (target.indexOf(PARENT_INSTANCE) == 0)
0400: || ((target
0401: .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && !NamingDirector
0402: .getInstance().isPrimary())) {
0403: deliverToParent(message);
0404: return false;
0405: } else if (((instancePos != -1) && (target.indexOf(
0406: NamingConstants.SUBCONTEXT,
0407: (instancePos + instanceURL.length())) != -1))
0408: || (target.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
0409: deliverToChild(message.getTarget(), message);
0410: return false;
0411: } else if (instancePos != -1) {
0412: utw.begin();
0413: target = target.substring(instancePos
0414: + instanceURL.length());
0415: message.setTarget(target);
0416: messageProcessInfo.getMessageManager().updateMessage(
0417: message);
0418: utw.commit();
0419: messageProcessInfo.getMessageQueue().pushBackMessage(
0420: messageProcessInfo.getMessageManager());
0421: ProcessMonitor.getInstance().notifyProcessor();
0422: return false;
0423: }
0424: return true;
0425: } catch (MessageServiceException ex) {
0426: throw ex;
0427: } catch (Exception ex) {
0428: log.error("Failed to check the message is local : "
0429: + ex.getMessage(), ex);
0430: throw new MessageServiceException(
0431: "Failed to check the message is local : "
0432: + ex.getMessage(), ex);
0433: } finally {
0434: utw.release();
0435: }
0436: }
0437:
0438: /**
0439: * This method checks if the target this message is going to is local to
0440: * this Coadunation Instance.
0441: *
0442: * @return TRUE if local, FALSE if not.
0443: * @param message The message to perform the test on.
0444: */
0445: private boolean checkIfReplyLocal(Message message)
0446: throws MessageServiceException {
0447: try {
0448: String reply = message.getReplyTo();
0449: if (reply == null) {
0450: reply = message.getFrom();
0451: if (reply == null) {
0452: message.addError(Message.ERROR,
0453: "There is no reply for this message");
0454: initUndeliverableProcess(message);
0455: return false;
0456: }
0457: }
0458:
0459: // check if this fall withing this part of the tree or below
0460: String jndiBase = NamingDirector.getInstance()
0461: .getJNDIBase()
0462: + "/";
0463: String parentUrl = NamingDirector.getInstance()
0464: .getPrimaryJNDIUrl();
0465: String instanceURL = NamingDirector.getInstance()
0466: .getInstanceId()
0467: + "/";
0468: int pos = reply.indexOf(jndiBase);
0469: int instancePos = reply.indexOf(instanceURL);
0470: if ((((reply.indexOf(parentUrl)) != -1) && (reply
0471: .indexOf(jndiBase) == -1))
0472: || (reply.indexOf(PARENT_INSTANCE) == 0)
0473: || ((reply
0474: .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && !NamingDirector
0475: .getInstance().isPrimary())) {
0476: deliverToParent(message);
0477: return false;
0478: } else if (((instancePos != -1) && (reply.indexOf(
0479: NamingConstants.SUBCONTEXT,
0480: (instancePos + instanceURL.length())) != -1))
0481: || (reply.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
0482: deliverToChild(reply, message);
0483: return false;
0484: } else if (instancePos != -1) {
0485: utw.begin();
0486: reply = reply.substring(instancePos
0487: + instanceURL.length());
0488: if (message.getReplyTo() != null) {
0489: message.setReplyTo(reply);
0490: } else {
0491: message.setFrom(reply);
0492: }
0493: messageProcessInfo.getMessageManager().updateMessage(
0494: message);
0495: utw.commit();
0496: messageProcessInfo.getMessageQueue().pushBackMessage(
0497: messageProcessInfo.getMessageManager());
0498: ProcessMonitor.getInstance().notifyProcessor();
0499: return false;
0500: }
0501: return true;
0502: } catch (MessageServiceException ex) {
0503: throw ex;
0504: } catch (Exception ex) {
0505: log.error("Failed to check the message is local : "
0506: + ex.getMessage(), ex);
0507: throw new MessageServiceException(
0508: "Failed to check the message is local : "
0509: + ex.getMessage(), ex);
0510: }
0511: }
0512:
0513: /**
0514: * This method checks if the target this message is going to is local to
0515: * this Coadunation Instance.
0516: *
0517: * @return TRUE if local, FALSE if not.
0518: * @param message The message to perform the test on.
0519: */
0520: private boolean checkIfServiceLocal(Message message)
0521: throws MessageServiceException {
0522: try {
0523: String target = message.getTarget();
0524: if (target != null) {
0525: return checkIfTargetLocal(message);
0526: }
0527:
0528: String[] services = message.getServices();
0529: if (services == null) {
0530: message.addError(Message.ERROR,
0531: "There are no services for this message");
0532: initUndeliverableProcess(message);
0533: return false;
0534: }
0535: List serviceList = new ArrayList();
0536: for (int index = 0; index < services.length; index++) {
0537: serviceList.add(services[index]);
0538: }
0539: ServiceBroker serviceBroker = (ServiceBroker) ConnectionManager
0540: .getInstance().getConnection(ServiceBroker.class,
0541: ServiceBroker.JNDI_URL);
0542: String service = serviceBroker
0543: .getServiceProvider(serviceList);
0544: if (service.length() != 0) {
0545: message.setTarget(service);
0546: utw.begin();
0547: messageProcessInfo.getMessageManager().updateMessage(
0548: message);
0549: utw.commit();
0550: messageProcessInfo.getMessageQueue().pushBackMessage(
0551: messageProcessInfo.getMessageManager());
0552: } else {
0553: deliverToParent(message);
0554: }
0555: } catch (MessageServiceException ex) {
0556: throw ex;
0557: } catch (Exception ex) {
0558: log.error("Failed to check if the services are local : "
0559: + ex.getMessage(), ex);
0560: throw new MessageServiceException(
0561: "Failed to check if the services are local : "
0562: + ex.getMessage(), ex);
0563: } finally {
0564: utw.release();
0565: }
0566: return false;
0567: }
0568:
0569: /**
0570: * This method checks if the target this message is going to is local to
0571: * this Coadunation Instance.
0572: *
0573: * @return TRUE if local, FALSE if not.
0574: * @param message The message to perform the test on.
0575: */
0576: private boolean checkIfMessageInQueue(Message message)
0577: throws MessageServiceException {
0578: try {
0579: if (messageProcessInfo.getMessageQueue().getName().equals(
0580: message.getTarget())) {
0581: return true;
0582: }
0583: utw.begin();
0584: if (message.getTarget().equals(
0585: MessageServiceManagerMBean.JNDI_URL)
0586: && (message.getTargetNamedQueue() != null)) {
0587: if (false == NamedQueueManagerImpl.getInstance()
0588: .checkForNamedQueue(
0589: message.getTargetNamedQueue(), false)) {
0590: utw.release();
0591: message.addError(Message.ERROR, "The named queue ["
0592: + message.getTargetNamedQueue()
0593: + "] does not exist.");
0594: initUndeliverableProcess(message);
0595: return false;
0596: }
0597: messageProcessInfo.getMessageQueue().removeMessage(
0598: message.getMessageId());
0599: ((MessageManagerImpl) messageProcessInfo
0600: .getMessageManager()).assignToQueue(message
0601: .getTargetNamedQueue());
0602: NamedMemoryQueue.getInstance(
0603: message.getTargetNamedQueue()).addMessage(
0604: messageProcessInfo.getMessageManager());
0605: } else {
0606: MessageQueue messageQueue = MessageQueueManager
0607: .getInstance().getQueue(message.getTarget());
0608: messageProcessInfo.getMessageQueue().removeMessage(
0609: message.getMessageId());
0610: ((MessageManagerImpl) messageProcessInfo
0611: .getMessageManager()).assignToQueue(message
0612: .getTarget());
0613: messageQueue.addMessage(messageProcessInfo
0614: .getMessageManager());
0615: }
0616: utw.commit();
0617: return false;
0618: } catch (Exception ex) {
0619: log.error(
0620: "Failed to check the target : " + ex.getMessage(),
0621: ex);
0622: throw new MessageServiceException(
0623: "Failed to check the target : " + ex.getMessage(),
0624: ex);
0625: } finally {
0626: utw.release();
0627: }
0628: }
0629:
0630: /**
0631: * Check if this message is in the correct queue.
0632: *
0633: * @return TRUE if local, FALSE if not.
0634: * @param message The message to perform the test on.
0635: * @exception MessageServiceException
0636: */
0637: private boolean checkIfReplyMessageInQueue(Message message)
0638: throws MessageServiceException {
0639: try {
0640: String reply = message.getReplyTo();
0641: if (reply == null) {
0642: reply = message.getFrom();
0643: if (reply == null) {
0644: message.addError(Message.ERROR,
0645: "There is no reply for this message");
0646: initUndeliverableProcess(message);
0647: return false;
0648: }
0649: }
0650:
0651: if (messageProcessInfo.getMessageQueue().getName().equals(
0652: reply)) {
0653: return true;
0654: }
0655: utw.begin();
0656: if (reply.equals(MessageServiceManagerMBean.JNDI_URL)
0657: && (message.getTargetNamedQueue() != null)) {
0658: if (false == NamedQueueManagerImpl.getInstance()
0659: .checkForNamedQueue(
0660: message.getReplyNamedQueue(), false)) {
0661: utw.release();
0662: message.addError(Message.ERROR, "The named queue ["
0663: + message.getReplyNamedQueue()
0664: + "] does not exist.");
0665: initUndeliverableProcess(message);
0666: return false;
0667: }
0668: messageProcessInfo.getMessageQueue().removeMessage(
0669: message.getMessageId());
0670: ((MessageManagerImpl) messageProcessInfo
0671: .getMessageManager()).assignToQueue(message
0672: .getReplyNamedQueue());
0673: NamedMemoryQueue.getInstance(
0674: message.getTargetNamedQueue()).addMessage(
0675: messageProcessInfo.getMessageManager());
0676: } else {
0677: MessageQueue messageQueue = MessageQueueManager
0678: .getInstance().getQueue(reply);
0679: messageProcessInfo.getMessageQueue().removeMessage(
0680: message.getMessageId());
0681: ((MessageManagerImpl) messageProcessInfo
0682: .getMessageManager()).assignToQueue(reply);
0683: messageQueue.addMessage(messageProcessInfo
0684: .getMessageManager());
0685: }
0686: utw.commit();
0687: return false;
0688: } catch (Exception ex) {
0689: log.error("Failed to check the reply queue : "
0690: + ex.getMessage(), ex);
0691: throw new MessageServiceException(
0692: "Failed to check the reply queue : "
0693: + ex.getMessage(), ex);
0694: } finally {
0695: utw.release();
0696: }
0697: }
0698:
0699: /**
0700: * This method clones the original message so that it can be sent to all
0701: * the daemons suppliung the services.
0702: *
0703: * @param message The message to clone.
0704: * @exception MessageServiceException
0705: */
0706: private void cloneMessageForServices(Message message)
0707: throws MessageServiceException {
0708: try {
0709: String[] services = message.getServices();
0710: List serviceList = new ArrayList();
0711: for (int index = 0; index < services.length; index++) {
0712: serviceList.add(services[index]);
0713: }
0714: ServiceBroker serviceBroker = (ServiceBroker) ConnectionManager
0715: .getInstance().getConnection(ServiceBroker.class,
0716: ServiceBroker.JNDI_URL);
0717: List daemonList = serviceBroker
0718: .getServiceProviders(serviceList);
0719: if (daemonList.size() == 0) {
0720: message
0721: .addError(Message.ERROR,
0722: "There are no daemon providing these services.");
0723: initUndeliverableProcess(message);
0724: return;
0725: }
0726:
0727: utw.begin();
0728: for (int index = 0; index < daemonList.size(); index++) {
0729: MessageImpl newMessage = (MessageImpl) ((MessageImpl) message)
0730: .clone();
0731: newMessage.setMessageId(RandomGuid.getInstance()
0732: .getGuid());
0733: newMessage.setTarget((String) daemonList.get(index));
0734: newMessage.setMessageType(Message.POINT_TO_POINT);
0735: newMessage.setNextProcessDate(new Date());
0736: MessageManager messageManager = MessageManagerFactory
0737: .getInstance().getMessageManager(newMessage);
0738: MessageQueue messageQueue = MessageQueueManager
0739: .getInstance().getQueue(
0740: MessageQueueManager.UNSORTED);
0741: ((MessageManagerImpl) messageManager)
0742: .assignToQueue(MessageQueueManager.UNSORTED);
0743: messageQueue.addMessage(messageManager);
0744: }
0745: messageProcessInfo.getMessageManager().remove();
0746: messageProcessInfo.getMessageQueue().removeMessage(
0747: message.getMessageId());
0748: utw.commit();
0749: } catch (Exception ex) {
0750: log.error("Failed to clone the messages : "
0751: + ex.getMessage(), ex);
0752: throw new MessageServiceException(
0753: "Failed to clone the messages : " + ex.getMessage(),
0754: ex);
0755: } finally {
0756: utw.release();
0757: }
0758: }
0759:
0760: /**
0761: * This method delivers the message to another coadunation instance.
0762: *
0763: * @param message The reference to the message object.
0764: * @exception MessageServiceException
0765: */
0766: private void deliverToParent(Message message)
0767: throws MessageServiceException {
0768: try {
0769: if (namingDirector.isPrimary()) {
0770: message.addError(Message.ERROR,
0771: "The primary has no parent cannot go further.");
0772: initUndeliverableProcess(message);
0773: return;
0774: }
0775: Message messageCopy = (Message) ((MessageImpl) message)
0776: .clone();
0777: if (message.getTarget() != null) {
0778: messageCopy.setFrom(downJNDIUrl(message.getTarget()));
0779: }
0780: if (message.getReplyTo() != null) {
0781: messageCopy
0782: .setReplyTo(downJNDIUrl(message.getReplyTo()));
0783: }
0784: if (message.getFrom() != null) {
0785: messageCopy.setFrom(downJNDIUrl(message.getFrom()));
0786: }
0787:
0788: log.debug("Deliver message to parent : "
0789: + message.getMessageId());
0790: utw.begin();
0791: MessageStore messageStore = (MessageStore) ConnectionManager
0792: .getInstance().getConnection(
0793: MessageStore.class,
0794: namingDirector.getPrimaryJNDIUrl() + "/"
0795: + MessageStore.JNDI_URL);
0796: messageProcessInfo.getMessageManager().remove();
0797: messageProcessInfo.getMessageQueue().removeMessage(
0798: message.getMessageId());
0799: log
0800: .debug("The message has been deliverd to parent committing : "
0801: + message.getMessageId());
0802: IDLock.getInstance().lock(message.getMessageId());
0803: messageStore.addMessage(messageCopy);
0804: utw.commit();
0805: log.debug("Delivered message to parent : "
0806: + message.getMessageId());
0807: } catch (Exception ex) {
0808: log.error("Failed to deliver to a the parent : "
0809: + ex.getMessage(), ex);
0810: throw new MessageServiceException(
0811: "Failed to deliver to a the parent : "
0812: + ex.getMessage(), ex);
0813: } finally {
0814: utw.release();
0815: }
0816: }
0817:
0818: /**
0819: * This method delivers the message to another coadunation instance.
0820: *
0821: * @param target The target of the message.
0822: * @param message The reference to the message object.
0823: */
0824: private void deliverToChild(String target, Message message)
0825: throws MessageServiceException {
0826: try {
0827: Message messageCopy = (Message) ((MessageImpl) message)
0828: .clone();
0829: if (message.getTarget() != null) {
0830: messageCopy.setTarget(upJNDIUrl(target, message
0831: .getTarget()));
0832: }
0833: if (message.getReplyTo() != null) {
0834: messageCopy.setReplyTo(upJNDIUrl(target, message
0835: .getReplyTo()));
0836: }
0837: if (message.getFrom() != null) {
0838: messageCopy
0839: .setFrom(upJNDIUrl(target, message.getFrom()));
0840: }
0841:
0842: String subContextUrl = NamingConstants.SUBCONTEXT + "/";
0843: if (target.contains(namingDirector.getInstanceId())) {
0844: subContextUrl = namingDirector.getInstanceId() + "/"
0845: + NamingConstants.SUBCONTEXT + "/";
0846: }
0847: int pos = target.indexOf(subContextUrl);
0848: if (pos == -1) {
0849: message.addError(Message.ERROR,
0850: "Cannot find the sub reference information : "
0851: + target);
0852: initUndeliverableProcess(message);
0853: return;
0854: }
0855: String subContext = target.substring(pos
0856: + subContextUrl.length());
0857: subContext = NamingConstants.SUBCONTEXT + "/"
0858: + subContext.substring(0, subContext.indexOf('/'))
0859: + "/" + MessageStore.JNDI_URL;
0860: log.debug("Deliver message to child : "
0861: + message.getMessageId());
0862: utw.begin();
0863: MessageStore messageStore = (MessageStore) ConnectionManager
0864: .getInstance().getConnection(MessageStore.class,
0865: subContext);
0866: messageProcessInfo.getMessageManager().remove();
0867: messageProcessInfo.getMessageQueue().removeMessage(
0868: message.getMessageId());
0869: IDLock.getInstance().lock(message.getMessageId());
0870: messageStore.addMessage(messageCopy);
0871: log
0872: .debug("The message has been deliverd to child committing : "
0873: + message.getMessageId());
0874: utw.commit();
0875: log.debug("Delivered message to child : "
0876: + message.getMessageId());
0877: } catch (Exception ex) {
0878: log.error("Failed to deliver to a the child : "
0879: + ex.getMessage(), ex);
0880: throw new MessageServiceException(
0881: "Failed to deliver to a the child : "
0882: + ex.getMessage(), ex);
0883: } finally {
0884: utw.release();
0885: }
0886: }
0887:
0888: /**
0889: * This method delivers the message.
0890: *
0891: * @param message The message to deliver
0892: * @exception MessageServiceException
0893: */
0894: private void deliverMessage(Message message)
0895: throws MessageServiceException {
0896: initUserSession(message);
0897: try {
0898: if (message instanceof RPCMessage) {
0899: deliverRPCMessage(message.getTarget(), message);
0900: } else if (message instanceof TextMessage) {
0901: deliverTextMessage(message.getTarget(), message);
0902: }
0903: } finally {
0904: releaseUserSession();
0905: }
0906: }
0907:
0908: /**
0909: * This method delivers the reply message.
0910: *
0911: * @param message The message to deliver
0912: * @exception MessageServiceException
0913: */
0914: private void deliverReplyMessage(Message message)
0915: throws MessageServiceException {
0916: initUserSession(message);
0917: try {
0918: String reply = message.getReplyTo();
0919: if (reply == null) {
0920: reply = message.getFrom();
0921: if (reply == null) {
0922: message.addError(Message.ERROR,
0923: "There is no reply for this message");
0924: initUndeliverableProcess(message);
0925: return;
0926: }
0927: }
0928: if (message instanceof RPCMessage) {
0929: deliverReplyRPCMessage(reply, message);
0930: } else if (message instanceof TextMessage) {
0931: deliverReplyTextMessage(reply, message);
0932: }
0933: } finally {
0934: releaseUserSession();
0935: }
0936: }
0937:
0938: /**
0939: * This method delivers the rpc message to its target.
0940: *
0941: * @param message The message to deliver.
0942: * @exception MessageServiceException
0943: */
0944: private void deliverRPCMessage(String target, Message message)
0945: throws MessageServiceException {
0946: ClassLoader original = null;
0947: try {
0948: Object ref = null;
0949: if (((ref = BeanConnector.getInstance().getBean(target)) == null)
0950: && ((ref = JMXBeanConnector.getInstance()
0951: .getJMXBean(target)) == null)) {
0952: message.addError(Message.ERROR, "The target [" + target
0953: + "] does not exist.");
0954: initUndeliverableProcess(message);
0955: return;
0956: }
0957: original = Thread.currentThread().getContextClassLoader();
0958: Thread.currentThread().setContextClassLoader(
0959: ref.getClass().getClassLoader());
0960:
0961: RPCMessageImpl rpcMessageImpl = (RPCMessageImpl) ((RPCMessageImpl) message)
0962: .clone();
0963: Method method = null;
0964: try {
0965: method = ref.getClass().getMethod(
0966: rpcMessageImpl.getMethodName(),
0967: rpcMessageImpl.getArgumentTypes());
0968: } catch (Exception ex) {
0969: log.error("Failed to find the method on the target : "
0970: + ex.getMessage(), ex);
0971: if (original != null) {
0972: Thread.currentThread().setContextClassLoader(
0973: original);
0974: original = null;
0975: }
0976: message.addError(Message.ERROR,
0977: "Failed to find the method on the target : "
0978: + ex.getMessage());
0979: initUndeliverableProcess(message);
0980: return;
0981: }
0982: utw.begin();
0983: try {
0984: Object result = method.invoke(ref, rpcMessageImpl
0985: .getArguments());
0986: ((RPCMessage) message).setResult(result);
0987:
0988: // reset the class loader
0989: if (original != null) {
0990: Thread.currentThread().setContextClassLoader(
0991: original);
0992: original = null;
0993: }
0994: } catch (Throwable ex) {
0995: log.error("Caught an exception : " + ex.getMessage(),
0996: ex);
0997: // reset the class loader
0998: if (original != null) {
0999: Thread.currentThread().setContextClassLoader(
1000: original);
1001: original = null;
1002: }
1003: // force the rollback of any changes
1004: utw.release();
1005: // start a new transaction
1006: utw.begin();
1007:
1008: // deal with invocation exception
1009: if (ex instanceof java.lang.reflect.InvocationTargetException) {
1010: ex = ex.getCause();
1011: }
1012:
1013: // if this is a remote exception
1014: if (ex instanceof java.rmi.RemoteException) {
1015: log.info("This is a remote exception and results "
1016: + "in a retry");
1017: message.incrementRetries();
1018: Date nextDate = new Date();
1019: nextDate.setTime(nextDate.getTime() + delay);
1020: ((MessageImpl) message)
1021: .setNextProcessDate(nextDate);
1022: messageProcessInfo.getMessageManager()
1023: .updateMessage(message);
1024: utw.commit();
1025: messageProcessInfo.getMessageQueue()
1026: .pushBackMessage(
1027: messageProcessInfo
1028: .getMessageManager());
1029:
1030: return;
1031: }
1032: ((RPCMessage) message).setThrowable(ex);
1033: }
1034: // deal with reply
1035: if (message.getReply()) {
1036: log.info("Init the process to deliver to the sender : "
1037: + message.getMessageId());
1038: ((RPCMessageImpl) message).setState(Message.DELIVERED);
1039: messageProcessInfo.getMessageManager().updateMessage(
1040: message);
1041: messageProcessInfo.getMessageQueue().removeMessage(
1042: message.getMessageId());
1043: MessageQueue messageQueue = MessageQueueManager
1044: .getInstance().getQueue(
1045: MessageQueueManager.UNSORTED);
1046: ((MessageManagerImpl) messageProcessInfo
1047: .getMessageManager())
1048: .assignToQueue(MessageQueueManager.UNSORTED);
1049: messageQueue.addMessage(messageProcessInfo
1050: .getMessageManager());
1051: } else {
1052: log.info("Removing the completed rpc message : "
1053: + message.getMessageId());
1054: messageProcessInfo.getMessageManager().remove();
1055: messageProcessInfo.getMessageQueue().removeMessage(
1056: message.getMessageId());
1057: }
1058: utw.commit();
1059: } catch (Exception ex) {
1060: log.error("Failed to deliver the RPC Message : "
1061: + ex.getMessage(), ex);
1062: throw new MessageServiceException(
1063: "Failed to deliver the RPC Message : "
1064: + ex.getMessage(), ex);
1065: } finally {
1066: utw.release();
1067: if (original != null) {
1068: Thread.currentThread().setContextClassLoader(original);
1069: }
1070: }
1071: }
1072:
1073: /**
1074: * This method delivers the rpc reply message.
1075: *
1076: * @param reply The reply address for the message.
1077: * @param message The message to deliver.
1078: * @exception MessageServiceException
1079: */
1080: private void deliverReplyRPCMessage(String reply, Message message)
1081: throws MessageServiceException {
1082: ClassLoader original = null;
1083: try {
1084: Object ref = null;
1085: if (((ref = BeanConnector.getInstance().getBean(reply)) == null)
1086: && ((ref = JMXBeanConnector.getInstance()
1087: .getJMXBean(reply)) == null)) {
1088: message.addError(Message.ERROR, "The reply [" + reply
1089: + "] does not exist.");
1090: initUndeliverableProcess(message);
1091: return;
1092: }
1093: original = Thread.currentThread().getContextClassLoader();
1094: Thread.currentThread().setContextClassLoader(
1095: ref.getClass().getClassLoader());
1096:
1097: utw.begin();
1098: RPCMessage rpcMessage = (RPCMessage) message;
1099: try {
1100: if (rpcMessage.generatedException()) {
1101: Method method = ref.getClass().getMethod(
1102: "onFailure",
1103: new Class[] { String.class, String.class,
1104: Throwable.class });
1105: Throwable ex = rpcMessage.getThrowable();
1106: if (ex instanceof java.lang.reflect.InvocationTargetException) {
1107: ex = ((java.lang.reflect.InvocationTargetException) ex)
1108: .getCause();
1109: }
1110: method.invoke(ref, new Object[] {
1111: rpcMessage.getMessageId(),
1112: rpcMessage.getCorrelationId(), ex });
1113: } else {
1114: Method method = ref.getClass().getMethod(
1115: "onSuccess",
1116: new Class[] { String.class, String.class,
1117: Object.class });
1118: method.invoke(ref, new Object[] {
1119: rpcMessage.getMessageId(),
1120: rpcMessage.getCorrelationId(),
1121: rpcMessage.getResult() });
1122: }
1123: } catch (Exception ex) {
1124: log.error("Failed to deliver the message [" + reply
1125: + "] to the AsyncCallbackHandler method : "
1126: + ex.getMessage(), ex);
1127: utw.release();
1128: message
1129: .addError(
1130: Message.ERROR,
1131: "Failed to deliver the message ["
1132: + reply
1133: + "] to the AsyncCallbackHandler method : "
1134: + ex.getMessage());
1135: initUndeliverableProcess(message);
1136: return;
1137: }
1138: if (original != null) {
1139: Thread.currentThread().setContextClassLoader(original);
1140: original = null;
1141: }
1142: log.info("Removing the completed rpc message : "
1143: + rpcMessage.getMessageId());
1144: messageProcessInfo.getMessageManager().remove();
1145: messageProcessInfo.getMessageQueue().removeMessage(
1146: rpcMessage.getMessageId());
1147:
1148: utw.commit();
1149: } catch (Exception ex) {
1150: log.error("Failed to deliver the reply RPC Message : "
1151: + ex.getMessage(), ex);
1152: throw new MessageServiceException(
1153: "Failed to deliver the reply RPC Message : "
1154: + ex.getMessage(), ex);
1155: } finally {
1156: utw.release();
1157: if (original != null) {
1158: Thread.currentThread().setContextClassLoader(original);
1159: }
1160: }
1161: }
1162:
1163: /**
1164: * This method delivers the text message to its target.
1165: *
1166: * @param target The target to deliver the message to.
1167: * @param message The message to deliver.
1168: * @exception MessageServiceException
1169: */
1170: private void deliverTextMessage(String target, Message message)
1171: throws MessageServiceException {
1172: try {
1173: MessageHandler messageHandler = (MessageHandler) ConnectionManager
1174: .getInstance().getConnection(MessageHandler.class,
1175: target);
1176: utw.begin();
1177: Message result = messageHandler.processMessage(message);
1178: result.incrementRetries();
1179: if (result.isAcknowledged() && result.getReply()
1180: && (message.getState() == Message.UNDELIVERED)) {
1181: log.info("Init the process to deliver to the sender : "
1182: + message.getMessageId());
1183: ((MessageImpl) result).setState(Message.DELIVERED);
1184: messageProcessInfo.getMessageManager().updateMessage(
1185: result);
1186: messageProcessInfo.getMessageQueue().removeMessage(
1187: message.getMessageId());
1188: MessageQueue messageQueue = MessageQueueManager
1189: .getInstance().getQueue(
1190: MessageQueueManager.UNSORTED);
1191: ((MessageManagerImpl) messageProcessInfo
1192: .getMessageManager())
1193: .assignToQueue(MessageQueueManager.UNSORTED);
1194: messageQueue.addMessage(messageProcessInfo
1195: .getMessageManager());
1196: utw.commit();
1197: } else if ((result.isAcknowledged() && !result.getReply())
1198: || (result.isAcknowledged() && (message.getState() == Message.DELIVERED))) {
1199: log.info("Removing the completed text message : "
1200: + message.getMessageId());
1201: messageProcessInfo.getMessageManager().remove();
1202: messageProcessInfo.getMessageQueue().removeMessage(
1203: message.getMessageId());
1204: utw.commit();
1205: } else {
1206: Date nextDate = new Date();
1207: nextDate.setTime(nextDate.getTime() + delay);
1208: ((MessageImpl) result).setNextProcessDate(nextDate);
1209: messageProcessInfo.getMessageManager().updateMessage(
1210: result);
1211: utw.commit();
1212: messageProcessInfo.getMessageQueue().pushBackMessage(
1213: messageProcessInfo.getMessageManager());
1214: }
1215: } catch (java.lang.ClassCastException ex) {
1216: log
1217: .error(
1218: "Failed to deliver the text message ["
1219: + message.getMessageId()
1220: + "], "
1221: + "init the undeliverable process, as the target cannot be "
1222: + "spoken to correctly : "
1223: + ex.getMessage(), ex);
1224: message.addError(Message.ERROR,
1225: "Failed to deliver the text message : "
1226: + ex.getMessage());
1227: initUndeliverableProcess(message);
1228: } catch (com.rift.coad.util.connection.NameNotFound ex) {
1229: log.error("Failed to deliver the text message ["
1230: + message.getMessageId() + "], "
1231: + "init the undeliverable process, "
1232: + "as the target name cannot be found : "
1233: + ex.getMessage(), ex);
1234: message.addError(Message.ERROR,
1235: "Failed to deliver the text message : "
1236: + ex.getMessage());
1237: initUndeliverableProcess(message);
1238: } catch (Exception ex) {
1239: log.error(
1240: "Failed to deliver the text message ["
1241: + message.getMessageId() + "] : "
1242: + ex.getMessage(), ex);
1243: throw new MessageServiceException(
1244: "Failed to deliver the text message : "
1245: + ex.getMessage(), ex);
1246: } finally {
1247: utw.release();
1248: }
1249: }
1250:
1251: /**
1252: * This method delivers the reply text message to its target.
1253: *
1254: * @param reply The reply address for the message.
1255: * @param message The message to deliver.
1256: * @exception MessageServiceException
1257: */
1258: private void deliverReplyTextMessage(String reply, Message message)
1259: throws MessageServiceException {
1260: try {
1261: MessageHandler messageHandler = (MessageHandler) ConnectionManager
1262: .getInstance().getConnection(MessageHandler.class,
1263: reply);
1264: utw.begin();
1265: Message result = messageHandler.processMessage(message);
1266: result.incrementRetries();
1267: if (result.isAcknowledged()) {
1268: log.info("Removing the completed text message : "
1269: + message.getMessageId());
1270: messageProcessInfo.getMessageManager().remove();
1271: messageProcessInfo.getMessageQueue().removeMessage(
1272: message.getMessageId());
1273: utw.commit();
1274: } else {
1275: Date nextDate = new Date();
1276: nextDate.setTime(nextDate.getTime() + delay);
1277: ((MessageImpl) result).setNextProcessDate(nextDate);
1278: messageProcessInfo.getMessageManager().updateMessage(
1279: result);
1280: utw.commit();
1281: messageProcessInfo.getMessageQueue().pushBackMessage(
1282: messageProcessInfo.getMessageManager());
1283: }
1284: } catch (java.lang.ClassCastException ex) {
1285: log.error("Failed to deliver the text message : "
1286: + ex.getMessage(), ex);
1287: message.addError(Message.ERROR,
1288: "Failed to deliver the text message : "
1289: + ex.getMessage());
1290: initUndeliverableProcess(message);
1291: } catch (com.rift.coad.util.connection.NameNotFound ex) {
1292: log.error("Failed to deliver the text message : "
1293: + ex.getMessage(), ex);
1294: message.addError(Message.ERROR,
1295: "Failed to deliver the text message : "
1296: + ex.getMessage());
1297: initUndeliverableProcess(message);
1298: } catch (Exception ex) {
1299: log.error("Failed to deliver the text message : "
1300: + ex.getMessage(), ex);
1301: throw new MessageServiceException(
1302: "Failed to deliver the text message : "
1303: + ex.getMessage(), ex);
1304: } finally {
1305: utw.release();
1306: }
1307: }
1308:
1309: /**
1310: * This method prepends the JNDI URL base.
1311: *
1312: * @return The modified url.
1313: * @param url The url to modify.
1314: */
1315: private String downJNDIUrl(String url)
1316: throws MessageServiceException {
1317: try {
1318: String instanceBase = NamingConstants.SUBCONTEXT + "/"
1319: + namingDirector.getInstanceId() + "/";
1320: if (url.indexOf(PARENT_INSTANCE) == 0) {
1321: return url.substring(PARENT_INSTANCE.length());
1322: } else if (url.contains(namingDirector.getPrimaryJNDIUrl())
1323: || url.contains(namingDirector.getJNDIBase())
1324: || url.contains(instanceBase)
1325: || (url
1326: .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0)) {
1327: return url;
1328: } else if (!url.contains(namingDirector.getInstanceId())) {
1329: return instanceBase + url;
1330: } else {
1331: return NamingConstants.SUBCONTEXT + "/" + url;
1332: }
1333: } catch (Exception ex) {
1334: log.error("Failed to move down the url : "
1335: + ex.getMessage(), ex);
1336: throw new MessageServiceException(
1337: "Failed to move down the url : " + ex.getMessage(),
1338: ex);
1339: }
1340: }
1341:
1342: /**
1343: * This method prepends the JNDI URL base.
1344: *
1345: * @return The modified url.
1346: * @param url The url to modify.
1347: */
1348: private String upJNDIUrl(String target, String url)
1349: throws MessageServiceException {
1350: try {
1351: String updatedURL = url;
1352: String instanceBase = namingDirector.getInstanceId() + "/"
1353: + NamingConstants.SUBCONTEXT + "/";
1354: String subContextUrl = NamingConstants.SUBCONTEXT + "/";
1355: int pos = url.indexOf(subContextUrl);
1356: if (url.contains(instanceBase)) {
1357: updatedURL = url.substring(url.indexOf(instanceBase)
1358: + instanceBase.length());
1359: pos = updatedURL.indexOf(subContextUrl);
1360: if (url.equals(target) && (pos == 0)) {
1361: updatedURL = updatedURL
1362: .substring(updatedURL.indexOf("/", pos
1363: + subContextUrl.length()) + 1);
1364: }
1365: } else if (url.equals(target) && (pos == 0)) {
1366: updatedURL = url.substring(url.indexOf("/", pos
1367: + subContextUrl.length()) + 1);
1368: } else {
1369: updatedURL = PARENT_INSTANCE + url;
1370: }
1371: return updatedURL;
1372: } catch (Exception ex) {
1373: log.error(
1374: "Failed to modify the url to set it relative to the next "
1375: + "coadunation intance : "
1376: + ex.getMessage(), ex);
1377: throw new MessageServiceException(
1378: "Failed to modify the url to "
1379: + "set it relative to the next coadunation intance : "
1380: + ex.getMessage(), ex);
1381: }
1382: }
1383:
1384: /**
1385: * The message has been deemed undeliverable for some reason.
1386: *
1387: * @param message The message set as undeliverable
1388: * @excption MessageServiceException
1389: */
1390: private void initUndeliverableProcess(Message message)
1391: throws MessageServiceException {
1392: try {
1393: int currentState = message.getState();
1394: utw.begin();
1395: ((MessageImpl) message).setState(Message.UNDELIVERABLE);
1396: messageProcessInfo.getMessageManager().updateMessage(
1397: message);
1398: utw.commit();
1399: utw.release();
1400: if (currentState == Message.DELIVERED) {
1401: processUndeliverable(message);
1402: } else if (messageProcessInfo.getMessageQueue().getName()
1403: .equals(MessageQueueManager.UNSORTED)) {
1404: messageProcessInfo.getMessageQueue().pushBackMessage(
1405: messageProcessInfo.getMessageManager());
1406: } else {
1407: utw.begin();
1408: messageProcessInfo.getMessageQueue().removeMessage(
1409: message.getMessageId());
1410: MessageQueue messageQueue = MessageQueueManager
1411: .getInstance().getQueue(
1412: MessageQueueManager.UNSORTED);
1413: ((MessageManagerImpl) messageProcessInfo
1414: .getMessageManager())
1415: .assignToQueue(MessageQueueManager.UNSORTED);
1416: messageQueue.addMessage(messageProcessInfo
1417: .getMessageManager());
1418: utw.commit();
1419: }
1420:
1421: } catch (Exception ex) {
1422: log.error("Failed to init the undeliverable process :"
1423: + ex.getMessage(), ex);
1424: throw new MessageServiceException("Failed to init the "
1425: + "undeliverable process :" + ex.getMessage(), ex);
1426: } finally {
1427: utw.release();
1428: }
1429: }
1430:
1431: /**
1432: * This method is responsible for initializing the user session.
1433: *
1434: * @param message The message containing the user session information.
1435: * @exception MessageServiceException
1436: */
1437: private void initUserSession(Message message)
1438: throws MessageServiceException {
1439: try {
1440: Session session = new Session(message.getMessageCreater(),
1441: message.getSessionId(), new HashSet(message
1442: .getMessagePrincipals()));
1443: getServerInterceptor().createSession(session);
1444: } catch (Exception ex) {
1445: log.error("Failed to setup the user session : "
1446: + ex.getMessage(), ex);
1447: throw new MessageServiceException(
1448: "Failed to setup the user session :"
1449: + ex.getMessage(), ex);
1450: }
1451: }
1452:
1453: /**
1454: * This method is responsible for initializing the user session.
1455: */
1456: private void releaseUserSession() {
1457: try {
1458: getServerInterceptor().release();
1459: } catch (Exception ex) {
1460: log.error("Failed to release the user session : "
1461: + ex.getMessage(), ex);
1462: }
1463: }
1464: }
|