0001: /*
0002: * MessageService: The message service daemon
0003: * Copyright (C) 2006 Rift IT Contracting
0004: *
0005: * This library is free software; you can redistribute it and/or
0006: * modify it under the terms of the GNU Lesser General Public
0007: * License as published by the Free Software Foundation; either
0008: * version 2.1 of the License, or (at your option) any later version.
0009: *
0010: * This library is distributed in the hope that it will be useful,
0011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0013: * Lesser General Public License for more details.
0014: *
0015: * You should have received a copy of the GNU Lesser General Public
0016: * License along with this library; if not, write to the Free Software
0017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
0018: *
0019: * MessageManagerImpl.java
0020: */
0021:
0022: // the package path
0023: package com.rift.coad.daemon.messageservice.message;
0024:
0025: // java imports
0026: import java.sql.Timestamp;
0027: import java.util.ArrayList;
0028: import java.util.Date;
0029: import java.util.Enumeration;
0030: import java.util.HashSet;
0031: import java.util.Iterator;
0032: import java.util.List;
0033: import java.util.Set;
0034: import javax.transaction.xa.XAException;
0035: import javax.transaction.xa.XAResource;
0036: import javax.transaction.xa.Xid;
0037:
0038: // hibernate imports
0039: import org.hibernate.*;
0040: import org.hibernate.cfg.*;
0041:
0042: // logging import
0043: import org.apache.log4j.Logger;
0044:
0045: // coadunation imports
0046: import com.rift.coad.daemon.messageservice.Message;
0047: import com.rift.coad.daemon.messageservice.MessageServiceManager;
0048: import com.rift.coad.daemon.messageservice.RPCMessage;
0049: import com.rift.coad.daemon.messageservice.TextMessage;
0050: import com.rift.coad.daemon.messageservice.MessageManager;
0051: import com.rift.coad.daemon.messageservice.MessageServiceException;
0052: import com.rift.coad.daemon.messageservice.InvalidProperty;
0053: import com.rift.coad.daemon.messageservice.db.*;
0054: import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
0055: import com.rift.coad.hibernate.util.HibernateUtil;
0056: import com.rift.coad.lib.common.ObjectSerializer;
0057: import com.rift.coad.util.transaction.TransactionManager;
0058: import com.rift.coad.util.change.Change;
0059: import com.rift.coad.util.change.ChangeException;
0060: import com.rift.coad.util.change.ChangeLog;
0061:
0062: /**
0063: * This object implements the message manager interface.
0064: *
0065: * @author Brett Chaldecott
0066: */
0067: public class MessageManagerImpl implements MessageManager {
0068:
0069: /**
0070: * This object represents an add message change
0071: */
0072: public static class AddMessageChange implements Change {
0073: // private member variables
0074: private MessageImpl newMessage = null;
0075:
0076: /**
0077: * The constructor of the add message change object.
0078: *
0079: * @param message The message that represents this change.
0080: * @exception MessageServiceException
0081: */
0082: public AddMessageChange(MessageImpl newMessage)
0083: throws MessageServiceException {
0084: try {
0085: this .newMessage = (MessageImpl) newMessage.clone();
0086: } catch (Exception ex) {
0087: log.error(
0088: "Failed to create a new add message change : "
0089: + ex.getMessage(), ex);
0090: throw new MessageServiceException(
0091: "Failed to create a new add message change : "
0092: + ex.getMessage(), ex);
0093: }
0094: }
0095:
0096: /**
0097: * The definition of the apply method.
0098: */
0099: public void applyChanges() throws ChangeException {
0100: try {
0101: //MessageTransactionLock.getInstance().lock();
0102: Session session = HibernateUtil.getInstance(
0103: MessageServiceManager.class).getSession();
0104:
0105: // set the reply flag
0106: int reply = 0;
0107: if (newMessage.getReply()) {
0108: reply = 1;
0109: }
0110: int acknowledged = 0;
0111: if (newMessage.isAcknowledged()) {
0112: acknowledged = 1;
0113: }
0114:
0115: // check the type of messge
0116: int messageType = TEXT_MESSAGE;
0117: if (newMessage instanceof RPCMessage) {
0118: messageType = RPC_MESSAGE;
0119: }
0120:
0121: // instanciate the basic message
0122: com.rift.coad.daemon.messageservice.db.Message message = new com.rift.coad.daemon.messageservice.db.Message(
0123: newMessage.getMessageId(), newMessage
0124: .getMessageCreater(), newMessage
0125: .getSessionId(), messageType,
0126: newMessage.getMessageType(), newMessage
0127: .getPriority(), reply, newMessage
0128: .getFrom(), acknowledged, 0, newMessage
0129: .getRetries());
0130: if ((newMessage.getTarget() != null)
0131: && (newMessage.getTarget().length() != 0)) {
0132: message.setTarget(newMessage.getTarget());
0133: }
0134: if (newMessage.getFrom() == null) {
0135: throw new InvalidProperty(
0136: "The from address must be set");
0137: }
0138: message.setFromUrl(newMessage.getFrom());
0139: if ((newMessage.getReplyTo() != null)
0140: && (newMessage.getReplyTo().length() != 0)) {
0141: message.setReplyUrl(newMessage.getReplyTo());
0142: }
0143: if ((newMessage.getTargetNamedQueue() != null)
0144: && (newMessage.getTargetNamedQueue().length() != 0)) {
0145: message.setTargetNamedQueue(newMessage
0146: .getTargetNamedQueue());
0147: }
0148: if ((newMessage.getReplyNamedQueue() != null)
0149: && (newMessage.getReplyNamedQueue().length() != 0)) {
0150: message.setReplyNamedQueue(newMessage
0151: .getReplyNamedQueue());
0152: }
0153: if ((newMessage.getCorrelationId() != null)
0154: && (newMessage.getCorrelationId().length() != 0)) {
0155: message.setCorrelationId(newMessage
0156: .getCorrelationId());
0157: }
0158: message.setCreated(new Timestamp(newMessage
0159: .getCreated().getTime()));
0160: message.setProcessed(new Timestamp(newMessage
0161: .getProcessedDate().getTime()));
0162: message.setNextProcess(new Timestamp(
0163: ((MessageImpl) newMessage).getNextProcessDate()
0164: .getTime()));
0165: message.setMessageState(newMessage.getState());
0166: session.persist(message);
0167:
0168: if (newMessage instanceof RPCMessage) {
0169: RPCMessage rpcMessage = (RPCMessage) newMessage;
0170: MessageRpcBody rpcBody = new MessageRpcBody();
0171: rpcBody.setMessage(message);
0172: rpcBody.setMessageId(message.getId());
0173: rpcBody.setXml(rpcMessage.getMethodBodyXML());
0174: if (rpcMessage.generatedException()) {
0175: rpcBody
0176: .setExceptionValue(((RPCMessageImpl) rpcMessage)
0177: .getThrowableBytes().clone());
0178: }
0179: if (rpcMessage.getResult() != null) {
0180: rpcBody
0181: .setResultValue(((RPCMessageImpl) rpcMessage)
0182: .getResultBytes().clone());
0183: }
0184: session.persist(rpcBody);
0185: } else if (newMessage instanceof TextMessage) {
0186: TextMessage textMessage = (TextMessage) newMessage;
0187: MessageTxtBody txtBody = new MessageTxtBody();
0188: txtBody.setMessage(message);
0189: txtBody.setMessageId(message.getId());
0190: txtBody.setBody(textMessage.getTextBody());
0191: session.persist(txtBody);
0192: } else {
0193: log.error("The message type ["
0194: + newMessage.getClass().getName()
0195: + "] is not recognised.");
0196: throw new MessageServiceException(
0197: "The message type ["
0198: + newMessage.getClass().getName()
0199: + "] is not recognised.");
0200: }
0201:
0202: // the message information
0203: if (newMessage.getServices() != null) {
0204: String[] services = newMessage.getServices();
0205: for (int index = 0; index < services.length; index++) {
0206: MessageService messageServices = new MessageService();
0207: messageServices.setMessage(message);
0208: messageServices.setService(services[index]);
0209: session.persist(messageServices);
0210: }
0211: }
0212:
0213: // the message properties
0214: for (Enumeration enumerat = newMessage
0215: .getPropertyNames(); enumerat.hasMoreElements();) {
0216: String key = (String) enumerat.nextElement();
0217: Object value = newMessage.getPropertyValue(key);
0218: MessageProperty property = new MessageProperty();
0219: property.setMessage(message);
0220: property.setName(key);
0221: if (value instanceof Boolean) {
0222: if (((Boolean) value).booleanValue()) {
0223: property.setBoolValue(new Integer(1));
0224: } else {
0225: property.setBoolValue(new Integer(0));
0226: }
0227: } else if (value instanceof Byte) {
0228: property.setByteValue(new Integer(
0229: ((Byte) value).intValue()));
0230: } else if (value instanceof Integer) {
0231: property.setIntValue((Integer) value);
0232: } else if (value instanceof Long) {
0233: property.setLongValue((Long) value);
0234: } else if (value instanceof Double) {
0235: property.setDoubleValue((Double) value);
0236: } else if (value instanceof Float) {
0237: property.setFloatValue((Float) value);
0238: } else if (value instanceof String) {
0239: property.setStringValue((String) value);
0240: } else if (value instanceof byte[]) {
0241: property.setObjectValue((byte[]) value);
0242: }
0243: session.persist(property);
0244: }
0245:
0246: // the message information
0247: if (newMessage.getMessagePrincipals() == null) {
0248: throw new InvalidProperty("Must supply principals");
0249: }
0250: List principals = newMessage.getMessagePrincipals();
0251: for (Iterator iter = principals.iterator(); iter
0252: .hasNext();) {
0253: MessagePrincipal principal = new MessagePrincipal();
0254: principal.setMessage(message);
0255: String principalStr = (String) iter.next();
0256: //log.info("Add principal : " + principalStr);
0257: principal.setPrincipalValue(principalStr);
0258: session.persist(principal);
0259: }
0260: List errors = newMessage.getErrors();
0261: for (Iterator iter = errors.iterator(); iter.hasNext();) {
0262: com.rift.coad.daemon.messageservice.MessageError messageError = (com.rift.coad.daemon.messageservice.MessageError) iter
0263: .next();
0264: com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = new com.rift.coad.daemon.messageservice.db.MessageError();
0265: dbMessageError.setMessage(message);
0266: dbMessageError.setErrorDate(new java.sql.Timestamp(
0267: messageError.getErrorDate().getTime()));
0268: dbMessageError.setErrorLevel(messageError
0269: .getLevel());
0270: dbMessageError.setMsg(messageError.getMSG());
0271: session.persist(dbMessageError);
0272: }
0273:
0274: } catch (Exception ex) {
0275: log.error("Failed to apply the changes : "
0276: + ex.getMessage(), ex);
0277: throw new ChangeException(
0278: "Failed to apply the changes : "
0279: + ex.getMessage(), ex);
0280: }
0281:
0282: }
0283: }
0284:
0285: /**
0286: * This object represents queue assignment
0287: */
0288: public static class AssignMessageToQueueChange implements Change {
0289: // private member variables
0290: private String messageId = null;
0291: private String queueName = null;
0292:
0293: /**
0294: * The constructor of the assign message to queue object.
0295: */
0296: public AssignMessageToQueueChange(String messageId,
0297: String queueName) {
0298: this .messageId = new String(messageId);
0299: this .queueName = new String(queueName);
0300: }
0301:
0302: /**
0303: * The definition of the apply method.
0304: */
0305: public void applyChanges() throws ChangeException {
0306: try {
0307: //MessageTransactionLock.getInstance().lock();
0308: Session session = HibernateUtil.getInstance(
0309: MessageServiceManager.class).getSession();
0310: List entries = session
0311: .createQuery(
0312: "FROM MessageQueue as queue WHERE queue.messageQueueName = ?")
0313: .setString(0, queueName).list();
0314:
0315: if (entries.size() != 1) {
0316: log.error("There is no queue by the name of : "
0317: + queueName);
0318: throw new MessageServiceException(
0319: "There is no queue by the name of : "
0320: + queueName);
0321: }
0322: com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue = (com.rift.coad.daemon.messageservice.db.MessageQueue) entries
0323: .get(0);
0324: com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0325: .get(
0326: com.rift.coad.daemon.messageservice.db.Message.class,
0327: messageId);
0328: message.setMessageQueue(messageQueue);
0329: } catch (Exception ex) {
0330: log.error("Failed to apply the changes : "
0331: + ex.getMessage(), ex);
0332: throw new ChangeException(
0333: "Failed to apply the changes : "
0334: + ex.getMessage(), ex);
0335: }
0336: }
0337: }
0338:
0339: /**
0340: * This object represents an update message change
0341: */
0342: public static class UpdateMessageChange implements Change {
0343: // private member variables
0344: private MessageImpl updatedMessage = null;
0345:
0346: /**
0347: * The constructor of the update message change object.
0348: *
0349: * @param updatedMessage The updated message object.
0350: * @exception MessageServiceException
0351: */
0352: public UpdateMessageChange(MessageImpl updatedMessage)
0353: throws MessageServiceException {
0354: try {
0355: this .updatedMessage = (MessageImpl) updatedMessage
0356: .clone();
0357: } catch (Exception ex) {
0358: log.error("Failed to clone the updated message : "
0359: + ex.getMessage(), ex);
0360: throw new MessageServiceException(
0361: "Failed to clone the updated message : "
0362: + ex.getMessage(), ex);
0363: }
0364: }
0365:
0366: /**
0367: * The definition of the apply method.
0368: */
0369: public void applyChanges() throws ChangeException {
0370: try {
0371: //MessageTransactionLock.getInstance().lock();
0372: Session session = HibernateUtil.getInstance(
0373: MessageServiceManager.class).getSession();
0374:
0375: com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0376: .get(
0377: com.rift.coad.daemon.messageservice.db.Message.class,
0378: updatedMessage.getMessageId());
0379:
0380: if (!(updatedMessage instanceof MessageImpl)) {
0381: throw new MessageServiceException(
0382: "The incorrect message object has been passed "
0383: + "into update");
0384: }
0385: MessageImpl messageImpl = (MessageImpl) updatedMessage;
0386: if ((updatedMessage.getTarget() != null)
0387: && (updatedMessage.getTarget().length() != 0)) {
0388: message.setTarget(updatedMessage.getTarget());
0389: }
0390: if (updatedMessage.getFrom() == null) {
0391: throw new InvalidProperty(
0392: "The from address must be set");
0393: }
0394: message.setFromUrl(updatedMessage.getFrom());
0395: if ((updatedMessage.getReplyTo() != null)
0396: && (updatedMessage.getReplyTo().length() != 0)) {
0397: message.setReplyUrl(updatedMessage.getReplyTo());
0398: }
0399: if ((updatedMessage.getTargetNamedQueue() != null)
0400: && (updatedMessage.getTargetNamedQueue()
0401: .length() != 0)) {
0402: message.setTargetNamedQueue(updatedMessage
0403: .getTargetNamedQueue());
0404: }
0405: if ((updatedMessage.getReplyNamedQueue() != null)
0406: && (updatedMessage.getReplyNamedQueue()
0407: .length() != 0)) {
0408: message.setReplyNamedQueue(updatedMessage
0409: .getReplyNamedQueue());
0410: }
0411: if ((updatedMessage.getCorrelationId() != null)
0412: && (updatedMessage.getCorrelationId().length() != 0)) {
0413: message.setCorrelationId(updatedMessage
0414: .getCorrelationId());
0415: }
0416: message.setCreated(new Timestamp(updatedMessage
0417: .getCreated().getTime()));
0418: message.setProcessed(new Timestamp(updatedMessage
0419: .getProcessedDate().getTime()));
0420: message.setNextProcess(new Timestamp(
0421: ((MessageImpl) updatedMessage)
0422: .getNextProcessDate().getTime()));
0423: message.setMessageState(updatedMessage.getState());
0424: message.setRetries(updatedMessage.getRetries());
0425: message.setMessageRoutingType(updatedMessage
0426: .getMessageType());
0427: message.setPriority(updatedMessage.getPriority());
0428: if (messageImpl.isAcknowledged()) {
0429: message.setAcknowledged(1);
0430: } else {
0431: message.setAcknowledged(0);
0432: }
0433:
0434: // the message properties
0435: session.createQuery(
0436: "DELETE FROM MessageProperty as property WHERE "
0437: + "property.message.id = ?").setString(
0438: 0, updatedMessage.getMessageId())
0439: .executeUpdate();
0440: //message.getMessageProperties().clear();
0441: for (Enumeration enumerat = updatedMessage
0442: .getPropertyNames(); enumerat.hasMoreElements();) {
0443: String key = (String) enumerat.nextElement();
0444: Object value = updatedMessage.getPropertyValue(key);
0445: MessageProperty property = new MessageProperty();
0446: property.setMessage(message);
0447: property.setName(key);
0448: if (value instanceof Boolean) {
0449: if (((Boolean) value).booleanValue()) {
0450: property.setBoolValue(new Integer(1));
0451: } else {
0452: property.setBoolValue(new Integer(0));
0453: }
0454: } else if (value instanceof Byte) {
0455: property.setByteValue(new Integer(
0456: ((Byte) value).intValue()));
0457: } else if (value instanceof Integer) {
0458: property.setIntValue((Integer) value);
0459: } else if (value instanceof Long) {
0460: property.setLongValue((Long) value);
0461: } else if (value instanceof Double) {
0462: property.setDoubleValue((Double) value);
0463: } else if (value instanceof Float) {
0464: property.setFloatValue((Float) value);
0465: } else if (value instanceof String) {
0466: property.setStringValue((String) value);
0467: } else if (value instanceof byte[]) {
0468: property.setObjectValue((byte[]) value);
0469: }
0470: session.persist(property);
0471: }
0472:
0473: // the message properties
0474: session.createQuery(
0475: "DELETE FROM MessageError as error WHERE "
0476: + "error.message.id = ?").setString(0,
0477: updatedMessage.getMessageId()).executeUpdate();
0478: List errors = updatedMessage.getErrors();
0479: for (Iterator iter = errors.iterator(); iter.hasNext();) {
0480: com.rift.coad.daemon.messageservice.MessageError messageError = (com.rift.coad.daemon.messageservice.MessageError) iter
0481: .next();
0482: com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = new com.rift.coad.daemon.messageservice.db.MessageError();
0483: dbMessageError.setMessage(message);
0484: dbMessageError.setErrorDate(new java.sql.Timestamp(
0485: messageError.getErrorDate().getTime()));
0486: dbMessageError.setErrorLevel(messageError
0487: .getLevel());
0488: dbMessageError.setMsg(messageError.getMSG());
0489: session.persist(dbMessageError);
0490: }
0491:
0492: if (updatedMessage instanceof RPCMessage) {
0493: RPCMessage rpcMessage = (RPCMessage) updatedMessage;
0494: MessageRpcBody rpcBody = (MessageRpcBody) session
0495: .get(MessageRpcBody.class, message.getId());
0496: if (rpcMessage.generatedException()) {
0497: rpcBody
0498: .setExceptionValue(((RPCMessageImpl) rpcMessage)
0499: .getThrowableBytes().clone());
0500: }
0501: if (((RPCMessageImpl) rpcMessage).getResultBytes() != null) {
0502: rpcBody
0503: .setResultValue(((RPCMessageImpl) rpcMessage)
0504: .getResultBytes().clone());
0505: }
0506: } else if (updatedMessage instanceof TextMessage) {
0507: TextMessage textMessage = (TextMessage) updatedMessage;
0508: MessageTxtBody txtBody = (MessageTxtBody) session
0509: .get(MessageTxtBody.class, message.getId());
0510: txtBody.setBody(textMessage.getTextBody());
0511: } else {
0512: log.error("The message type ["
0513: + updatedMessage.getClass().getName()
0514: + "] is not recognised.");
0515: throw new ChangeException("The message type ["
0516: + updatedMessage.getClass().getName()
0517: + "] is not recognised.");
0518: }
0519:
0520: } catch (ChangeException ex) {
0521: throw ex;
0522: } catch (Exception ex) {
0523: log.error("Failed to update the message because : "
0524: + ex.getMessage(), ex);
0525: throw new ChangeException(
0526: "Failed to update the message because : "
0527: + ex.getMessage(), ex);
0528: }
0529: }
0530: }
0531:
0532: /**
0533: * This object represents a remove message change
0534: */
0535: public static class RemoveMessageChange implements Change {
0536: // private member variables
0537: private String messageId = null;
0538:
0539: /**
0540: * The constructor of the remove message change object.
0541: *
0542: * @param messageId The message id for this object.
0543: */
0544: public RemoveMessageChange(String messageId) {
0545: this .messageId = new String(messageId);
0546: }
0547:
0548: /**
0549: * The definition of the apply method.
0550: */
0551: public void applyChanges() throws ChangeException {
0552: try {
0553: //MessageTransactionLock.getInstance().lock();
0554: Session session = HibernateUtil.getInstance(
0555: MessageServiceManager.class).getSession();
0556: session.createQuery(
0557: "DELETE FROM MessageRpcBody as body WHERE "
0558: + "body.message.id = ?").setString(0,
0559: messageId).executeUpdate();
0560: session.createQuery(
0561: "DELETE FROM MessageTxtBody as body WHERE "
0562: + "body.message.id = ?").setString(0,
0563: messageId).executeUpdate();
0564: session.createQuery(
0565: "DELETE FROM MessageService as service WHERE "
0566: + "service.message.id = ?").setString(
0567: 0, messageId).executeUpdate();
0568: session.createQuery(
0569: "DELETE FROM MessageProperty as property WHERE "
0570: + "property.message.id = ?").setString(
0571: 0, messageId).executeUpdate();
0572: session.createQuery(
0573: "DELETE FROM MessagePrincipal as principal WHERE "
0574: + "principal.message.id = ?")
0575: .setString(0, messageId).executeUpdate();
0576: session.createQuery(
0577: "DELETE FROM MessageError as error WHERE "
0578: + "error.message.id = ?").setString(0,
0579: messageId).executeUpdate();
0580: session.createQuery(
0581: "DELETE FROM Message as msg WHERE msg.id = ?")
0582: .setString(0, messageId).executeUpdate();
0583: } catch (Exception ex) {
0584: log.error(
0585: "Failed to failed to remove the message from the db : "
0586: + ex.getMessage(), ex);
0587: throw new ChangeException(
0588: "Failed to failed to remove the message from the db : "
0589: + ex.getMessage(), ex);
0590: }
0591: }
0592: }
0593:
0594: // class constants
0595: public final static int TEXT_MESSAGE = 1;
0596: public final static int RPC_MESSAGE = 2;
0597:
0598: // the logger reference
0599: protected static Logger log = Logger
0600: .getLogger(MessageManagerImpl.class.getName());
0601:
0602: // private member variable
0603: private String id = null;
0604: private Date nextProcessTime = null;
0605: private String messageQueueName = null;
0606: private MessageImpl masterMessageImpl = null;
0607:
0608: // transaction variables
0609: private Date originalNextProcessTime = null;
0610: private String originalMessageQueueName = null;
0611: private MessageImpl originalMessageImpl = null;
0612:
0613: /**
0614: * Creates a new instance of MessageManagerImpl
0615: *
0616: * @param id The id of the message.
0617: */
0618: public MessageManagerImpl(String id) throws MessageServiceException {
0619: this .id = id;
0620: originalMessageImpl = masterMessageImpl = loadMessage();
0621: }
0622:
0623: /**
0624: * Creates a new instance of MessageManagerImpl
0625: *
0626: * @param message The new message to create.
0627: */
0628: public MessageManagerImpl(Message newMessage)
0629: throws MessageServiceException {
0630: try {
0631: this .id = newMessage.getMessageId();
0632: nextProcessTime = ((MessageImpl) newMessage)
0633: .getNextProcessDate();
0634: originalNextProcessTime = ((MessageImpl) newMessage)
0635: .getNextProcessDate();
0636: originalMessageImpl = (MessageImpl) newMessage;
0637: masterMessageImpl = (MessageImpl) newMessage;
0638: ChangeLog.getInstance().addChange(
0639: new AddMessageChange((MessageImpl) newMessage));
0640: TransactionManager.getInstance().bindResource(this , true);
0641: } catch (MessageServiceException ex) {
0642: throw ex;
0643: } catch (Exception ex) {
0644: log.error("Failed to create the message "
0645: + "from the database : " + ex.getMessage(), ex);
0646: throw new MessageServiceException(
0647: "Failed to create the message "
0648: + "in the database : " + ex.getMessage(),
0649: ex);
0650: }
0651: }
0652:
0653: /**
0654: * This method returns the id of this messsage.
0655: *
0656: * @return The id of the message this object is managing.
0657: */
0658: public String getID() {
0659: return id;
0660: }
0661:
0662: /**
0663: * This method returns the message object.
0664: *
0665: * @return The message object.
0666: * @exception MessageServiceException
0667: */
0668: public Message getMessage() throws MessageServiceException {
0669: return masterMessageImpl;
0670: }
0671:
0672: /**
0673: * This method returns the message object.
0674: *
0675: * @return The message object.
0676: * @exception MessageServiceException
0677: */
0678: public void assignToQueue(String queueName)
0679: throws MessageServiceException {
0680: try {
0681: TransactionManager.getInstance().bindResource(this , true);
0682: ChangeLog.getInstance().addChange(
0683: new AssignMessageToQueueChange(this .id, queueName));
0684: this .messageQueueName = queueName;
0685: } catch (Exception ex) {
0686: log.error(
0687: "Failed to assign this object to a queue because : "
0688: + ex.getMessage(), ex);
0689: throw new MessageServiceException(
0690: "Failed to assign this object to a queue because : "
0691: + ex.getMessage(), ex);
0692: }
0693: }
0694:
0695: /**
0696: * This method updates the message object.
0697: *
0698: * @param updatedMessage The updated message object.
0699: * @exception MessageServiceException
0700: */
0701: public void updateMessage(Message updatedMessage)
0702: throws MessageServiceException {
0703: try {
0704: TransactionManager.getInstance().bindResource(this , true);
0705: ChangeLog.getInstance().addChange(
0706: new UpdateMessageChange(
0707: (MessageImpl) updatedMessage));
0708: nextProcessTime = ((MessageImpl) updatedMessage)
0709: .getNextProcessDate();
0710: masterMessageImpl = (MessageImpl) updatedMessage;
0711: } catch (MessageServiceException ex) {
0712: throw ex;
0713: } catch (Exception ex) {
0714: log.error("Failed to update the message because : "
0715: + ex.getMessage(), ex);
0716: throw new MessageServiceException(
0717: "Failed to update the message because : "
0718: + ex.getMessage(), ex);
0719: }
0720: }
0721:
0722: /**
0723: * This method is responsible from removing this message from the db.
0724: *
0725: * @exception MessageServiceException
0726: */
0727: public void remove() throws MessageServiceException {
0728: try {
0729: TransactionManager.getInstance().bindResource(this , true);
0730: ChangeLog.getInstance().addChange(
0731: new RemoveMessageChange(this .id));
0732: } catch (Exception ex) {
0733: log.error("Failed to failed to remove the message : "
0734: + ex.getMessage(), ex);
0735: throw new MessageServiceException(
0736: "Failed to failed to remove the message : "
0737: + ex.getMessage(), ex);
0738: }
0739: }
0740:
0741: /**
0742: * This method returns the next process time for this message.
0743: *
0744: * @return The date message.
0745: * @exception MessageServiceException
0746: */
0747: public Date nextProcessTime() {
0748: return nextProcessTime;
0749: }
0750:
0751: /**
0752: * This message returns the priority.
0753: */
0754: public int getPriority() {
0755: return this .masterMessageImpl.getPriority();
0756: }
0757:
0758: /**
0759: * This method returns the name of the messaqe queue to which this message
0760: * is assigned.
0761: *
0762: * @return The name of the message queue that this message is assigned to.
0763: */
0764: public String getMessageQueueName() {
0765: return this .messageQueueName;
0766: }
0767:
0768: /**
0769: * This method is called to commit the specified transaction.
0770: *
0771: * @param xid The id of the transaction to commit.
0772: * @param onePhase If true a one phase commit should be used.
0773: * @exception XAException
0774: */
0775: public synchronized void commit(Xid xid, boolean onePhase)
0776: throws XAException {
0777: if (nextProcessTime != null) {
0778: this .originalNextProcessTime = nextProcessTime;
0779: }
0780: if (this .messageQueueName != null) {
0781: this .originalMessageQueueName = messageQueueName;
0782: }
0783: if (this .masterMessageImpl != null) {
0784: this .originalMessageImpl = masterMessageImpl;
0785: }
0786: }
0787:
0788: /**
0789: * The resource manager has dissociated this object from the transaction.
0790: *
0791: * @param xid The id of the transaction that is getting ended.
0792: * @param flags The flags associated with this operation.
0793: * @exception XAException
0794: */
0795: public void end(Xid xid, int flags) throws XAException {
0796: }
0797:
0798: /**
0799: * The transaction has been completed and must be forgotten.
0800: *
0801: * @param xid The id of the transaction to forget.
0802: * @exception XAException
0803: */
0804: public void forget(Xid xid) throws XAException {
0805: if (nextProcessTime != null) {
0806: this .originalNextProcessTime = nextProcessTime;
0807: }
0808: if (this .messageQueueName != null) {
0809: this .originalMessageQueueName = messageQueueName;
0810: }
0811: if (this .masterMessageImpl != null) {
0812: this .originalMessageImpl = masterMessageImpl;
0813: }
0814: }
0815:
0816: /**
0817: * This method returns the transaction timeout for this object.
0818: *
0819: * @return The int containing the transaction timeout.
0820: * @exception XAException
0821: */
0822: public int getTransactionTimeout() throws XAException {
0823: return -1;
0824: }
0825:
0826: /**
0827: * This method returns true if this object is the resource manager getting
0828: * queried.
0829: *
0830: * @return TRUE if this is the resource manager, FALSE if not.
0831: * @param xaResource The resource to perform the check against.
0832: * @exception XAException
0833: */
0834: public boolean isSameRM(XAResource xAResource) throws XAException {
0835: return this == xAResource;
0836: }
0837:
0838: /**
0839: * This is called before a transaction is committed.
0840: *
0841: * @return The results of the transaction.
0842: * @param xid The id of the transaction to check against.
0843: * @exception XAException
0844: */
0845: public int prepare(Xid xid) throws XAException {
0846: return XAResource.XA_OK;
0847: }
0848:
0849: /**
0850: * This method returns the list of transaction branches for this resource
0851: * manager.
0852: *
0853: * @return The list of resource branches.
0854: * @param flags The flags
0855: * @exception XAException
0856: */
0857: public Xid[] recover(int flags) throws XAException {
0858: return null;
0859: }
0860:
0861: /**
0862: * This method is called to roll back the specified transaction.
0863: *
0864: * @param xid The id of the transaction to roll back.
0865: * @exception XAException
0866: */
0867: public void rollback(Xid xid) throws XAException {
0868: nextProcessTime = originalNextProcessTime;
0869: messageQueueName = originalMessageQueueName;
0870: masterMessageImpl = originalMessageImpl;
0871: }
0872:
0873: /**
0874: * This method sets the transaction timeout for this resource manager.
0875: *
0876: * @return TRUE if the transaction timeout can be set successfully.
0877: * @param transactionTimeout The new transaction timeout value.
0878: * @exception XAException
0879: */
0880: public boolean setTransactionTimeout(int transactionTimeout)
0881: throws XAException {
0882: return true;
0883: }
0884:
0885: /**
0886: * This method is called to start a transaction on a resource manager.
0887: *
0888: * @param xid The id of the new transaction.
0889: * @param flags The flags associated with the transaction.
0890: * @exception XAException
0891: */
0892: public void start(Xid xid, int flags) throws XAException {
0893:
0894: }
0895:
0896: /**
0897: * The compare to interface used to order this object in the queues.
0898: *
0899: * @return -1,0,1 depending on the order of the object.
0900: * @param o The object to perform the comparison on.
0901: */
0902: public int compareTo(Object o) {
0903: MessageManagerImpl msg = (MessageManagerImpl) o;
0904: if (msg.nextProcessTime().getTime() > nextProcessTime()
0905: .getTime()) {
0906: return -1;
0907: } else if (nextProcessTime().getTime() > msg.nextProcessTime()
0908: .getTime()) {
0909: return 1;
0910: } else if (msg.getPriority() > getPriority()) {
0911: return -1;
0912: } else if (getPriority() > msg.getPriority()) {
0913: return 1;
0914: }
0915: return 0;
0916: }
0917:
0918: /**
0919: * This method returns the message object.
0920: *
0921: * @return The message object.
0922: * @exception MessageServiceException
0923: */
0924: private MessageImpl loadMessage() throws MessageServiceException {
0925: try {
0926: //MessageTransactionLock.getInstance().lock();
0927: Session session = HibernateUtil.getInstance(
0928: MessageServiceManager.class).getSession();
0929:
0930: com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
0931: .get(
0932: com.rift.coad.daemon.messageservice.db.Message.class,
0933: id);
0934:
0935: MessageImpl result = null;
0936: if (message.getMessageType() == MessageManagerImpl.RPC_MESSAGE) {
0937: RPCMessageImpl rpcMessage = new RPCMessageImpl(message
0938: .getId(), new Date(message.getCreated()
0939: .getTime()), message.getRetries(), new Date(
0940: message.getProcessed().getTime()), message
0941: .getMessageCreator(), message.getSessionId(),
0942: null, message.getFromUrl(), message
0943: .getMessageRoutingType(), message
0944: .getMessageState());
0945: MessageRpcBody rpcBody = (MessageRpcBody) session.get(
0946: MessageRpcBody.class, message.getId());
0947: rpcMessage.setMethodBodyXML(rpcBody.getXml());
0948: if (rpcBody.getExceptionValue() != null) {
0949: rpcMessage.setThrowableBytes(rpcBody
0950: .getExceptionValue().clone());
0951: }
0952: if (rpcBody.getResultValue() != null) {
0953: rpcMessage.setResultBytes(rpcBody.getResultValue()
0954: .clone());
0955: }
0956: result = rpcMessage;
0957: } else {
0958:
0959: TextMessageImpl txtMessage = new TextMessageImpl(
0960: message.getId(), new Date(message.getCreated()
0961: .getTime()), message.getRetries(),
0962: new Date(message.getProcessed().getTime()),
0963: message.getMessageCreator(), message
0964: .getSessionId(), null, message
0965: .getFromUrl(), message
0966: .getMessageRoutingType(), message
0967: .getMessageState());
0968: MessageTxtBody txtBody = (MessageTxtBody) session.get(
0969: MessageTxtBody.class, message.getId());
0970: txtMessage.setTextBody(txtBody.getBody());
0971: result = txtMessage;
0972: }
0973:
0974: // set from
0975: result.setFrom(message.getFromUrl());
0976: result.setPriority(message.getPriority());
0977: result.setNextProcessDate(new Date(message.getNextProcess()
0978: .getTime()));
0979: result.setProcessedDate(new Date(message.getProcessed()
0980: .getTime()));
0981:
0982: // set the target
0983: if (message.getTarget() != null) {
0984: result.setTarget(message.getTarget());
0985: }
0986: // set the reply flag
0987: if (message.getReply() == 1) {
0988: result.setReply(true);
0989: } else {
0990: result.setReply(false);
0991: }
0992: // set the reply to address
0993: if (message.getReplyUrl() != null) {
0994: result.setReplyTo(message.getReplyUrl());
0995: }
0996: // set the named queue
0997: if (message.getTargetNamedQueue() != null) {
0998: result.setTargetNamedQueue(message
0999: .getTargetNamedQueue());
1000: }
1001: if (message.getReplyNamedQueue() != null) {
1002: result.setReplyNamedQueue(message.getReplyNamedQueue());
1003: }
1004: // the correlation id
1005: if (message.getCorrelationId() != null) {
1006: result.setCorrelationId(message.getCorrelationId());
1007: }
1008:
1009: // set the services
1010: List dbServices = session.createQuery(
1011: "FROM MessageService as service WHERE "
1012: + "service.message.id = ?").setString(0,
1013: message.getId()).list();
1014: String[] services = new String[dbServices.size()];
1015: int index = 0;
1016: for (Iterator iter = dbServices.iterator(); iter.hasNext(); index++) {
1017: services[index] = ((MessageService) iter.next())
1018: .getService();
1019: }
1020: result.setServices(services);
1021:
1022: // set properties
1023: List dbProperties = session.createQuery(
1024: "FROM MessageProperty as property WHERE "
1025: + "property.message.id = ?").setString(0,
1026: message.getId()).list();
1027: for (Iterator iter = dbProperties.iterator(); iter
1028: .hasNext();) {
1029: MessageProperty property = (MessageProperty) iter
1030: .next();
1031: if (property.getBoolValue() != null) {
1032: result.setBooleanProperty(property.getName(),
1033: ((Integer) property.getBoolValue())
1034: .intValue() == 1 ? true : false);
1035: } else if (property.getByteValue() != null) {
1036: result.setByteProperty(property.getName(),
1037: ((Integer) property.getByteValue())
1038: .byteValue());
1039: } else if (property.getIntValue() != null) {
1040: result.setPropertyValue(property.getName(),
1041: property.getIntValue());
1042: } else if (property.getLongValue() != null) {
1043: result.setPropertyValue(property.getName(),
1044: property.getLongValue());
1045: } else if (property.getDoubleValue() != null) {
1046: result.setPropertyValue(property.getName(),
1047: property.getDoubleValue());
1048: } else if (property.getFloatValue() != null) {
1049: result.setPropertyValue(property.getName(),
1050: property.getFloatValue());
1051: } else if (property.getObjectValue() != null) {
1052: result.setPropertyValue(property.getName(),
1053: property.getObjectValue());
1054: } else if (property.getStringValue() != null) {
1055: result.setPropertyValue(property.getName(),
1056: property.getStringValue());
1057: }
1058:
1059: }
1060:
1061: List principals = new ArrayList();
1062: List dbPrincipals = session.createQuery(
1063: "FROM MessagePrincipal as principal WHERE "
1064: + "principal.message.id = ?").setString(0,
1065: message.getId()).list();
1066: for (Iterator iter = dbPrincipals.iterator(); iter
1067: .hasNext();) {
1068: MessagePrincipal principal = (MessagePrincipal) iter
1069: .next();
1070: principals.add(principal.getPrincipalValue());
1071: }
1072: result.setMessagePrincipals(principals);
1073:
1074: List dbErrors = session.createQuery(
1075: "FROM MessageError as error WHERE "
1076: + "error.message.id = ?").setString(0,
1077: message.getId()).list();
1078: for (Iterator iter = dbErrors.iterator(); iter.hasNext();) {
1079: com.rift.coad.daemon.messageservice.db.MessageError dbMessageError = (com.rift.coad.daemon.messageservice.db.MessageError) iter
1080: .next();
1081: com.rift.coad.daemon.messageservice.MessageError messageError = new com.rift.coad.daemon.messageservice.MessageError(
1082: new Date(dbMessageError.getErrorDate()
1083: .getTime()), dbMessageError
1084: .getErrorLevel(), dbMessageError
1085: .getMsg());
1086: ((MessageImpl) result).addError(messageError);
1087: }
1088:
1089: if (message.getMessageQueue() != null) {
1090: originalMessageQueueName = messageQueueName = message
1091: .getMessageQueue().getMessageQueueName();
1092: }
1093: if (message.getNextProcess() != null) {
1094: this .nextProcessTime = this .originalNextProcessTime = new Date(
1095: message.getNextProcess().getTime());
1096: }
1097:
1098: return result;
1099: } catch (Exception ex) {
1100: log.error("Failed to load the message because : "
1101: + ex.getMessage(), ex);
1102: throw new MessageServiceException(
1103: "Failed to load the message because : "
1104: + ex.getMessage(), ex);
1105: }
1106: }
1107:
1108: }
|