Source Code Cross Referenced for MessageProcessor.java in » Net » Coadunation_1.0.1 » com » rift » coad » daemon » messageservice » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Coadunation_1.0.1 » com.rift.coad.daemon.messageservice 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * MessageService: The message service daemon
0003:         * Copyright (C) 2007  Rift IT Contracting
0004:         *
0005:         * This library is free software; you can redistribute it and/or
0006:         * modify it under the terms of the GNU Lesser General Public
0007:         * License as published by the Free Software Foundation; either
0008:         * version 2.1 of the License, or (at your option) any later version.
0009:         *
0010:         * This library is distributed in the hope that it will be useful,
0011:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0012:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0013:         * Lesser General Public License for more details.
0014:         *
0015:         * You should have received a copy of the GNU Lesser General Public
0016:         * License along with this library; if not, write to the Free Software
0017:         * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
0018:         *
0019:         * MessageProcessor.java
0020:         */
0021:
0022:        // package path
0023:        package com.rift.coad.daemon.messageservice;
0024:
0025:        // java imports
0026:        import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
0027:        import java.lang.reflect.Method;
0028:        import java.util.ArrayList;
0029:        import java.util.Date;
0030:        import java.util.HashSet;
0031:        import java.util.List;
0032:        import java.util.Set;
0033:        import java.util.StringTokenizer;
0034:        import javax.naming.Context;
0035:        import javax.naming.InitialContext;
0036:
0037:        // logging import
0038:        import org.apache.log4j.Logger;
0039:
0040:        // coadunation imports
0041:        import com.rift.coad.daemon.messageservice.message.MessageImpl;
0042:        import com.rift.coad.daemon.messageservice.message.RPCMessageImpl;
0043:        import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
0044:        import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
0045:        import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue;
0046:        import com.rift.coad.daemon.servicebroker.ServiceBroker;
0047:        import com.rift.coad.lib.common.RandomGuid;
0048:        import com.rift.coad.lib.configuration.Configuration;
0049:        import com.rift.coad.lib.configuration.ConfigurationFactory;
0050:        import com.rift.coad.lib.deployment.DeploymentMonitor;
0051:        import com.rift.coad.lib.naming.NamingDirector;
0052:        import com.rift.coad.lib.naming.NamingConstants;
0053:        import com.rift.coad.lib.interceptor.InterceptorWrapper;
0054:        import com.rift.coad.lib.interceptor.credentials.Session;
0055:        import com.rift.coad.lib.thread.pool.Task;
0056:        import com.rift.coad.lib.thread.pool.ThreadPoolManager;
0057:        import com.rift.coad.lib.thread.pool.PoolException;
0058:        import com.rift.coad.lib.deployment.BeanInfo;
0059:        import com.rift.coad.lib.deployment.bean.BeanConnector;
0060:        import com.rift.coad.lib.deployment.bean.BeanManager;
0061:        import com.rift.coad.lib.deployment.jmxbean.JMXBeanConnector;
0062:        import com.rift.coad.lib.deployment.jmxbean.JMXBeanManager;
0063:        import com.rift.coad.util.connection.ConnectionManager;
0064:        import com.rift.coad.util.transaction.UserTransactionWrapper;
0065:
0066:        /**
0067:         * This object is responsible for processing the messages that are sent to the
0068:         * message service.
0069:         *
0070:         * @author Brett Chaldecott
0071:         */
0072:        public class MessageProcessor extends InterceptorWrapper implements 
0073:                Task {
0074:
0075:            // class constants
0076:            private final static long BACK_OFF_PERIOD = 1000; // wait handed to ProcessMonitor.monitor() when no message is returned
0077:            private final static String RETRY_DELAY = "retry_delay"; // configuration key read in the constructor
0078:            private final static long DEFAULT_RETRY_DELAY = 60000; // default handed to config.getLong(); milliseconds (added to Date.getTime())
0079:            private final static String PARENT_INSTANCE = "../"; // an address starting with this is forwarded to the parent
0080:
0081:            // the logger reference (protected and static, shared by every instance)
0082:            protected static Logger log = Logger
0083:                    .getLogger(MessageProcessor.class.getName());
0084:
0085:            // private member variables
0086:            private Context context = null; // InitialContext created in the constructor
0087:            private UserTransactionWrapper utw = null; // begin/commit/release wrapper used by the methods of this class
0088:            private MessageProcessInfo messageProcessInfo = null; // result of the last getNextMessage() call
0089:            private NamingDirector namingDirector = null; // NamingDirector.getInstance(), cached in the constructor
0090:            private long delay = 0; // retry delay in milliseconds, loaded from configuration
0091:
0092:            /**
0093:             * Creates a new instance of MessageProcessor
0094:             *
0095:             * @exception Exception
0096:             */
0097:            public MessageProcessor() throws Exception {
0098:                try {
0099:                    context = new InitialContext();
0100:                    utw = new UserTransactionWrapper();
0101:                    namingDirector = NamingDirector.getInstance();
0102:                    Configuration config = ConfigurationFactory.getInstance()
0103:                            .getConfig(MessageProcessor.class);
0104:                    delay = config.getLong(RETRY_DELAY, DEFAULT_RETRY_DELAY);
0105:                } catch (Exception ex) {
0106:                    log.error("Failed init the message processor : "
0107:                            + ex.getMessage(), ex);
0108:                    throw new Exception("Failed init the message processor : "
0109:                            + ex.getMessage(), ex);
0110:                }
0111:            }
0112:
0113:            /**
0114:             * Entry point called by the coadunation thread pool. Waits for the initial
0115:             * deployment to complete, asks for the next message, calls
0116:             * poolManager.releaseThread() and then processes the message if one was found.
0117:             *
0118:             * @param poolManager The reference to the thread pool manager.
0119:             * @exception Exception
0120:             */
0121:            public void process(ThreadPoolManager poolManager) throws Exception {
0122:                DeploymentMonitor.getInstance().waitUntilInitDeployComplete();
0123:                if (!DeploymentMonitor.getInstance().isTerminated()) {
0124:                    final boolean messageAvailable = getMessageManager();
0125:                    poolManager.releaseThread();
0126:                    if (messageAvailable) {
0127:                        processMessage();
0128:                    }
0129:                }
0130:            }
0131:
0132:            /**
0133:             * Fetches the next message from the message queue manager into
0134:             * messageProcessInfo; when none is returned it calls ProcessMonitor.monitor().
0135:             * A failure is logged and reported as "no message".
0136:             *
0137:             * @return True if a message was found false if not.
0138:             */
0139:            private boolean getMessageManager() {
0140:                try {
0141:                    Date currentTime = new Date();
0142:                    Date delayTime = new Date();
0143:                    messageProcessInfo = MessageQueueManager.getInstance()
0144:                            .getNextMessage(currentTime);
0145:                    if (messageProcessInfo != null) {
0146:                        return true;
0147:                    }
0148:                    // NOTE(review): delayTime is never passed to getNextMessage(); confirm
0149:                    // which of the two dates was meant to carry the next run time.
0150:                    long gap = delayTime.getTime() - currentTime.getTime();
0151:                    long waitFor = BACK_OFF_PERIOD;
0152:                    if ((gap > 0) && (gap <= BACK_OFF_PERIOD)) {
0153:                        waitFor = gap; // only a positive gap within the back-off shortens the wait
0154:                    }
0155:                    ProcessMonitor.getInstance().monitor(waitFor);
0156:                } catch (Exception failure) {
0157:                    log.error("Failed to retrieve a message : "
0158:                            + failure.getMessage(), failure);
0159:                }
0160:                return false;
0161:            }
0162:
0163:            /**
0164:             * Loads the current message and dispatches it according to its state. Any
0165:             * failure is logged and the message is pushed back onto its queue.
0166:             */
0167:            private void processMessage() {
0168:                Message current = null;
0169:                try {
0170:                    current = getMessage();
0171:                    if (current.getState() == Message.UNDELIVERED) {
0172:                        processUndelivered(current);
0173:                    } else if (current.getState() == Message.DELIVERED) {
0174:                        processDelivered(current);
0175:                    } else if (current.getState() == Message.UNDELIVERABLE) {
0176:                        processUndeliverable(current);
0177:                    }
0178:                } catch (Exception failure) {
0179:                    log.error("Failed to process the message : " + failure.getMessage(), failure);
0180:                    if (current != null) {
0181:                        pushMessage(current, messageProcessInfo);
0182:                    } else {
0183:                        pushMessage(messageProcessInfo);
0184:                    }
0185:                }
0186:            }
0187:
0188:            /**
0189:             * This method returns the message contained within.
0190:             *
0191:             * @return Message The message.
0192:             * @exception MessageServiceException
0193:             */
0194:            private Message getMessage() throws MessageServiceException {
0195:                Message message = null;
0196:                try {
0197:                    utw.begin();
0198:                    MessageManager messageManager = messageProcessInfo
0199:                            .getMessageManager();
0200:                    message = messageManager.getMessage();
0201:                    utw.commit();
0202:                } catch (Exception ex) {
0203:                    log.error("Failed to retrieve the message : "
0204:                            + ex.getMessage(), ex);
0205:                    throw new MessageServiceException(
0206:                            "Failed to retrieve the message : "
0207:                                    + ex.getMessage(), ex);
0208:                } finally {
0209:                    utw.release();
0210:                }
0211:
0212:                return message;
0213:            }
0214:
0215:            /**
0216:             * This method will process the message.
0217:             *
0218:             * @param message The message to process.
0219:             * @exception MessageServiceException
0220:             */
0221:            private void processUndelivered(Message message)
0222:                    throws MessageServiceException {
0223:                try {
0224:                    if (message.getMessageType() == Message.POINT_TO_POINT) {
0225:                        if (checkIfTargetLocal(message)
0226:                                && checkIfMessageInQueue(message)) {
0227:                            deliverMessage(message);
0228:                        }
0229:                    } else if (message.getMessageType() == Message.POINT_TO_SERVICE) {
0230:                        if (checkIfServiceLocal(message)
0231:                                && checkIfMessageInQueue(message)) {
0232:                            deliverMessage(message);
0233:                        }
0234:                    } else if (message.getMessageType() == Message.POINT_TO_MULTI_SERVICE) {
0235:                        if (!namingDirector.isPrimary()) {
0236:                            deliverToParent(message);
0237:                        } else {
0238:                            cloneMessageForServices(message);
0239:                        }
0240:                    }
0241:                } catch (Exception ex) {
0242:                    log.error("Failed to process the undelivered message : "
0243:                            + ex.getMessage(), ex);
0244:                    throw new MessageServiceException(
0245:                            "Failed to process the undelivered message : "
0246:                                    + ex.getMessage(), ex);
0247:                }
0248:            }
0249:
0250:            /**
0251:             * This method will process the message.
0252:             *
0253:             * @param message The message to process.
0254:             * @exception MessageServiceException
0255:             */
0256:            private void processDelivered(Message message)
0257:                    throws MessageServiceException {
0258:                try {
0259:                    if (checkIfReplyLocal(message)
0260:                            && checkIfReplyMessageInQueue(message)) {
0261:                        deliverReplyMessage(message);
0262:                    }
0263:                } catch (Exception ex) {
0264:                    log.error("Failed to process the undelivered message : "
0265:                            + ex.getMessage(), ex);
0266:                    throw new MessageServiceException(
0267:                            "Failed to process the undelivered message : "
0268:                                    + ex.getMessage(), ex);
0269:                }
0270:            }
0271:
0272:            /**
0273:             * This method will process the undeliverable message.
0274:             *
0275:             * @param message The message to process.
0276:             * @exception MessageServiceException
0277:             */
0278:            private void processUndeliverable(Message message)
0279:                    throws MessageServiceException {
0280:                try {
0281:                    if (checkIfReplyLocal(message)) {
0282:                        utw.begin();
0283:                        if (NamedQueueManagerImpl.getInstance()
0284:                                .checkForNamedQueue(
0285:                                        MessageQueueManager.DEAD_LETTER, true)) {
0286:                            log.info("Assign message to dead letter queue.");
0287:                            messageProcessInfo.getMessageQueue().removeMessage(
0288:                                    message.getMessageId());
0289:                            ((MessageManagerImpl) messageProcessInfo
0290:                                    .getMessageManager())
0291:                                    .assignToQueue(MessageQueueManager.DEAD_LETTER);
0292:                            NamedMemoryQueue.getInstance(
0293:                                    MessageQueueManager.DEAD_LETTER)
0294:                                    .addMessage(
0295:                                            messageProcessInfo
0296:                                                    .getMessageManager());
0297:                            log
0298:                                    .info("Added the value to the dead letter queue");
0299:                        } else {
0300:                            log
0301:                                    .error("Failed to add to the dead letter queue.");
0302:                        }
0303:                        utw.commit();
0304:                    }
0305:                } catch (Exception ex) {
0306:                    log.error("Failed to process the Undeliverable message : "
0307:                            + ex.getMessage(), ex);
0308:                    throw new MessageServiceException(
0309:                            "Failed to process the Undeliverable message : "
0310:                                    + ex.getMessage(), ex);
0311:                } finally {
0312:                    utw.release();
0313:                }
0314:            }
0315:
0316:            /**
0317:             * This method pushes the message back onto the queue from which it was
0318:             * retrieved.
0319:             *
0320:             * @param messageProcessInfo The processing information for this thread.
0321:             */
0322:            private void pushMessage(MessageProcessInfo messageProcessInfo) {
0323:                try {
0324:                    messageProcessInfo.getMessageQueue().pushBackMessage(
0325:                            messageProcessInfo.getMessageManager());
0326:                    ProcessMonitor.getInstance().notifyProcessor();
0327:                } catch (Exception ex) {
0328:                    log.error(
0329:                            "Failed to push the message manager onto a queue : "
0330:                                    + ex.getMessage(), ex);
0331:                }
0332:            }
0333:
0334:            /**
0335:             * This method pushes the message back onto the queue from which it was
0336:             * retrieved.
0337:             *
0338:             * @param message The message to push back.
0339:             * @param messageProcessInfo The processing information for this thread.
0340:             */
0341:            private void pushMessage(Message message,
0342:                    MessageProcessInfo messageProcessInfo) {
0343:                try {
0344:                    try {
0345:                        utw.begin();
0346:                        Date nextDate = new Date();
0347:                        nextDate.setTime(nextDate.getTime() + delay);
0348:                        ((MessageImpl) message).setNextProcessDate(nextDate);
0349:                        messageProcessInfo.getMessageManager().updateMessage(
0350:                                message);
0351:                        utw.commit();
0352:                    } catch (Exception ex) {
0353:                        log.error("Failed to process the message : "
0354:                                + ex.getMessage(), ex);
0355:                    } finally {
0356:                        utw.release();
0357:                    }
0358:                    messageProcessInfo.getMessageQueue().pushBackMessage(
0359:                            messageProcessInfo.getMessageManager());
0360:                    ProcessMonitor.getInstance().notifyProcessor();
0361:                } catch (Exception ex) {
0362:                    log.error(
0363:                            "Failed to push the message manager onto a queue : "
0364:                                    + ex.getMessage(), ex);
0365:                }
0366:            }
0367:
0368:            /**
0369:             * This method checks if the target this message is going to is local to
0370:             * this Coadunation Instance.
0371:             *
0372:             * @return TRUE if local, FALSE if not.
0373:             * @param message The message to perform the test on.
0374:             */
0375:            private boolean checkIfTargetLocal(Message message)
0376:                    throws MessageServiceException {
0377:                try {
0378:                    String target = message.getTarget();
0379:                    if (target == null) {
0380:                        message.addError(Message.ERROR,
0381:                                "There is no target for this message");
0382:                        initUndeliverableProcess(message);
0383:                        return false;
0384:                    }
0385:
0386:                    // check if this fall withing this part of the tree or below
0387:                    String jndiBase = NamingDirector.getInstance()
0388:                            .getJNDIBase()
0389:                            + "/";
0390:                    String parentUrl = NamingDirector.getInstance()
0391:                            .getPrimaryJNDIUrl();
0392:                    String instanceURL = NamingDirector.getInstance()
0393:                            .getInstanceId()
0394:                            + "/";
0395:                    int pos = target.indexOf(jndiBase);
0396:                    int instancePos = target.indexOf(instanceURL);
0397:                    if ((((target.indexOf(parentUrl)) != -1) && (target
0398:                            .indexOf(jndiBase) == -1))
0399:                            || (target.indexOf(PARENT_INSTANCE) == 0)
0400:                            || ((target
0401:                                    .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && !NamingDirector
0402:                                    .getInstance().isPrimary())) {
0403:                        deliverToParent(message);
0404:                        return false;
0405:                    } else if (((instancePos != -1) && (target.indexOf(
0406:                            NamingConstants.SUBCONTEXT,
0407:                            (instancePos + instanceURL.length())) != -1))
0408:                            || (target.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
0409:                        deliverToChild(message.getTarget(), message);
0410:                        return false;
0411:                    } else if (instancePos != -1) {
0412:                        utw.begin();
0413:                        target = target.substring(instancePos
0414:                                + instanceURL.length());
0415:                        message.setTarget(target);
0416:                        messageProcessInfo.getMessageManager().updateMessage(
0417:                                message);
0418:                        utw.commit();
0419:                        messageProcessInfo.getMessageQueue().pushBackMessage(
0420:                                messageProcessInfo.getMessageManager());
0421:                        ProcessMonitor.getInstance().notifyProcessor();
0422:                        return false;
0423:                    }
0424:                    return true;
0425:                } catch (MessageServiceException ex) {
0426:                    throw ex;
0427:                } catch (Exception ex) {
0428:                    log.error("Failed to check the message is local : "
0429:                            + ex.getMessage(), ex);
0430:                    throw new MessageServiceException(
0431:                            "Failed to check the message is local : "
0432:                                    + ex.getMessage(), ex);
0433:                } finally {
0434:                    utw.release();
0435:                }
0436:            }
0437:
0438:            /**
0439:             * This method checks if the target this message is going to is local to
0440:             * this Coadunation Instance.
0441:             *
0442:             * @return TRUE if local, FALSE if not.
0443:             * @param message The message to perform the test on.
0444:             */
0445:            private boolean checkIfReplyLocal(Message message)
0446:                    throws MessageServiceException {
0447:                try {
0448:                    String reply = message.getReplyTo();
0449:                    if (reply == null) {
0450:                        reply = message.getFrom();
0451:                        if (reply == null) {
0452:                            message.addError(Message.ERROR,
0453:                                    "There is no reply for this message");
0454:                            initUndeliverableProcess(message);
0455:                            return false;
0456:                        }
0457:                    }
0458:
0459:                    // check if this fall withing this part of the tree or below
0460:                    String jndiBase = NamingDirector.getInstance()
0461:                            .getJNDIBase()
0462:                            + "/";
0463:                    String parentUrl = NamingDirector.getInstance()
0464:                            .getPrimaryJNDIUrl();
0465:                    String instanceURL = NamingDirector.getInstance()
0466:                            .getInstanceId()
0467:                            + "/";
0468:                    int pos = reply.indexOf(jndiBase);
0469:                    int instancePos = reply.indexOf(instanceURL);
0470:                    if ((((reply.indexOf(parentUrl)) != -1) && (reply
0471:                            .indexOf(jndiBase) == -1))
0472:                            || (reply.indexOf(PARENT_INSTANCE) == 0)
0473:                            || ((reply
0474:                                    .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && !NamingDirector
0475:                                    .getInstance().isPrimary())) {
0476:                        deliverToParent(message);
0477:                        return false;
0478:                    } else if (((instancePos != -1) && (reply.indexOf(
0479:                            NamingConstants.SUBCONTEXT,
0480:                            (instancePos + instanceURL.length())) != -1))
0481:                            || (reply.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
0482:                        deliverToChild(reply, message);
0483:                        return false;
0484:                    } else if (instancePos != -1) {
0485:                        utw.begin();
0486:                        reply = reply.substring(instancePos
0487:                                + instanceURL.length());
0488:                        if (message.getReplyTo() != null) {
0489:                            message.setReplyTo(reply);
0490:                        } else {
0491:                            message.setFrom(reply);
0492:                        }
0493:                        messageProcessInfo.getMessageManager().updateMessage(
0494:                                message);
0495:                        utw.commit();
0496:                        messageProcessInfo.getMessageQueue().pushBackMessage(
0497:                                messageProcessInfo.getMessageManager());
0498:                        ProcessMonitor.getInstance().notifyProcessor();
0499:                        return false;
0500:                    }
0501:                    return true;
0502:                } catch (MessageServiceException ex) {
0503:                    throw ex;
0504:                } catch (Exception ex) {
0505:                    log.error("Failed to check the message is local : "
0506:                            + ex.getMessage(), ex);
0507:                    throw new MessageServiceException(
0508:                            "Failed to check the message is local : "
0509:                                    + ex.getMessage(), ex);
0510:                }
0511:            }
0512:
0513:            /**
0514:             * This method checks if the target this message is going to is local to
0515:             * this Coadunation Instance.
0516:             *
0517:             * @return TRUE if local, FALSE if not.
0518:             * @param message The message to perform the test on.
0519:             */
0520:            private boolean checkIfServiceLocal(Message message)
0521:                    throws MessageServiceException {
0522:                try {
0523:                    String target = message.getTarget();
0524:                    if (target != null) {
0525:                        return checkIfTargetLocal(message);
0526:                    }
0527:
0528:                    String[] services = message.getServices();
0529:                    if (services == null) {
0530:                        message.addError(Message.ERROR,
0531:                                "There are no services for this message");
0532:                        initUndeliverableProcess(message);
0533:                        return false;
0534:                    }
0535:                    List serviceList = new ArrayList();
0536:                    for (int index = 0; index < services.length; index++) {
0537:                        serviceList.add(services[index]);
0538:                    }
0539:                    ServiceBroker serviceBroker = (ServiceBroker) ConnectionManager
0540:                            .getInstance().getConnection(ServiceBroker.class,
0541:                                    ServiceBroker.JNDI_URL);
0542:                    String service = serviceBroker
0543:                            .getServiceProvider(serviceList);
0544:                    if (service.length() != 0) {
0545:                        message.setTarget(service);
0546:                        utw.begin();
0547:                        messageProcessInfo.getMessageManager().updateMessage(
0548:                                message);
0549:                        utw.commit();
0550:                        messageProcessInfo.getMessageQueue().pushBackMessage(
0551:                                messageProcessInfo.getMessageManager());
0552:                    } else {
0553:                        deliverToParent(message);
0554:                    }
0555:                } catch (MessageServiceException ex) {
0556:                    throw ex;
0557:                } catch (Exception ex) {
0558:                    log.error("Failed to check if the services are local : "
0559:                            + ex.getMessage(), ex);
0560:                    throw new MessageServiceException(
0561:                            "Failed to check if the services are local : "
0562:                                    + ex.getMessage(), ex);
0563:                } finally {
0564:                    utw.release();
0565:                }
0566:                return false;
0567:            }
0568:
0569:            /**
0570:             * This method checks if the target this message is going to is local to
0571:             * this Coadunation Instance.
0572:             *
0573:             * @return TRUE if local, FALSE if not.
0574:             * @param message The message to perform the test on.
0575:             */
0576:            private boolean checkIfMessageInQueue(Message message)
0577:                    throws MessageServiceException {
0578:                try {
0579:                    if (messageProcessInfo.getMessageQueue().getName().equals(
0580:                            message.getTarget())) {
0581:                        return true;
0582:                    }
0583:                    utw.begin();
0584:                    if (message.getTarget().equals(
0585:                            MessageServiceManagerMBean.JNDI_URL)
0586:                            && (message.getTargetNamedQueue() != null)) {
0587:                        if (false == NamedQueueManagerImpl.getInstance()
0588:                                .checkForNamedQueue(
0589:                                        message.getTargetNamedQueue(), false)) {
0590:                            utw.release();
0591:                            message.addError(Message.ERROR, "The named queue ["
0592:                                    + message.getTargetNamedQueue()
0593:                                    + "] does not exist.");
0594:                            initUndeliverableProcess(message);
0595:                            return false;
0596:                        }
0597:                        messageProcessInfo.getMessageQueue().removeMessage(
0598:                                message.getMessageId());
0599:                        ((MessageManagerImpl) messageProcessInfo
0600:                                .getMessageManager()).assignToQueue(message
0601:                                .getTargetNamedQueue());
0602:                        NamedMemoryQueue.getInstance(
0603:                                message.getTargetNamedQueue()).addMessage(
0604:                                messageProcessInfo.getMessageManager());
0605:                    } else {
0606:                        MessageQueue messageQueue = MessageQueueManager
0607:                                .getInstance().getQueue(message.getTarget());
0608:                        messageProcessInfo.getMessageQueue().removeMessage(
0609:                                message.getMessageId());
0610:                        ((MessageManagerImpl) messageProcessInfo
0611:                                .getMessageManager()).assignToQueue(message
0612:                                .getTarget());
0613:                        messageQueue.addMessage(messageProcessInfo
0614:                                .getMessageManager());
0615:                    }
0616:                    utw.commit();
0617:                    return false;
0618:                } catch (Exception ex) {
0619:                    log.error(
0620:                            "Failed to check the target : " + ex.getMessage(),
0621:                            ex);
0622:                    throw new MessageServiceException(
0623:                            "Failed to check the target : " + ex.getMessage(),
0624:                            ex);
0625:                } finally {
0626:                    utw.release();
0627:                }
0628:            }
0629:
0630:            /**
0631:             * Check if this message is in the correct queue.
0632:             *
0633:             * @return TRUE if local, FALSE if not.
0634:             * @param message The message to perform the test on.
0635:             * @exception MessageServiceException
0636:             */
0637:            private boolean checkIfReplyMessageInQueue(Message message)
0638:                    throws MessageServiceException {
0639:                try {
0640:                    String reply = message.getReplyTo();
0641:                    if (reply == null) {
0642:                        reply = message.getFrom();
0643:                        if (reply == null) {
0644:                            message.addError(Message.ERROR,
0645:                                    "There is no reply for this message");
0646:                            initUndeliverableProcess(message);
0647:                            return false;
0648:                        }
0649:                    }
0650:
0651:                    if (messageProcessInfo.getMessageQueue().getName().equals(
0652:                            reply)) {
0653:                        return true;
0654:                    }
0655:                    utw.begin();
0656:                    if (reply.equals(MessageServiceManagerMBean.JNDI_URL)
0657:                            && (message.getTargetNamedQueue() != null)) {
0658:                        if (false == NamedQueueManagerImpl.getInstance()
0659:                                .checkForNamedQueue(
0660:                                        message.getReplyNamedQueue(), false)) {
0661:                            utw.release();
0662:                            message.addError(Message.ERROR, "The named queue ["
0663:                                    + message.getReplyNamedQueue()
0664:                                    + "] does not exist.");
0665:                            initUndeliverableProcess(message);
0666:                            return false;
0667:                        }
0668:                        messageProcessInfo.getMessageQueue().removeMessage(
0669:                                message.getMessageId());
0670:                        ((MessageManagerImpl) messageProcessInfo
0671:                                .getMessageManager()).assignToQueue(message
0672:                                .getReplyNamedQueue());
0673:                        NamedMemoryQueue.getInstance(
0674:                                message.getTargetNamedQueue()).addMessage(
0675:                                messageProcessInfo.getMessageManager());
0676:                    } else {
0677:                        MessageQueue messageQueue = MessageQueueManager
0678:                                .getInstance().getQueue(reply);
0679:                        messageProcessInfo.getMessageQueue().removeMessage(
0680:                                message.getMessageId());
0681:                        ((MessageManagerImpl) messageProcessInfo
0682:                                .getMessageManager()).assignToQueue(reply);
0683:                        messageQueue.addMessage(messageProcessInfo
0684:                                .getMessageManager());
0685:                    }
0686:                    utw.commit();
0687:                    return false;
0688:                } catch (Exception ex) {
0689:                    log.error("Failed to check the reply queue : "
0690:                            + ex.getMessage(), ex);
0691:                    throw new MessageServiceException(
0692:                            "Failed to check the reply queue : "
0693:                                    + ex.getMessage(), ex);
0694:                } finally {
0695:                    utw.release();
0696:                }
0697:            }
0698:
0699:            /**
0700:             * This method clones the original message so that it can be sent to all
0701:             * the daemons suppliung the services.
0702:             *
0703:             * @param message The message to clone.
0704:             * @exception MessageServiceException
0705:             */
0706:            private void cloneMessageForServices(Message message)
0707:                    throws MessageServiceException {
0708:                try {
0709:                    String[] services = message.getServices();
0710:                    List serviceList = new ArrayList();
0711:                    for (int index = 0; index < services.length; index++) {
0712:                        serviceList.add(services[index]);
0713:                    }
0714:                    ServiceBroker serviceBroker = (ServiceBroker) ConnectionManager
0715:                            .getInstance().getConnection(ServiceBroker.class,
0716:                                    ServiceBroker.JNDI_URL);
0717:                    List daemonList = serviceBroker
0718:                            .getServiceProviders(serviceList);
0719:                    if (daemonList.size() == 0) {
0720:                        message
0721:                                .addError(Message.ERROR,
0722:                                        "There are no daemon providing these services.");
0723:                        initUndeliverableProcess(message);
0724:                        return;
0725:                    }
0726:
0727:                    utw.begin();
0728:                    for (int index = 0; index < daemonList.size(); index++) {
0729:                        MessageImpl newMessage = (MessageImpl) ((MessageImpl) message)
0730:                                .clone();
0731:                        newMessage.setMessageId(RandomGuid.getInstance()
0732:                                .getGuid());
0733:                        newMessage.setTarget((String) daemonList.get(index));
0734:                        newMessage.setMessageType(Message.POINT_TO_POINT);
0735:                        newMessage.setNextProcessDate(new Date());
0736:                        MessageManager messageManager = MessageManagerFactory
0737:                                .getInstance().getMessageManager(newMessage);
0738:                        MessageQueue messageQueue = MessageQueueManager
0739:                                .getInstance().getQueue(
0740:                                        MessageQueueManager.UNSORTED);
0741:                        ((MessageManagerImpl) messageManager)
0742:                                .assignToQueue(MessageQueueManager.UNSORTED);
0743:                        messageQueue.addMessage(messageManager);
0744:                    }
0745:                    messageProcessInfo.getMessageManager().remove();
0746:                    messageProcessInfo.getMessageQueue().removeMessage(
0747:                            message.getMessageId());
0748:                    utw.commit();
0749:                } catch (Exception ex) {
0750:                    log.error("Failed to clone the messages : "
0751:                            + ex.getMessage(), ex);
0752:                    throw new MessageServiceException(
0753:                            "Failed to clone the messages : " + ex.getMessage(),
0754:                            ex);
0755:                } finally {
0756:                    utw.release();
0757:                }
0758:            }
0759:
0760:            /**
0761:             * This method delivers the message to another coadunation instance.
0762:             *
0763:             * @param message The reference to the message object.
0764:             * @exception MessageServiceException
0765:             */
0766:            private void deliverToParent(Message message)
0767:                    throws MessageServiceException {
0768:                try {
0769:                    if (namingDirector.isPrimary()) {
0770:                        message.addError(Message.ERROR,
0771:                                "The primary has no parent cannot go further.");
0772:                        initUndeliverableProcess(message);
0773:                        return;
0774:                    }
0775:                    Message messageCopy = (Message) ((MessageImpl) message)
0776:                            .clone();
0777:                    if (message.getTarget() != null) {
0778:                        messageCopy.setFrom(downJNDIUrl(message.getTarget()));
0779:                    }
0780:                    if (message.getReplyTo() != null) {
0781:                        messageCopy
0782:                                .setReplyTo(downJNDIUrl(message.getReplyTo()));
0783:                    }
0784:                    if (message.getFrom() != null) {
0785:                        messageCopy.setFrom(downJNDIUrl(message.getFrom()));
0786:                    }
0787:
0788:                    log.debug("Deliver message to parent : "
0789:                            + message.getMessageId());
0790:                    utw.begin();
0791:                    MessageStore messageStore = (MessageStore) ConnectionManager
0792:                            .getInstance().getConnection(
0793:                                    MessageStore.class,
0794:                                    namingDirector.getPrimaryJNDIUrl() + "/"
0795:                                            + MessageStore.JNDI_URL);
0796:                    messageProcessInfo.getMessageManager().remove();
0797:                    messageProcessInfo.getMessageQueue().removeMessage(
0798:                            message.getMessageId());
0799:                    log
0800:                            .debug("The message has been deliverd to parent committing : "
0801:                                    + message.getMessageId());
0802:                    IDLock.getInstance().lock(message.getMessageId());
0803:                    messageStore.addMessage(messageCopy);
0804:                    utw.commit();
0805:                    log.debug("Delivered message to parent : "
0806:                            + message.getMessageId());
0807:                } catch (Exception ex) {
0808:                    log.error("Failed to deliver to a the parent : "
0809:                            + ex.getMessage(), ex);
0810:                    throw new MessageServiceException(
0811:                            "Failed to deliver to a the parent : "
0812:                                    + ex.getMessage(), ex);
0813:                } finally {
0814:                    utw.release();
0815:                }
0816:            }
0817:
0818:            /**
0819:             * This method delivers the message to another coadunation instance.
0820:             *
0821:             * @param target The target of the message.
0822:             * @param message The reference to the message object.
0823:             */
0824:            private void deliverToChild(String target, Message message)
0825:                    throws MessageServiceException {
0826:                try {
0827:                    Message messageCopy = (Message) ((MessageImpl) message)
0828:                            .clone();
0829:                    if (message.getTarget() != null) {
0830:                        messageCopy.setTarget(upJNDIUrl(target, message
0831:                                .getTarget()));
0832:                    }
0833:                    if (message.getReplyTo() != null) {
0834:                        messageCopy.setReplyTo(upJNDIUrl(target, message
0835:                                .getReplyTo()));
0836:                    }
0837:                    if (message.getFrom() != null) {
0838:                        messageCopy
0839:                                .setFrom(upJNDIUrl(target, message.getFrom()));
0840:                    }
0841:
0842:                    String subContextUrl = NamingConstants.SUBCONTEXT + "/";
0843:                    if (target.contains(namingDirector.getInstanceId())) {
0844:                        subContextUrl = namingDirector.getInstanceId() + "/"
0845:                                + NamingConstants.SUBCONTEXT + "/";
0846:                    }
0847:                    int pos = target.indexOf(subContextUrl);
0848:                    if (pos == -1) {
0849:                        message.addError(Message.ERROR,
0850:                                "Cannot find the sub reference information : "
0851:                                        + target);
0852:                        initUndeliverableProcess(message);
0853:                        return;
0854:                    }
0855:                    String subContext = target.substring(pos
0856:                            + subContextUrl.length());
0857:                    subContext = NamingConstants.SUBCONTEXT + "/"
0858:                            + subContext.substring(0, subContext.indexOf('/'))
0859:                            + "/" + MessageStore.JNDI_URL;
0860:                    log.debug("Deliver message to child : "
0861:                            + message.getMessageId());
0862:                    utw.begin();
0863:                    MessageStore messageStore = (MessageStore) ConnectionManager
0864:                            .getInstance().getConnection(MessageStore.class,
0865:                                    subContext);
0866:                    messageProcessInfo.getMessageManager().remove();
0867:                    messageProcessInfo.getMessageQueue().removeMessage(
0868:                            message.getMessageId());
0869:                    IDLock.getInstance().lock(message.getMessageId());
0870:                    messageStore.addMessage(messageCopy);
0871:                    log
0872:                            .debug("The message has been deliverd to child committing : "
0873:                                    + message.getMessageId());
0874:                    utw.commit();
0875:                    log.debug("Delivered message to child : "
0876:                            + message.getMessageId());
0877:                } catch (Exception ex) {
0878:                    log.error("Failed to deliver to a the child : "
0879:                            + ex.getMessage(), ex);
0880:                    throw new MessageServiceException(
0881:                            "Failed to deliver to a the child : "
0882:                                    + ex.getMessage(), ex);
0883:                } finally {
0884:                    utw.release();
0885:                }
0886:            }
0887:
0888:            /**
0889:             * This method delivers the message.
0890:             *
0891:             * @param message The message to deliver
0892:             * @exception MessageServiceException
0893:             */
0894:            private void deliverMessage(Message message)
0895:                    throws MessageServiceException {
0896:                initUserSession(message);
0897:                try {
0898:                    if (message instanceof  RPCMessage) {
0899:                        deliverRPCMessage(message.getTarget(), message);
0900:                    } else if (message instanceof  TextMessage) {
0901:                        deliverTextMessage(message.getTarget(), message);
0902:                    }
0903:                } finally {
0904:                    releaseUserSession();
0905:                }
0906:            }
0907:
0908:            /**
0909:             * This method delivers the reply message.
0910:             *
0911:             * @param message The message to deliver
0912:             * @exception MessageServiceException
0913:             */
0914:            private void deliverReplyMessage(Message message)
0915:                    throws MessageServiceException {
0916:                initUserSession(message);
0917:                try {
0918:                    String reply = message.getReplyTo();
0919:                    if (reply == null) {
0920:                        reply = message.getFrom();
0921:                        if (reply == null) {
0922:                            message.addError(Message.ERROR,
0923:                                    "There is no reply for this message");
0924:                            initUndeliverableProcess(message);
0925:                            return;
0926:                        }
0927:                    }
0928:                    if (message instanceof  RPCMessage) {
0929:                        deliverReplyRPCMessage(reply, message);
0930:                    } else if (message instanceof  TextMessage) {
0931:                        deliverReplyTextMessage(reply, message);
0932:                    }
0933:                } finally {
0934:                    releaseUserSession();
0935:                }
0936:            }
0937:
0938:            /**
0939:             * This method delivers the rpc message to its target.
0940:             *
0941:             * @param message The message to deliver.
0942:             * @exception MessageServiceException
0943:             */
0944:            private void deliverRPCMessage(String target, Message message)
0945:                    throws MessageServiceException {
0946:                ClassLoader original = null;
0947:                try {
0948:                    Object ref = null;
0949:                    if (((ref = BeanConnector.getInstance().getBean(target)) == null)
0950:                            && ((ref = JMXBeanConnector.getInstance()
0951:                                    .getJMXBean(target)) == null)) {
0952:                        message.addError(Message.ERROR, "The target [" + target
0953:                                + "] does not exist.");
0954:                        initUndeliverableProcess(message);
0955:                        return;
0956:                    }
0957:                    original = Thread.currentThread().getContextClassLoader();
0958:                    Thread.currentThread().setContextClassLoader(
0959:                            ref.getClass().getClassLoader());
0960:
0961:                    RPCMessageImpl rpcMessageImpl = (RPCMessageImpl) ((RPCMessageImpl) message)
0962:                            .clone();
0963:                    Method method = null;
0964:                    try {
0965:                        method = ref.getClass().getMethod(
0966:                                rpcMessageImpl.getMethodName(),
0967:                                rpcMessageImpl.getArgumentTypes());
0968:                    } catch (Exception ex) {
0969:                        log.error("Failed to find the method on the target : "
0970:                                + ex.getMessage(), ex);
0971:                        if (original != null) {
0972:                            Thread.currentThread().setContextClassLoader(
0973:                                    original);
0974:                            original = null;
0975:                        }
0976:                        message.addError(Message.ERROR,
0977:                                "Failed to find the method on the target : "
0978:                                        + ex.getMessage());
0979:                        initUndeliverableProcess(message);
0980:                        return;
0981:                    }
0982:                    utw.begin();
0983:                    try {
0984:                        Object result = method.invoke(ref, rpcMessageImpl
0985:                                .getArguments());
0986:                        ((RPCMessage) message).setResult(result);
0987:
0988:                        // reset the class loader
0989:                        if (original != null) {
0990:                            Thread.currentThread().setContextClassLoader(
0991:                                    original);
0992:                            original = null;
0993:                        }
0994:                    } catch (Throwable ex) {
0995:                        log.error("Caught an exception : " + ex.getMessage(),
0996:                                ex);
0997:                        // reset the class loader
0998:                        if (original != null) {
0999:                            Thread.currentThread().setContextClassLoader(
1000:                                    original);
1001:                            original = null;
1002:                        }
1003:                        // force the rollback of any changes
1004:                        utw.release();
1005:                        // start a new transaction
1006:                        utw.begin();
1007:
1008:                        // deal with invocation exception
1009:                        if (ex instanceof  java.lang.reflect.InvocationTargetException) {
1010:                            ex = ex.getCause();
1011:                        }
1012:
1013:                        // if this is a remote exception
1014:                        if (ex instanceof  java.rmi.RemoteException) {
1015:                            log.info("This is a remote exception and results "
1016:                                    + "in a retry");
1017:                            message.incrementRetries();
1018:                            Date nextDate = new Date();
1019:                            nextDate.setTime(nextDate.getTime() + delay);
1020:                            ((MessageImpl) message)
1021:                                    .setNextProcessDate(nextDate);
1022:                            messageProcessInfo.getMessageManager()
1023:                                    .updateMessage(message);
1024:                            utw.commit();
1025:                            messageProcessInfo.getMessageQueue()
1026:                                    .pushBackMessage(
1027:                                            messageProcessInfo
1028:                                                    .getMessageManager());
1029:
1030:                            return;
1031:                        }
1032:                        ((RPCMessage) message).setThrowable(ex);
1033:                    }
1034:                    // deal with reply
1035:                    if (message.getReply()) {
1036:                        log.info("Init the process to deliver to the sender : "
1037:                                + message.getMessageId());
1038:                        ((RPCMessageImpl) message).setState(Message.DELIVERED);
1039:                        messageProcessInfo.getMessageManager().updateMessage(
1040:                                message);
1041:                        messageProcessInfo.getMessageQueue().removeMessage(
1042:                                message.getMessageId());
1043:                        MessageQueue messageQueue = MessageQueueManager
1044:                                .getInstance().getQueue(
1045:                                        MessageQueueManager.UNSORTED);
1046:                        ((MessageManagerImpl) messageProcessInfo
1047:                                .getMessageManager())
1048:                                .assignToQueue(MessageQueueManager.UNSORTED);
1049:                        messageQueue.addMessage(messageProcessInfo
1050:                                .getMessageManager());
1051:                    } else {
1052:                        log.info("Removing the completed rpc message : "
1053:                                + message.getMessageId());
1054:                        messageProcessInfo.getMessageManager().remove();
1055:                        messageProcessInfo.getMessageQueue().removeMessage(
1056:                                message.getMessageId());
1057:                    }
1058:                    utw.commit();
1059:                } catch (Exception ex) {
1060:                    log.error("Failed to deliver the RPC Message : "
1061:                            + ex.getMessage(), ex);
1062:                    throw new MessageServiceException(
1063:                            "Failed to deliver the RPC Message : "
1064:                                    + ex.getMessage(), ex);
1065:                } finally {
1066:                    utw.release();
1067:                    if (original != null) {
1068:                        Thread.currentThread().setContextClassLoader(original);
1069:                    }
1070:                }
1071:            }
1072:
1073:            /**
1074:             * This method delivers the rpc reply message.
1075:             *
1076:             * @param reply The reply address for the message.
1077:             * @param message The message to deliver.
1078:             * @exception MessageServiceException
1079:             */
1080:            private void deliverReplyRPCMessage(String reply, Message message)
1081:                    throws MessageServiceException {
1082:                ClassLoader original = null;
1083:                try {
1084:                    Object ref = null;
1085:                    if (((ref = BeanConnector.getInstance().getBean(reply)) == null)
1086:                            && ((ref = JMXBeanConnector.getInstance()
1087:                                    .getJMXBean(reply)) == null)) {
1088:                        message.addError(Message.ERROR, "The reply [" + reply
1089:                                + "] does not exist.");
1090:                        initUndeliverableProcess(message);
1091:                        return;
1092:                    }
1093:                    original = Thread.currentThread().getContextClassLoader();
1094:                    Thread.currentThread().setContextClassLoader(
1095:                            ref.getClass().getClassLoader());
1096:
1097:                    utw.begin();
1098:                    RPCMessage rpcMessage = (RPCMessage) message;
1099:                    try {
1100:                        if (rpcMessage.generatedException()) {
1101:                            Method method = ref.getClass().getMethod(
1102:                                    "onFailure",
1103:                                    new Class[] { String.class, String.class,
1104:                                            Throwable.class });
1105:                            Throwable ex = rpcMessage.getThrowable();
1106:                            if (ex instanceof  java.lang.reflect.InvocationTargetException) {
1107:                                ex = ((java.lang.reflect.InvocationTargetException) ex)
1108:                                        .getCause();
1109:                            }
1110:                            method.invoke(ref, new Object[] {
1111:                                    rpcMessage.getMessageId(),
1112:                                    rpcMessage.getCorrelationId(), ex });
1113:                        } else {
1114:                            Method method = ref.getClass().getMethod(
1115:                                    "onSuccess",
1116:                                    new Class[] { String.class, String.class,
1117:                                            Object.class });
1118:                            method.invoke(ref, new Object[] {
1119:                                    rpcMessage.getMessageId(),
1120:                                    rpcMessage.getCorrelationId(),
1121:                                    rpcMessage.getResult() });
1122:                        }
1123:                    } catch (Exception ex) {
1124:                        log.error("Failed to deliver the message [" + reply
1125:                                + "] to the AsyncCallbackHandler method : "
1126:                                + ex.getMessage(), ex);
1127:                        utw.release();
1128:                        message
1129:                                .addError(
1130:                                        Message.ERROR,
1131:                                        "Failed to deliver the message ["
1132:                                                + reply
1133:                                                + "] to the AsyncCallbackHandler method : "
1134:                                                + ex.getMessage());
1135:                        initUndeliverableProcess(message);
1136:                        return;
1137:                    }
1138:                    if (original != null) {
1139:                        Thread.currentThread().setContextClassLoader(original);
1140:                        original = null;
1141:                    }
1142:                    log.info("Removing the completed rpc message : "
1143:                            + rpcMessage.getMessageId());
1144:                    messageProcessInfo.getMessageManager().remove();
1145:                    messageProcessInfo.getMessageQueue().removeMessage(
1146:                            rpcMessage.getMessageId());
1147:
1148:                    utw.commit();
1149:                } catch (Exception ex) {
1150:                    log.error("Failed to deliver the reply RPC Message : "
1151:                            + ex.getMessage(), ex);
1152:                    throw new MessageServiceException(
1153:                            "Failed to deliver the reply RPC Message : "
1154:                                    + ex.getMessage(), ex);
1155:                } finally {
1156:                    utw.release();
1157:                    if (original != null) {
1158:                        Thread.currentThread().setContextClassLoader(original);
1159:                    }
1160:                }
1161:            }
1162:
1163:            /**
1164:             * This method delivers the text message to its target.
1165:             *
1166:             * @param target The target to deliver the message to.
1167:             * @param message The message to deliver.
1168:             * @exception MessageServiceException
1169:             */
1170:            private void deliverTextMessage(String target, Message message)
1171:                    throws MessageServiceException {
1172:                try {
1173:                    MessageHandler messageHandler = (MessageHandler) ConnectionManager
1174:                            .getInstance().getConnection(MessageHandler.class,
1175:                                    target);
1176:                    utw.begin();
1177:                    Message result = messageHandler.processMessage(message);
1178:                    result.incrementRetries();
1179:                    if (result.isAcknowledged() && result.getReply()
1180:                            && (message.getState() == Message.UNDELIVERED)) {
1181:                        log.info("Init the process to deliver to the sender : "
1182:                                + message.getMessageId());
1183:                        ((MessageImpl) result).setState(Message.DELIVERED);
1184:                        messageProcessInfo.getMessageManager().updateMessage(
1185:                                result);
1186:                        messageProcessInfo.getMessageQueue().removeMessage(
1187:                                message.getMessageId());
1188:                        MessageQueue messageQueue = MessageQueueManager
1189:                                .getInstance().getQueue(
1190:                                        MessageQueueManager.UNSORTED);
1191:                        ((MessageManagerImpl) messageProcessInfo
1192:                                .getMessageManager())
1193:                                .assignToQueue(MessageQueueManager.UNSORTED);
1194:                        messageQueue.addMessage(messageProcessInfo
1195:                                .getMessageManager());
1196:                        utw.commit();
1197:                    } else if ((result.isAcknowledged() && !result.getReply())
1198:                            || (result.isAcknowledged() && (message.getState() == Message.DELIVERED))) {
1199:                        log.info("Removing the completed text message : "
1200:                                + message.getMessageId());
1201:                        messageProcessInfo.getMessageManager().remove();
1202:                        messageProcessInfo.getMessageQueue().removeMessage(
1203:                                message.getMessageId());
1204:                        utw.commit();
1205:                    } else {
1206:                        Date nextDate = new Date();
1207:                        nextDate.setTime(nextDate.getTime() + delay);
1208:                        ((MessageImpl) result).setNextProcessDate(nextDate);
1209:                        messageProcessInfo.getMessageManager().updateMessage(
1210:                                result);
1211:                        utw.commit();
1212:                        messageProcessInfo.getMessageQueue().pushBackMessage(
1213:                                messageProcessInfo.getMessageManager());
1214:                    }
1215:                } catch (java.lang.ClassCastException ex) {
1216:                    log
1217:                            .error(
1218:                                    "Failed to deliver the text message ["
1219:                                            + message.getMessageId()
1220:                                            + "], "
1221:                                            + "init the undeliverable process, as the target cannot be "
1222:                                            + "spoken to correctly : "
1223:                                            + ex.getMessage(), ex);
1224:                    message.addError(Message.ERROR,
1225:                            "Failed to deliver the text message : "
1226:                                    + ex.getMessage());
1227:                    initUndeliverableProcess(message);
1228:                } catch (com.rift.coad.util.connection.NameNotFound ex) {
1229:                    log.error("Failed to deliver the text message ["
1230:                            + message.getMessageId() + "], "
1231:                            + "init the undeliverable process, "
1232:                            + "as the target name cannot be found : "
1233:                            + ex.getMessage(), ex);
1234:                    message.addError(Message.ERROR,
1235:                            "Failed to deliver the text message : "
1236:                                    + ex.getMessage());
1237:                    initUndeliverableProcess(message);
1238:                } catch (Exception ex) {
1239:                    log.error(
1240:                            "Failed to deliver the text message ["
1241:                                    + message.getMessageId() + "] : "
1242:                                    + ex.getMessage(), ex);
1243:                    throw new MessageServiceException(
1244:                            "Failed to deliver the text message : "
1245:                                    + ex.getMessage(), ex);
1246:                } finally {
1247:                    utw.release();
1248:                }
1249:            }
1250:
1251:            /**
1252:             * This method delivers the reply text message to its target.
1253:             *
1254:             * @param reply The reply address for the message.
1255:             * @param message The message to deliver.
1256:             * @exception MessageServiceException
1257:             */
1258:            private void deliverReplyTextMessage(String reply, Message message)
1259:                    throws MessageServiceException {
1260:                try {
1261:                    MessageHandler messageHandler = (MessageHandler) ConnectionManager
1262:                            .getInstance().getConnection(MessageHandler.class,
1263:                                    reply);
1264:                    utw.begin();
1265:                    Message result = messageHandler.processMessage(message);
1266:                    result.incrementRetries();
1267:                    if (result.isAcknowledged()) {
1268:                        log.info("Removing the completed text message : "
1269:                                + message.getMessageId());
1270:                        messageProcessInfo.getMessageManager().remove();
1271:                        messageProcessInfo.getMessageQueue().removeMessage(
1272:                                message.getMessageId());
1273:                        utw.commit();
1274:                    } else {
1275:                        Date nextDate = new Date();
1276:                        nextDate.setTime(nextDate.getTime() + delay);
1277:                        ((MessageImpl) result).setNextProcessDate(nextDate);
1278:                        messageProcessInfo.getMessageManager().updateMessage(
1279:                                result);
1280:                        utw.commit();
1281:                        messageProcessInfo.getMessageQueue().pushBackMessage(
1282:                                messageProcessInfo.getMessageManager());
1283:                    }
1284:                } catch (java.lang.ClassCastException ex) {
1285:                    log.error("Failed to deliver the text message : "
1286:                            + ex.getMessage(), ex);
1287:                    message.addError(Message.ERROR,
1288:                            "Failed to deliver the text message : "
1289:                                    + ex.getMessage());
1290:                    initUndeliverableProcess(message);
1291:                } catch (com.rift.coad.util.connection.NameNotFound ex) {
1292:                    log.error("Failed to deliver the text message : "
1293:                            + ex.getMessage(), ex);
1294:                    message.addError(Message.ERROR,
1295:                            "Failed to deliver the text message : "
1296:                                    + ex.getMessage());
1297:                    initUndeliverableProcess(message);
1298:                } catch (Exception ex) {
1299:                    log.error("Failed to deliver the text message : "
1300:                            + ex.getMessage(), ex);
1301:                    throw new MessageServiceException(
1302:                            "Failed to deliver the text message : "
1303:                                    + ex.getMessage(), ex);
1304:                } finally {
1305:                    utw.release();
1306:                }
1307:            }
1308:
1309:            /**
1310:             * This method prepends the JNDI URL base.
1311:             *
1312:             * @return The modified url.
1313:             * @param url The url to modify.
1314:             */
1315:            private String downJNDIUrl(String url)
1316:                    throws MessageServiceException {
1317:                try {
1318:                    String instanceBase = NamingConstants.SUBCONTEXT + "/"
1319:                            + namingDirector.getInstanceId() + "/";
1320:                    if (url.indexOf(PARENT_INSTANCE) == 0) {
1321:                        return url.substring(PARENT_INSTANCE.length());
1322:                    } else if (url.contains(namingDirector.getPrimaryJNDIUrl())
1323:                            || url.contains(namingDirector.getJNDIBase())
1324:                            || url.contains(instanceBase)
1325:                            || (url
1326:                                    .indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0)) {
1327:                        return url;
1328:                    } else if (!url.contains(namingDirector.getInstanceId())) {
1329:                        return instanceBase + url;
1330:                    } else {
1331:                        return NamingConstants.SUBCONTEXT + "/" + url;
1332:                    }
1333:                } catch (Exception ex) {
1334:                    log.error("Failed to move down the url : "
1335:                            + ex.getMessage(), ex);
1336:                    throw new MessageServiceException(
1337:                            "Failed to move down the url : " + ex.getMessage(),
1338:                            ex);
1339:                }
1340:            }
1341:
1342:            /**
1343:             * This method prepends the JNDI URL base.
1344:             *
1345:             * @return The modified url.
1346:             * @param url The url to modify.
1347:             */
1348:            private String upJNDIUrl(String target, String url)
1349:                    throws MessageServiceException {
1350:                try {
1351:                    String updatedURL = url;
1352:                    String instanceBase = namingDirector.getInstanceId() + "/"
1353:                            + NamingConstants.SUBCONTEXT + "/";
1354:                    String subContextUrl = NamingConstants.SUBCONTEXT + "/";
1355:                    int pos = url.indexOf(subContextUrl);
1356:                    if (url.contains(instanceBase)) {
1357:                        updatedURL = url.substring(url.indexOf(instanceBase)
1358:                                + instanceBase.length());
1359:                        pos = updatedURL.indexOf(subContextUrl);
1360:                        if (url.equals(target) && (pos == 0)) {
1361:                            updatedURL = updatedURL
1362:                                    .substring(updatedURL.indexOf("/", pos
1363:                                            + subContextUrl.length()) + 1);
1364:                        }
1365:                    } else if (url.equals(target) && (pos == 0)) {
1366:                        updatedURL = url.substring(url.indexOf("/", pos
1367:                                + subContextUrl.length()) + 1);
1368:                    } else {
1369:                        updatedURL = PARENT_INSTANCE + url;
1370:                    }
1371:                    return updatedURL;
1372:                } catch (Exception ex) {
1373:                    log.error(
1374:                            "Failed to modify the url to set it relative to the next "
1375:                                    + "coadunation intance : "
1376:                                    + ex.getMessage(), ex);
1377:                    throw new MessageServiceException(
1378:                            "Failed to modify the url to "
1379:                                    + "set it relative to the next coadunation intance : "
1380:                                    + ex.getMessage(), ex);
1381:                }
1382:            }
1383:
1384:            /**
1385:             * The message has been deemed undeliverable for some reason.
1386:             *
1387:             * @param message The message set as undeliverable
1388:             * @excption MessageServiceException
1389:             */
1390:            private void initUndeliverableProcess(Message message)
1391:                    throws MessageServiceException {
1392:                try {
1393:                    int currentState = message.getState();
1394:                    utw.begin();
1395:                    ((MessageImpl) message).setState(Message.UNDELIVERABLE);
1396:                    messageProcessInfo.getMessageManager().updateMessage(
1397:                            message);
1398:                    utw.commit();
1399:                    utw.release();
1400:                    if (currentState == Message.DELIVERED) {
1401:                        processUndeliverable(message);
1402:                    } else if (messageProcessInfo.getMessageQueue().getName()
1403:                            .equals(MessageQueueManager.UNSORTED)) {
1404:                        messageProcessInfo.getMessageQueue().pushBackMessage(
1405:                                messageProcessInfo.getMessageManager());
1406:                    } else {
1407:                        utw.begin();
1408:                        messageProcessInfo.getMessageQueue().removeMessage(
1409:                                message.getMessageId());
1410:                        MessageQueue messageQueue = MessageQueueManager
1411:                                .getInstance().getQueue(
1412:                                        MessageQueueManager.UNSORTED);
1413:                        ((MessageManagerImpl) messageProcessInfo
1414:                                .getMessageManager())
1415:                                .assignToQueue(MessageQueueManager.UNSORTED);
1416:                        messageQueue.addMessage(messageProcessInfo
1417:                                .getMessageManager());
1418:                        utw.commit();
1419:                    }
1420:
1421:                } catch (Exception ex) {
1422:                    log.error("Failed to init the undeliverable process :"
1423:                            + ex.getMessage(), ex);
1424:                    throw new MessageServiceException("Failed to init the "
1425:                            + "undeliverable process :" + ex.getMessage(), ex);
1426:                } finally {
1427:                    utw.release();
1428:                }
1429:            }
1430:
1431:            /**
1432:             * This method is responsible for initializing the user session.
1433:             *
1434:             * @param message The message containing the user session information.
1435:             * @exception MessageServiceException
1436:             */
1437:            private void initUserSession(Message message)
1438:                    throws MessageServiceException {
1439:                try {
1440:                    Session session = new Session(message.getMessageCreater(),
1441:                            message.getSessionId(), new HashSet(message
1442:                                    .getMessagePrincipals()));
1443:                    getServerInterceptor().createSession(session);
1444:                } catch (Exception ex) {
1445:                    log.error("Failed to setup the user session : "
1446:                            + ex.getMessage(), ex);
1447:                    throw new MessageServiceException(
1448:                            "Failed to setup the user session :"
1449:                                    + ex.getMessage(), ex);
1450:                }
1451:            }
1452:
1453:            /**
1454:             * This method is responsible for initializing the user session.
1455:             */
1456:            private void releaseUserSession() {
1457:                try {
1458:                    getServerInterceptor().release();
1459:                } catch (Exception ex) {
1460:                    log.error("Failed to release the user session : "
1461:                            + ex.getMessage(), ex);
1462:                }
1463:            }
1464:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.