Source Code Cross Referenced for MessageManagerImpl.java in » Net » Coadunation_1.0.1 » com » rift » coad » daemon » messageservice » message » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Coadunation_1.0.1 » com.rift.coad.daemon.messageservice.message 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * MessageService: The message service daemon
0003:         * Copyright (C) 2006  Rift IT Contracting
0004:         *
0005:         * This library is free software; you can redistribute it and/or
0006:         * modify it under the terms of the GNU Lesser General Public
0007:         * License as published by the Free Software Foundation; either
0008:         * version 2.1 of the License, or (at your option) any later version.
0009:         *
0010:         * This library is distributed in the hope that it will be useful,
0011:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0012:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0013:         * Lesser General Public License for more details.
0014:         *
0015:         * You should have received a copy of the GNU Lesser General Public
0016:         * License along with this library; if not, write to the Free Software
0017:         * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
0018:         *
0019:         * MessageManagerImpl.java
0020:         */
0021:
0022:        // the package path
0023:        package com.rift.coad.daemon.messageservice.message;
0024:
0025:        // java imports
0026:        import java.sql.Timestamp;
0027:        import java.util.ArrayList;
0028:        import java.util.Date;
0029:        import java.util.Enumeration;
0030:        import java.util.HashSet;
0031:        import java.util.Iterator;
0032:        import java.util.List;
0033:        import java.util.Set;
0034:        import javax.transaction.xa.XAException;
0035:        import javax.transaction.xa.XAResource;
0036:        import javax.transaction.xa.Xid;
0037:
0038:        // hibernate imports
0039:        import org.hibernate.*;
0040:        import org.hibernate.cfg.*;
0041:
0042:        // logging import
0043:        import org.apache.log4j.Logger;
0044:
0045:        // coadunation imports
0046:        import com.rift.coad.daemon.messageservice.Message;
0047:        import com.rift.coad.daemon.messageservice.MessageServiceManager;
0048:        import com.rift.coad.daemon.messageservice.RPCMessage;
0049:        import com.rift.coad.daemon.messageservice.TextMessage;
0050:        import com.rift.coad.daemon.messageservice.MessageManager;
0051:        import com.rift.coad.daemon.messageservice.MessageServiceException;
0052:        import com.rift.coad.daemon.messageservice.InvalidProperty;
0053:        import com.rift.coad.daemon.messageservice.db.*;
0054:        import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
0055:        import com.rift.coad.hibernate.util.HibernateUtil;
0056:        import com.rift.coad.lib.common.ObjectSerializer;
0057:        import com.rift.coad.util.transaction.TransactionManager;
0058:        import com.rift.coad.util.change.Change;
0059:        import com.rift.coad.util.change.ChangeException;
0060:        import com.rift.coad.util.change.ChangeLog;
0061:
0062:        /**
0063:         * This object implements the message manager interface.
0064:         *
0065:         * @author Brett Chaldecott
0066:         */
0067:        public class MessageManagerImpl implements  MessageManager {
0068:
0069:            /**
0070:             * This object represents an add message change
0071:             */
0072:            public static class AddMessageChange implements  Change {
0073:                // private member variables
0074:                private MessageImpl newMessage = null;
0075:
0076:                /**
0077:                 * The constructor of the add message change object.
0078:                 *
0079:                 * @param message The message that represents this change.
0080:                 * @exception MessageServiceException
0081:                 */
0082:                public AddMessageChange(MessageImpl newMessage)
0083:                        throws MessageServiceException {
0084:                    try {
0085:                        this .newMessage = (MessageImpl) newMessage.clone();
0086:                    } catch (Exception ex) {
0087:                        log.error(
0088:                                "Failed to create a new add message change : "
0089:                                        + ex.getMessage(), ex);
0090:                        throw new MessageServiceException(
0091:                                "Failed to create a new add message change : "
0092:                                        + ex.getMessage(), ex);
0093:                    }
0094:                }
0095:
0096:                /**
0097:                 * The definition of the apply method.
0098:                 */
0099:                public void applyChanges() throws ChangeException {
0100:                    try {
0101:                        //MessageTransactionLock.getInstance().lock();
0102:                        Session session = HibernateUtil.getInstance(
0103:                                MessageServiceManager.class).getSession();
0104:
0105:                        // set the reply flag
0106:                        int reply = 0;
0107:                        if (newMessage.getReply()) {
0108:                            reply = 1;
0109:                        }
0110:                        int acknowledged = 0;
0111:                        if (newMessage.isAcknowledged()) {
0112:                            acknowledged = 1;
0113:                        }
0114:
0115:                        // check the type of messge
0116:                        int messageType = TEXT_MESSAGE;
0117:                        if (newMessage instanceof  RPCMessage) {
0118:                            messageType = RPC_MESSAGE;
0119:                        }
0120:
0121:                        // instanciate the basic message
0122:                        com.rift.coad.daemon.messageservice.db.Message message = new com.rift.coad.daemon.messageservice.db.Message(
0123:                                newMessage.getMessageId(), newMessage
0124:                                        .getMessageCreater(), newMessage
0125:                                        .getSessionId(), messageType,
0126:                                newMessage.getMessageType(), newMessage
0127:                                        .getPriority(), reply, newMessage
0128:                                        .getFrom(), acknowledged, 0, newMessage
0129:                                        .getRetries());
0130:                        if ((newMessage.getTarget() != null)
0131:                                && (newMessage.getTarget().length() != 0)) {
0132:                            message.setTarget(newMessage.getTarget());
0133:                        }
0134:                        if (newMessage.getFrom() == null) {
0135:                            throw new InvalidProperty(
0136:                                    "The from address must be set");
0137:                        }
0138:                        message.setFromUrl(newMessage.getFrom());
0139:                        if ((newMessage.getReplyTo() != null)
0140:                                && (newMessage.getReplyTo().length() != 0)) {
0141:                            message.setReplyUrl(newMessage.getReplyTo());
0142:                        }
0143:                        if ((newMessage.getTargetNamedQueue() != null)
0144:                                && (newMessage.getTargetNamedQueue().length() != 0)) {
0145:                            message.setTargetNamedQueue(newMessage
0146:                                    .getTargetNamedQueue());
0147:                        }
0148:                        if ((newMessage.getReplyNamedQueue() != null)
0149:                                && (newMessage.getReplyNamedQueue().length() != 0)) {
0150:                            message.setReplyNamedQueue(newMessage
0151:                                    .getReplyNamedQueue());
0152:                        }
0153:                        if ((newMessage.getCorrelationId() != null)
0154:                                && (newMessage.getCorrelationId().length() != 0)) {
0155:                            message.setCorrelationId(newMessage
0156:                                    .getCorrelationId());
0157:                        }
0158:                        message.setCreated(new Timestamp(newMessage
0159:                                .getCreated().getTime()));
0160:                        message.setProcessed(new Timestamp(newMessage
0161:                                .getProcessedDate().getTime()));
0162:                        message.setNextProcess(new Timestamp(
0163:                                ((MessageImpl) newMessage).getNextProcessDate()
0164:                                        .getTime()));
0165:                        message.setMessageState(newMessage.getState());
0166:                        session.persist(message);
0167:
0168:                        if (newMessage instanceof  RPCMessage) {
0169:                            RPCMessage rpcMessage = (RPCMessage) newMessage;
0170:                            MessageRpcBody rpcBody = new MessageRpcBody();
0171:                            rpcBody.setMessage(message);
0172:                            rpcBody.setMessageId(message.getId());
0173:                            rpcBody.setXml(rpcMessage.getMethodBodyXML());
0174:                            if (rpcMessage.generatedException()) {
0175:                                rpcBody
0176:                                        .setExceptionValue(((RPCMessageImpl) rpcMessage)
0177:                                                .getThrowableBytes().clone());
0178:                            }
0179:                            if (rpcMessage.getResult() != null) {
0180:                                rpcBody
0181:                                        .setResultValue(((RPCMessageImpl) rpcMessage)
0182:                                                .getResultBytes().clone());
0183:                            }
0184:                            session.persist(rpcBody);
0185:                        } else if (newMessage instanceof  TextMessage) {
0186:                            TextMessage textMessage = (TextMessage) newMessage;
0187:                            MessageTxtBody txtBody = new MessageTxtBody();
0188:                            txtBody.setMessage(message);
0189:                            txtBody.setMessageId(message.getId());
0190:                            txtBody.setBody(textMessage.getTextBody());
0191:                            session.persist(txtBody);
0192:                        } else {
0193:                            log.error("The message type ["
0194:                                    + newMessage.getClass().getName()
0195:                                    + "] is not recognised.");
0196:                            throw new MessageServiceException(
0197:                                    "The message type ["
0198:                                            + newMessage.getClass().getName()
0199:                                            + "] is not recognised.");
0200:                        }
0201:
0202:                        // the message information
0203:                        if (newMessage.getServices() != null) {
0204:                            String[] services = newMessage.getServices();
0205:                            for (int index = 0; index < services.length; index++) {
0206:                                MessageService messageServices = new MessageService();
0207:                                messageServices.setMessage(message);
0208:                                messageServices.setService(services[index]);
0209:                                session.persist(messageServices);
0210:                            }
0211:                        }
0212:
0213:                        // the message properties
0214:                        for (Enumeration enumerat = newMessage
0215:                                .getPropertyNames(); enumerat.hasMoreElements();) {
0216:                            String key = (String) enumerat.nextElement();
0217:                            Object value = newMessage.getPropertyValue(key);
0218:                            MessageProperty property = new MessageProperty();
0219:                            property.setMessage(message);
0220:                            property.setName(key);
0221:                            if (value instanceof  Boolean) {
0222:                                if (((Boolean) value).booleanValue()) {
0223:                                    property.setBoolValue(new Integer(1));
0224:                                } else {
0225:                                    property.setBoolValue(new Integer(0));
0226:                                }
0227:                            } else if (value instanceof  Byte) {
0228:                                property.setByteValue(new Integer(
0229:                                        ((Byte) value).intValue()));
0230:                            } else if (value instanceof  Integer) {
0231:                                property.setIntValue((Integer) value);
0232:                            } else if (value instanceof  Long) {
0233:                                property.setLongValue((Long) value);
0234:                            } else if (value instanceof  Double) {
0235:                                property.setDoubleValue((Double) value);
0236:                            } else if (value instanceof  Float) {
0237:                                property.setFloatValue((Float) value);
0238:                            } else if (value instanceof  String) {
0239:                                property.setStringValue((String) value);
0240:                            } else if (value instanceof  byte[]) {
0241:                                property.setObjectValue((byte[]) value);
0242:                            }
0243:                            session.persist(property);
0244:                        }
0245:
0246:                        // the message information
0247:                        if (newMessage.getMessagePrincipals() == null) {
0248:                            throw new InvalidProperty("Must supply principals");
0249:                        }
0250:                        List principals = newMessage.getMessagePrincipals();
0251:                        for (Iterator iter = principals.iterator(); iter
0252:                                .hasNext();) {
0253:                            MessagePrincipal principal = new MessagePrincipal();
0254:                            principal.setMessage(message);
0255:                            String principalStr = (String) iter.next();
0256:                            //log.info("Add principal : " + principalStr);
0257:                            principal.setPrincipalValue(principalStr);
0258:                            session.persist(principal);
0259:                        }
0260:                        List errors = newMessage.getErrors();
0261:                        for (Iterator iter = errors.iterator(); iter.hasNext();) {
0262:                            com.rift.coad.daemon.messageservice.MessageError messageError = (com.rift.coad.daemon.messageservice.MessageError) iter
0263:                                    .next();
0264:                            com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = new com.rift.coad.daemon.messageservice.db.MessageError();
0265:                            dbMessageError.setMessage(message);
0266:                            dbMessageError.setErrorDate(new java.sql.Timestamp(
0267:                                    messageError.getErrorDate().getTime()));
0268:                            dbMessageError.setErrorLevel(messageError
0269:                                    .getLevel());
0270:                            dbMessageError.setMsg(messageError.getMSG());
0271:                            session.persist(dbMessageError);
0272:                        }
0273:
0274:                    } catch (Exception ex) {
0275:                        log.error("Failed to apply the changes : "
0276:                                + ex.getMessage(), ex);
0277:                        throw new ChangeException(
0278:                                "Failed to apply the changes : "
0279:                                        + ex.getMessage(), ex);
0280:                    }
0281:
0282:                }
0283:            }
0284:
0285:            /**
0286:             * This object represents queue assignment
0287:             */
0288:            public static class AssignMessageToQueueChange implements  Change {
0289:                // private member variables
0290:                private String messageId = null;
0291:                private String queueName = null;
0292:
0293:                /**
0294:                 * The constructor of the assign message to queue object.
0295:                 */
0296:                public AssignMessageToQueueChange(String messageId,
0297:                        String queueName) {
0298:                    this .messageId = new String(messageId);
0299:                    this .queueName = new String(queueName);
0300:                }
0301:
0302:                /**
0303:                 * The definition of the apply method.
0304:                 */
0305:                public void applyChanges() throws ChangeException {
0306:                    try {
0307:                        //MessageTransactionLock.getInstance().lock();
0308:                        Session session = HibernateUtil.getInstance(
0309:                                MessageServiceManager.class).getSession();
0310:                        List entries = session
0311:                                .createQuery(
0312:                                        "FROM MessageQueue as queue WHERE queue.messageQueueName = ?")
0313:                                .setString(0, queueName).list();
0314:
0315:                        if (entries.size() != 1) {
0316:                            log.error("There is no queue by the name of : "
0317:                                    + queueName);
0318:                            throw new MessageServiceException(
0319:                                    "There is no queue by the name of : "
0320:                                            + queueName);
0321:                        }
0322:                        com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue = (com.rift.coad.daemon.messageservice.db.MessageQueue) entries
0323:                                .get(0);
0324:                        com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0325:                                .get(
0326:                                        com.rift.coad.daemon.messageservice.db.Message.class,
0327:                                        messageId);
0328:                        message.setMessageQueue(messageQueue);
0329:                    } catch (Exception ex) {
0330:                        log.error("Failed to apply the changes : "
0331:                                + ex.getMessage(), ex);
0332:                        throw new ChangeException(
0333:                                "Failed to apply the changes : "
0334:                                        + ex.getMessage(), ex);
0335:                    }
0336:                }
0337:            }
0338:
0339:            /**
0340:             * This object represents an update message change
0341:             */
0342:            public static class UpdateMessageChange implements  Change {
0343:                // private member variables
0344:                private MessageImpl updatedMessage = null;
0345:
0346:                /**
0347:                 * The constructor of the update message change object.
0348:                 *
0349:                 * @param updatedMessage The updated message object.
0350:                 * @exception MessageServiceException
0351:                 */
0352:                public UpdateMessageChange(MessageImpl updatedMessage)
0353:                        throws MessageServiceException {
0354:                    try {
0355:                        this .updatedMessage = (MessageImpl) updatedMessage
0356:                                .clone();
0357:                    } catch (Exception ex) {
0358:                        log.error("Failed to clone the updated message : "
0359:                                + ex.getMessage(), ex);
0360:                        throw new MessageServiceException(
0361:                                "Failed to clone the updated message : "
0362:                                        + ex.getMessage(), ex);
0363:                    }
0364:                }
0365:
0366:                /**
0367:                 * The definition of the apply method.
0368:                 */
0369:                public void applyChanges() throws ChangeException {
0370:                    try {
0371:                        //MessageTransactionLock.getInstance().lock();
0372:                        Session session = HibernateUtil.getInstance(
0373:                                MessageServiceManager.class).getSession();
0374:
0375:                        com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0376:                                .get(
0377:                                        com.rift.coad.daemon.messageservice.db.Message.class,
0378:                                        updatedMessage.getMessageId());
0379:
0380:                        if (!(updatedMessage instanceof  MessageImpl)) {
0381:                            throw new MessageServiceException(
0382:                                    "The incorrect message object has been passed "
0383:                                            + "into update");
0384:                        }
0385:                        MessageImpl messageImpl = (MessageImpl) updatedMessage;
0386:                        if ((updatedMessage.getTarget() != null)
0387:                                && (updatedMessage.getTarget().length() != 0)) {
0388:                            message.setTarget(updatedMessage.getTarget());
0389:                        }
0390:                        if (updatedMessage.getFrom() == null) {
0391:                            throw new InvalidProperty(
0392:                                    "The from address must be set");
0393:                        }
0394:                        message.setFromUrl(updatedMessage.getFrom());
0395:                        if ((updatedMessage.getReplyTo() != null)
0396:                                && (updatedMessage.getReplyTo().length() != 0)) {
0397:                            message.setReplyUrl(updatedMessage.getReplyTo());
0398:                        }
0399:                        if ((updatedMessage.getTargetNamedQueue() != null)
0400:                                && (updatedMessage.getTargetNamedQueue()
0401:                                        .length() != 0)) {
0402:                            message.setTargetNamedQueue(updatedMessage
0403:                                    .getTargetNamedQueue());
0404:                        }
0405:                        if ((updatedMessage.getReplyNamedQueue() != null)
0406:                                && (updatedMessage.getReplyNamedQueue()
0407:                                        .length() != 0)) {
0408:                            message.setReplyNamedQueue(updatedMessage
0409:                                    .getReplyNamedQueue());
0410:                        }
0411:                        if ((updatedMessage.getCorrelationId() != null)
0412:                                && (updatedMessage.getCorrelationId().length() != 0)) {
0413:                            message.setCorrelationId(updatedMessage
0414:                                    .getCorrelationId());
0415:                        }
0416:                        message.setCreated(new Timestamp(updatedMessage
0417:                                .getCreated().getTime()));
0418:                        message.setProcessed(new Timestamp(updatedMessage
0419:                                .getProcessedDate().getTime()));
0420:                        message.setNextProcess(new Timestamp(
0421:                                ((MessageImpl) updatedMessage)
0422:                                        .getNextProcessDate().getTime()));
0423:                        message.setMessageState(updatedMessage.getState());
0424:                        message.setRetries(updatedMessage.getRetries());
0425:                        message.setMessageRoutingType(updatedMessage
0426:                                .getMessageType());
0427:                        message.setPriority(updatedMessage.getPriority());
0428:                        if (messageImpl.isAcknowledged()) {
0429:                            message.setAcknowledged(1);
0430:                        } else {
0431:                            message.setAcknowledged(0);
0432:                        }
0433:
0434:                        // the message properties
0435:                        session.createQuery(
0436:                                "DELETE FROM MessageProperty as property WHERE "
0437:                                        + "property.message.id = ?").setString(
0438:                                0, updatedMessage.getMessageId())
0439:                                .executeUpdate();
0440:                        //message.getMessageProperties().clear();
0441:                        for (Enumeration enumerat = updatedMessage
0442:                                .getPropertyNames(); enumerat.hasMoreElements();) {
0443:                            String key = (String) enumerat.nextElement();
0444:                            Object value = updatedMessage.getPropertyValue(key);
0445:                            MessageProperty property = new MessageProperty();
0446:                            property.setMessage(message);
0447:                            property.setName(key);
0448:                            if (value instanceof  Boolean) {
0449:                                if (((Boolean) value).booleanValue()) {
0450:                                    property.setBoolValue(new Integer(1));
0451:                                } else {
0452:                                    property.setBoolValue(new Integer(0));
0453:                                }
0454:                            } else if (value instanceof  Byte) {
0455:                                property.setByteValue(new Integer(
0456:                                        ((Byte) value).intValue()));
0457:                            } else if (value instanceof  Integer) {
0458:                                property.setIntValue((Integer) value);
0459:                            } else if (value instanceof  Long) {
0460:                                property.setLongValue((Long) value);
0461:                            } else if (value instanceof  Double) {
0462:                                property.setDoubleValue((Double) value);
0463:                            } else if (value instanceof  Float) {
0464:                                property.setFloatValue((Float) value);
0465:                            } else if (value instanceof  String) {
0466:                                property.setStringValue((String) value);
0467:                            } else if (value instanceof  byte[]) {
0468:                                property.setObjectValue((byte[]) value);
0469:                            }
0470:                            session.persist(property);
0471:                        }
0472:
0473:                        // the message properties
0474:                        session.createQuery(
0475:                                "DELETE FROM MessageError as error WHERE "
0476:                                        + "error.message.id = ?").setString(0,
0477:                                updatedMessage.getMessageId()).executeUpdate();
0478:                        List errors = updatedMessage.getErrors();
0479:                        for (Iterator iter = errors.iterator(); iter.hasNext();) {
0480:                            com.rift.coad.daemon.messageservice.MessageError messageError = (com.rift.coad.daemon.messageservice.MessageError) iter
0481:                                    .next();
0482:                            com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = new com.rift.coad.daemon.messageservice.db.MessageError();
0483:                            dbMessageError.setMessage(message);
0484:                            dbMessageError.setErrorDate(new java.sql.Timestamp(
0485:                                    messageError.getErrorDate().getTime()));
0486:                            dbMessageError.setErrorLevel(messageError
0487:                                    .getLevel());
0488:                            dbMessageError.setMsg(messageError.getMSG());
0489:                            session.persist(dbMessageError);
0490:                        }
0491:
0492:                        if (updatedMessage instanceof  RPCMessage) {
0493:                            RPCMessage rpcMessage = (RPCMessage) updatedMessage;
0494:                            MessageRpcBody rpcBody = (MessageRpcBody) session
0495:                                    .get(MessageRpcBody.class, message.getId());
0496:                            if (rpcMessage.generatedException()) {
0497:                                rpcBody
0498:                                        .setExceptionValue(((RPCMessageImpl) rpcMessage)
0499:                                                .getThrowableBytes().clone());
0500:                            }
0501:                            if (((RPCMessageImpl) rpcMessage).getResultBytes() != null) {
0502:                                rpcBody
0503:                                        .setResultValue(((RPCMessageImpl) rpcMessage)
0504:                                                .getResultBytes().clone());
0505:                            }
0506:                        } else if (updatedMessage instanceof  TextMessage) {
0507:                            TextMessage textMessage = (TextMessage) updatedMessage;
0508:                            MessageTxtBody txtBody = (MessageTxtBody) session
0509:                                    .get(MessageTxtBody.class, message.getId());
0510:                            txtBody.setBody(textMessage.getTextBody());
0511:                        } else {
0512:                            log.error("The message type ["
0513:                                    + updatedMessage.getClass().getName()
0514:                                    + "] is not recognised.");
0515:                            throw new ChangeException("The message type ["
0516:                                    + updatedMessage.getClass().getName()
0517:                                    + "] is not recognised.");
0518:                        }
0519:
0520:                    } catch (ChangeException ex) {
0521:                        throw ex;
0522:                    } catch (Exception ex) {
0523:                        log.error("Failed to update the message because : "
0524:                                + ex.getMessage(), ex);
0525:                        throw new ChangeException(
0526:                                "Failed to update the message because : "
0527:                                        + ex.getMessage(), ex);
0528:                    }
0529:                }
0530:            }
0531:
0532:            /**
0533:             * This object represents a remove message change
0534:             */
0535:            public static class RemoveMessageChange implements  Change {
0536:                // private member variables
0537:                private String messageId = null;
0538:
0539:                /**
0540:                 * The constructor of the remove message change object.
0541:                 *
0542:                 * @param messageId The message id for this object.
0543:                 */
0544:                public RemoveMessageChange(String messageId) {
0545:                    this .messageId = new String(messageId);
0546:                }
0547:
0548:                /**
0549:                 * The definition of the apply method.
0550:                 */
0551:                public void applyChanges() throws ChangeException {
0552:                    try {
0553:                        //MessageTransactionLock.getInstance().lock();
0554:                        Session session = HibernateUtil.getInstance(
0555:                                MessageServiceManager.class).getSession();
0556:                        session.createQuery(
0557:                                "DELETE FROM MessageRpcBody as body WHERE "
0558:                                        + "body.message.id = ?").setString(0,
0559:                                messageId).executeUpdate();
0560:                        session.createQuery(
0561:                                "DELETE FROM MessageTxtBody as body WHERE "
0562:                                        + "body.message.id = ?").setString(0,
0563:                                messageId).executeUpdate();
0564:                        session.createQuery(
0565:                                "DELETE FROM MessageService as service WHERE "
0566:                                        + "service.message.id = ?").setString(
0567:                                0, messageId).executeUpdate();
0568:                        session.createQuery(
0569:                                "DELETE FROM MessageProperty as property WHERE "
0570:                                        + "property.message.id = ?").setString(
0571:                                0, messageId).executeUpdate();
0572:                        session.createQuery(
0573:                                "DELETE FROM MessagePrincipal as principal WHERE "
0574:                                        + "principal.message.id = ?")
0575:                                .setString(0, messageId).executeUpdate();
0576:                        session.createQuery(
0577:                                "DELETE FROM MessageError as error WHERE "
0578:                                        + "error.message.id = ?").setString(0,
0579:                                messageId).executeUpdate();
0580:                        session.createQuery(
0581:                                "DELETE FROM Message as msg WHERE msg.id = ?")
0582:                                .setString(0, messageId).executeUpdate();
0583:                    } catch (Exception ex) {
0584:                        log.error(
0585:                                "Failed to failed to remove the message from the db : "
0586:                                        + ex.getMessage(), ex);
0587:                        throw new ChangeException(
0588:                                "Failed to failed to remove the message from the db : "
0589:                                        + ex.getMessage(), ex);
0590:                    }
0591:                }
0592:            }
0593:
0594:            // class constants
0595:            public final static int TEXT_MESSAGE = 1;
0596:            public final static int RPC_MESSAGE = 2;
0597:
0598:            // the logger reference
0599:            protected static Logger log = Logger
0600:                    .getLogger(MessageManagerImpl.class.getName());
0601:
0602:            // private member variable
0603:            private String id = null;
0604:            private Date nextProcessTime = null;
0605:            private String messageQueueName = null;
0606:            private MessageImpl masterMessageImpl = null;
0607:
0608:            // transaction variables
0609:            private Date originalNextProcessTime = null;
0610:            private String originalMessageQueueName = null;
0611:            private MessageImpl originalMessageImpl = null;
0612:
0613:            /**
0614:             * Creates a new instance of MessageManagerImpl
0615:             *
0616:             * @param id The id of the message.
0617:             */
0618:            public MessageManagerImpl(String id) throws MessageServiceException {
0619:                this .id = id;
0620:                originalMessageImpl = masterMessageImpl = loadMessage();
0621:            }
0622:
0623:            /**
0624:             * Creates a new instance of MessageManagerImpl
0625:             *
0626:             * @param message The new message to create.
0627:             */
0628:            public MessageManagerImpl(Message newMessage)
0629:                    throws MessageServiceException {
0630:                try {
0631:                    this .id = newMessage.getMessageId();
0632:                    nextProcessTime = ((MessageImpl) newMessage)
0633:                            .getNextProcessDate();
0634:                    originalNextProcessTime = ((MessageImpl) newMessage)
0635:                            .getNextProcessDate();
0636:                    originalMessageImpl = (MessageImpl) newMessage;
0637:                    masterMessageImpl = (MessageImpl) newMessage;
0638:                    ChangeLog.getInstance().addChange(
0639:                            new AddMessageChange((MessageImpl) newMessage));
0640:                    TransactionManager.getInstance().bindResource(this , true);
0641:                } catch (MessageServiceException ex) {
0642:                    throw ex;
0643:                } catch (Exception ex) {
0644:                    log.error("Failed to create the message "
0645:                            + "from the database : " + ex.getMessage(), ex);
0646:                    throw new MessageServiceException(
0647:                            "Failed to create the message "
0648:                                    + "in the database : " + ex.getMessage(),
0649:                            ex);
0650:                }
0651:            }
0652:
0653:            /**
0654:             * This method returns the id of this messsage.
0655:             *
0656:             * @return The id of the message this object is managing.
0657:             */
0658:            public String getID() {
0659:                return id;
0660:            }
0661:
0662:            /**
0663:             * This method returns the message object.
0664:             *
0665:             * @return The message object.
0666:             * @exception MessageServiceException
0667:             */
0668:            public Message getMessage() throws MessageServiceException {
0669:                return masterMessageImpl;
0670:            }
0671:
0672:            /**
0673:             * This method returns the message object.
0674:             *
0675:             * @return The message object.
0676:             * @exception MessageServiceException
0677:             */
0678:            public void assignToQueue(String queueName)
0679:                    throws MessageServiceException {
0680:                try {
0681:                    TransactionManager.getInstance().bindResource(this , true);
0682:                    ChangeLog.getInstance().addChange(
0683:                            new AssignMessageToQueueChange(this .id, queueName));
0684:                    this .messageQueueName = queueName;
0685:                } catch (Exception ex) {
0686:                    log.error(
0687:                            "Failed to assign this object to a queue because : "
0688:                                    + ex.getMessage(), ex);
0689:                    throw new MessageServiceException(
0690:                            "Failed to assign this object to a queue because : "
0691:                                    + ex.getMessage(), ex);
0692:                }
0693:            }
0694:
0695:            /**
0696:             * This method updates the message object.
0697:             *
0698:             * @param updatedMessage The updated message object.
0699:             * @exception MessageServiceException
0700:             */
0701:            public void updateMessage(Message updatedMessage)
0702:                    throws MessageServiceException {
0703:                try {
0704:                    TransactionManager.getInstance().bindResource(this , true);
0705:                    ChangeLog.getInstance().addChange(
0706:                            new UpdateMessageChange(
0707:                                    (MessageImpl) updatedMessage));
0708:                    nextProcessTime = ((MessageImpl) updatedMessage)
0709:                            .getNextProcessDate();
0710:                    masterMessageImpl = (MessageImpl) updatedMessage;
0711:                } catch (MessageServiceException ex) {
0712:                    throw ex;
0713:                } catch (Exception ex) {
0714:                    log.error("Failed to update the message because : "
0715:                            + ex.getMessage(), ex);
0716:                    throw new MessageServiceException(
0717:                            "Failed to update the message because : "
0718:                                    + ex.getMessage(), ex);
0719:                }
0720:            }
0721:
0722:            /**
0723:             * This method is responsible from removing this message from the db.
0724:             *
0725:             * @exception MessageServiceException
0726:             */
0727:            public void remove() throws MessageServiceException {
0728:                try {
0729:                    TransactionManager.getInstance().bindResource(this , true);
0730:                    ChangeLog.getInstance().addChange(
0731:                            new RemoveMessageChange(this .id));
0732:                } catch (Exception ex) {
0733:                    log.error("Failed to failed to remove the message : "
0734:                            + ex.getMessage(), ex);
0735:                    throw new MessageServiceException(
0736:                            "Failed to failed to remove the message : "
0737:                                    + ex.getMessage(), ex);
0738:                }
0739:            }
0740:
0741:            /**
0742:             * This method returns the next process time for this message.
0743:             *
0744:             * @return The date message.
0745:             * @exception MessageServiceException
0746:             */
0747:            public Date nextProcessTime() {
0748:                return nextProcessTime;
0749:            }
0750:
0751:            /**
0752:             * This message returns the priority.
0753:             */
0754:            public int getPriority() {
0755:                return this .masterMessageImpl.getPriority();
0756:            }
0757:
0758:            /**
0759:             * This method returns the name of the messaqe queue to which this message
0760:             * is assigned.
0761:             *
0762:             * @return The name of the message queue that this message is assigned to.
0763:             */
0764:            public String getMessageQueueName() {
0765:                return this .messageQueueName;
0766:            }
0767:
0768:            /**
0769:             * This method is called to commit the specified transaction.
0770:             *
0771:             * @param xid The id of the transaction to commit.
0772:             * @param onePhase If true a one phase commit should be used.
0773:             * @exception XAException
0774:             */
0775:            public synchronized void commit(Xid xid, boolean onePhase)
0776:                    throws XAException {
0777:                if (nextProcessTime != null) {
0778:                    this .originalNextProcessTime = nextProcessTime;
0779:                }
0780:                if (this .messageQueueName != null) {
0781:                    this .originalMessageQueueName = messageQueueName;
0782:                }
0783:                if (this .masterMessageImpl != null) {
0784:                    this .originalMessageImpl = masterMessageImpl;
0785:                }
0786:            }
0787:
0788:            /**
0789:             * The resource manager has dissociated this object from the transaction.
0790:             *
0791:             * @param xid The id of the transaction that is getting ended.
0792:             * @param flags The flags associated with this operation.
0793:             * @exception XAException
0794:             */
0795:            public void end(Xid xid, int flags) throws XAException {
0796:            }
0797:
0798:            /**
0799:             * The transaction has been completed and must be forgotten.
0800:             *
0801:             * @param xid The id of the transaction to forget.
0802:             * @exception XAException
0803:             */
0804:            public void forget(Xid xid) throws XAException {
0805:                if (nextProcessTime != null) {
0806:                    this .originalNextProcessTime = nextProcessTime;
0807:                }
0808:                if (this .messageQueueName != null) {
0809:                    this .originalMessageQueueName = messageQueueName;
0810:                }
0811:                if (this .masterMessageImpl != null) {
0812:                    this .originalMessageImpl = masterMessageImpl;
0813:                }
0814:            }
0815:
0816:            /**
0817:             * This method returns the transaction timeout for this object.
0818:             *
0819:             * @return The int containing the transaction timeout.
0820:             * @exception XAException
0821:             */
0822:            public int getTransactionTimeout() throws XAException {
0823:                return -1;
0824:            }
0825:
0826:            /**
0827:             * This method returns true if this object is the resource manager getting
0828:             * queried.
0829:             *
0830:             * @return TRUE if this is the resource manager, FALSE if not.
0831:             * @param xaResource The resource to perform the check against.
0832:             * @exception XAException
0833:             */
0834:            public boolean isSameRM(XAResource xAResource) throws XAException {
0835:                return this  == xAResource;
0836:            }
0837:
0838:            /**
0839:             * This is called before a transaction is committed.
0840:             *
0841:             * @return The results of the transaction.
0842:             * @param xid The id of the transaction to check against.
0843:             * @exception XAException
0844:             */
0845:            public int prepare(Xid xid) throws XAException {
0846:                return XAResource.XA_OK;
0847:            }
0848:
0849:            /**
0850:             * This method returns the list of transaction branches for this resource
0851:             * manager.
0852:             *
0853:             * @return The list of resource branches.
0854:             * @param flags The flags
0855:             * @exception XAException
0856:             */
0857:            public Xid[] recover(int flags) throws XAException {
0858:                return null;
0859:            }
0860:
0861:            /**
0862:             * This method is called to roll back the specified transaction.
0863:             *
0864:             * @param xid The id of the transaction to roll back.
0865:             * @exception XAException
0866:             */
0867:            public void rollback(Xid xid) throws XAException {
0868:                nextProcessTime = originalNextProcessTime;
0869:                messageQueueName = originalMessageQueueName;
0870:                masterMessageImpl = originalMessageImpl;
0871:            }
0872:
0873:            /**
0874:             * This method sets the transaction timeout for this resource manager.
0875:             *
0876:             * @return TRUE if the transaction timeout can be set successfully.
0877:             * @param transactionTimeout The new transaction timeout value.
0878:             * @exception XAException
0879:             */
0880:            public boolean setTransactionTimeout(int transactionTimeout)
0881:                    throws XAException {
0882:                return true;
0883:            }
0884:
0885:            /**
0886:             * This method is called to start a transaction on a resource manager.
0887:             *
0888:             * @param xid The id of the new transaction.
0889:             * @param flags The flags associated with the transaction.
0890:             * @exception XAException
0891:             */
0892:            public void start(Xid xid, int flags) throws XAException {
0893:
0894:            }
0895:
0896:            /**
0897:             * The compare to interface used to order this object in the queues.
0898:             *
0899:             * @return -1,0,1 depending on the order of the object.
0900:             * @param o The object to perform the comparison on.
0901:             */
0902:            public int compareTo(Object o) {
0903:                MessageManagerImpl msg = (MessageManagerImpl) o;
0904:                if (msg.nextProcessTime().getTime() > nextProcessTime()
0905:                        .getTime()) {
0906:                    return -1;
0907:                } else if (nextProcessTime().getTime() > msg.nextProcessTime()
0908:                        .getTime()) {
0909:                    return 1;
0910:                } else if (msg.getPriority() > getPriority()) {
0911:                    return -1;
0912:                } else if (getPriority() > msg.getPriority()) {
0913:                    return 1;
0914:                }
0915:                return 0;
0916:            }
0917:
0918:            /**
0919:             * This method returns the message object.
0920:             *
0921:             * @return The message object.
0922:             * @exception MessageServiceException
0923:             */
0924:            private MessageImpl loadMessage() throws MessageServiceException {
0925:                try {
0926:                    //MessageTransactionLock.getInstance().lock();
0927:                    Session session = HibernateUtil.getInstance(
0928:                            MessageServiceManager.class).getSession();
0929:
0930:                    com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0931:                            .get(
0932:                                    com.rift.coad.daemon.messageservice.db.Message.class,
0933:                                    id);
0934:
0935:                    MessageImpl result = null;
0936:                    if (message.getMessageType() == MessageManagerImpl.RPC_MESSAGE) {
0937:                        RPCMessageImpl rpcMessage = new RPCMessageImpl(message
0938:                                .getId(), new Date(message.getCreated()
0939:                                .getTime()), message.getRetries(), new Date(
0940:                                message.getProcessed().getTime()), message
0941:                                .getMessageCreator(), message.getSessionId(),
0942:                                null, message.getFromUrl(), message
0943:                                        .getMessageRoutingType(), message
0944:                                        .getMessageState());
0945:                        MessageRpcBody rpcBody = (MessageRpcBody) session.get(
0946:                                MessageRpcBody.class, message.getId());
0947:                        rpcMessage.setMethodBodyXML(rpcBody.getXml());
0948:                        if (rpcBody.getExceptionValue() != null) {
0949:                            rpcMessage.setThrowableBytes(rpcBody
0950:                                    .getExceptionValue().clone());
0951:                        }
0952:                        if (rpcBody.getResultValue() != null) {
0953:                            rpcMessage.setResultBytes(rpcBody.getResultValue()
0954:                                    .clone());
0955:                        }
0956:                        result = rpcMessage;
0957:                    } else {
0958:
0959:                        TextMessageImpl txtMessage = new TextMessageImpl(
0960:                                message.getId(), new Date(message.getCreated()
0961:                                        .getTime()), message.getRetries(),
0962:                                new Date(message.getProcessed().getTime()),
0963:                                message.getMessageCreator(), message
0964:                                        .getSessionId(), null, message
0965:                                        .getFromUrl(), message
0966:                                        .getMessageRoutingType(), message
0967:                                        .getMessageState());
0968:                        MessageTxtBody txtBody = (MessageTxtBody) session.get(
0969:                                MessageTxtBody.class, message.getId());
0970:                        txtMessage.setTextBody(txtBody.getBody());
0971:                        result = txtMessage;
0972:                    }
0973:
0974:                    // set from
0975:                    result.setFrom(message.getFromUrl());
0976:                    result.setPriority(message.getPriority());
0977:                    result.setNextProcessDate(new Date(message.getNextProcess()
0978:                            .getTime()));
0979:                    result.setProcessedDate(new Date(message.getProcessed()
0980:                            .getTime()));
0981:
0982:                    // set the target
0983:                    if (message.getTarget() != null) {
0984:                        result.setTarget(message.getTarget());
0985:                    }
0986:                    // set the reply flag
0987:                    if (message.getReply() == 1) {
0988:                        result.setReply(true);
0989:                    } else {
0990:                        result.setReply(false);
0991:                    }
0992:                    // set the reply to address
0993:                    if (message.getReplyUrl() != null) {
0994:                        result.setReplyTo(message.getReplyUrl());
0995:                    }
0996:                    // set the named queue
0997:                    if (message.getTargetNamedQueue() != null) {
0998:                        result.setTargetNamedQueue(message
0999:                                .getTargetNamedQueue());
1000:                    }
1001:                    if (message.getReplyNamedQueue() != null) {
1002:                        result.setReplyNamedQueue(message.getReplyNamedQueue());
1003:                    }
1004:                    // the correlation id
1005:                    if (message.getCorrelationId() != null) {
1006:                        result.setCorrelationId(message.getCorrelationId());
1007:                    }
1008:
1009:                    // set the services
1010:                    List dbServices = session.createQuery(
1011:                            "FROM MessageService as service WHERE "
1012:                                    + "service.message.id = ?").setString(0,
1013:                            message.getId()).list();
1014:                    String[] services = new String[dbServices.size()];
1015:                    int index = 0;
1016:                    for (Iterator iter = dbServices.iterator(); iter.hasNext(); index++) {
1017:                        services[index] = ((MessageService) iter.next())
1018:                                .getService();
1019:                    }
1020:                    result.setServices(services);
1021:
1022:                    // set properties
1023:                    List dbProperties = session.createQuery(
1024:                            "FROM MessageProperty as property WHERE "
1025:                                    + "property.message.id = ?").setString(0,
1026:                            message.getId()).list();
1027:                    for (Iterator iter = dbProperties.iterator(); iter
1028:                            .hasNext();) {
1029:                        MessageProperty property = (MessageProperty) iter
1030:                                .next();
1031:                        if (property.getBoolValue() != null) {
1032:                            result.setBooleanProperty(property.getName(),
1033:                                    ((Integer) property.getBoolValue())
1034:                                            .intValue() == 1 ? true : false);
1035:                        } else if (property.getByteValue() != null) {
1036:                            result.setByteProperty(property.getName(),
1037:                                    ((Integer) property.getByteValue())
1038:                                            .byteValue());
1039:                        } else if (property.getIntValue() != null) {
1040:                            result.setPropertyValue(property.getName(),
1041:                                    property.getIntValue());
1042:                        } else if (property.getLongValue() != null) {
1043:                            result.setPropertyValue(property.getName(),
1044:                                    property.getLongValue());
1045:                        } else if (property.getDoubleValue() != null) {
1046:                            result.setPropertyValue(property.getName(),
1047:                                    property.getDoubleValue());
1048:                        } else if (property.getFloatValue() != null) {
1049:                            result.setPropertyValue(property.getName(),
1050:                                    property.getFloatValue());
1051:                        } else if (property.getObjectValue() != null) {
1052:                            result.setPropertyValue(property.getName(),
1053:                                    property.getObjectValue());
1054:                        } else if (property.getStringValue() != null) {
1055:                            result.setPropertyValue(property.getName(),
1056:                                    property.getStringValue());
1057:                        }
1058:
1059:                    }
1060:
1061:                    List principals = new ArrayList();
1062:                    List dbPrincipals = session.createQuery(
1063:                            "FROM MessagePrincipal as principal WHERE "
1064:                                    + "principal.message.id = ?").setString(0,
1065:                            message.getId()).list();
1066:                    for (Iterator iter = dbPrincipals.iterator(); iter
1067:                            .hasNext();) {
1068:                        MessagePrincipal principal = (MessagePrincipal) iter
1069:                                .next();
1070:                        principals.add(principal.getPrincipalValue());
1071:                    }
1072:                    result.setMessagePrincipals(principals);
1073:
1074:                    List dbErrors = session.createQuery(
1075:                            "FROM MessageError as error WHERE "
1076:                                    + "error.message.id = ?").setString(0,
1077:                            message.getId()).list();
1078:                    for (Iterator iter = dbErrors.iterator(); iter.hasNext();) {
1079:                        com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = (com.rift.coad.daemon.messageservice.db.MessageError) iter
1080:                                .next();
1081:                        com.rift.coad.daemon.messageservice.MessageError messageError = new com.rift.coad.daemon.messageservice.MessageError(
1082:                                new Date(dbMessageError.getErrorDate()
1083:                                        .getTime()), dbMessageError
1084:                                        .getErrorLevel(), dbMessageError
1085:                                        .getMsg());
1086:                        ((MessageImpl) result).addError(messageError);
1087:                    }
1088:
1089:                    if (message.getMessageQueue() != null) {
1090:                        originalMessageQueueName = messageQueueName = message
1091:                                .getMessageQueue().getMessageQueueName();
1092:                    }
1093:                    if (message.getNextProcess() != null) {
1094:                        this .nextProcessTime = this .originalNextProcessTime = new Date(
1095:                                message.getNextProcess().getTime());
1096:                    }
1097:
1098:                    return result;
1099:                } catch (Exception ex) {
1100:                    log.error("Failed to load the message because : "
1101:                            + ex.getMessage(), ex);
1102:                    throw new MessageServiceException(
1103:                            "Failed to load the message because : "
1104:                                    + ex.getMessage(), ex);
1105:                }
1106:            }
1107:
1108:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.