001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.mts.jms;
027:
028: import java.io.Serializable;
029: import java.net.URI;
030: import java.util.HashMap;
031: import java.util.Map;
032:
033: import javax.jms.DeliveryMode;
034: import javax.jms.Destination;
035: import javax.jms.JMSException;
036: import javax.jms.Message;
037: import javax.jms.MessageProducer;
038: import javax.jms.ObjectMessage;
039: import javax.jms.Session;
040:
041: import org.cougaar.core.mts.MessageAttributes;
042: import org.cougaar.core.thread.SchedulableStatus;
043: import org.cougaar.mts.base.CommFailureException;
044: import org.cougaar.mts.base.MisdeliveredMessageException;
045: import org.cougaar.util.log.Logger;
046: import org.cougaar.util.log.Logging;
047:
048: /**
049: * This utility class does the low-level work to force the jms linkprotocol to
050: * behave like a synchronous rpc. In particular it blocks the sending thread
051: * until a reply for the outgoing message arrives, generates and sends replies
052: * for incoming messages, and processes received replies by waking the
053: * corresponding thread.
054: */
055: public class ReplySync {
056: public static final int DEFAULT_TIMEOUT = 5000;
057: private static final String ID_PROP = "MTS_MSG_ID";
058: private static final String IS_MTS_REPLY_PROP = "MTS_REPLY";
059: private static int ID = 0;
060:
061: protected final JMSLinkProtocol lp;
062: private final Map<Integer, Object> pending;
063: private final Map<Integer, Object> replyData;
064: private final int timeout;
065: protected final Logger log;
066:
067: public ReplySync(JMSLinkProtocol lp) {
068: this (lp, DEFAULT_TIMEOUT);
069: }
070:
071: public ReplySync(JMSLinkProtocol lp, int timeout) {
072: this .lp = lp;
073: this .pending = new HashMap<Integer, Object>();
074: this .replyData = new HashMap<Integer, Object>();
075: this .log = Logging.getLogger(getClass().getName());
076: this .timeout = timeout;
077: }
078:
079: protected void setMessageProperties(Message message, Integer id,
080: URI uri, Destination destination) throws JMSException {
081: message.setIntProperty(ID_PROP, id.intValue());
082: message.setBooleanProperty(IS_MTS_REPLY_PROP, false);
083: }
084:
085: public MessageAttributes sendMessage(Message message, URI uri,
086: Destination destination) throws JMSException,
087: CommFailureException, MisdeliveredMessageException {
088: message.setJMSReplyTo(lp.getServant());
089: message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
090: Integer id = new Integer(++ID);
091: setMessageProperties(message, id, uri, destination);
092:
093: Object lock = new Object();
094: pending.put(id, lock);
095: long startTime = System.currentTimeMillis();
096: SchedulableStatus.beginNetIO("JMS RPC");
097: synchronized (lock) {
098: lp.getGenericProducer().send(destination, message);
099: while (true) {
100: try {
101: lock.wait(timeout); // TODO: timeout should be set
102: // dynamically
103: break;
104: } catch (InterruptedException ex) {
105:
106: }
107: }
108: }
109: SchedulableStatus.endBlocking();
110: long sendTime = System.currentTimeMillis() - startTime;
111: Object result = replyData.remove(id);
112: pending.remove(id);
113: if (result instanceof MessageAttributes) {
114: return (MessageAttributes) result;
115: } else if (result instanceof MisdeliveredMessageException) {
116: MisdeliveredMessageException ex = (MisdeliveredMessageException) result;
117: throw ex;
118: } else if (sendTime >= timeout) {
119: throw new CommFailureException(new RuntimeException(
120: "Timeout waiting for reply = " + sendTime));
121: } else {
122: throw new CommFailureException(new RuntimeException(
123: "Weird Reply" + result));
124: }
125: }
126:
127: protected void setReplyProperties(ObjectMessage omsg,
128: ObjectMessage replyMsg) throws JMSException {
129: replyMsg.setBooleanProperty(IS_MTS_REPLY_PROP, true);
130: replyMsg.setIntProperty(ID_PROP, omsg.getIntProperty(ID_PROP));
131: }
132:
133: protected Session getLinkProtocolSession() {
134: return lp.getSession();
135: }
136:
137: protected MessageProducer getLinkProtocolGenericProducer() {
138: return lp.getGenericProducer();
139: }
140:
141: public void replyToMessage(ObjectMessage omsg, Object replyData)
142: throws JMSException {
143: ObjectMessage replyMsg = getLinkProtocolSession()
144: .createObjectMessage((Serializable) replyData);
145: replyMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
146: setReplyProperties(omsg, replyMsg);
147: Destination dest = omsg.getJMSReplyTo();
148: getLinkProtocolGenericProducer().send(dest, replyMsg);
149: }
150:
151: public boolean isReply(ObjectMessage msg) {
152: try {
153: boolean isReply = msg.getBooleanProperty(IS_MTS_REPLY_PROP);
154: if (log.isDebugEnabled()) {
155: log.debug("Value of " + IS_MTS_REPLY_PROP
156: + " property is " + isReply);
157: }
158: if (!isReply) {
159: return false;
160: }
161: Integer id = new Integer(msg.getIntProperty(ID_PROP));
162: if (log.isDebugEnabled()) {
163: log.debug("Value of " + ID_PROP + " property is " + id);
164: }
165: replyData.put(id, msg.getObject());
166: Object lock = pending.get(id);
167: if (lock != null) {
168: synchronized (lock) {
169: lock.notify();
170: }
171: } else {
172: if (log.isWarnEnabled()) {
173: log.warn("Got reply for message we timed out, id="
174: + id + " msg=" + msg);
175: }
176: }
177: return true;
178: } catch (JMSException e) {
179: log.error("Error checking reply status: " + e.getMessage(),
180: e);
181: return false;
182: }
183: }
184:
185: }
|