001: package org.mactor.brokers.tibrv;
002:
003: import org.mactor.brokers.AbstractMessageBroker;
004: import org.mactor.brokers.Message;
005: import org.mactor.brokers.MessageBroker;
006: import org.mactor.framework.MactorException;
007: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
008: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
009:
010: import com.tibco.tibrv.Tibrv;
011: import com.tibco.tibrv.TibrvException;
012: import com.tibco.tibrv.TibrvListener;
013: import com.tibco.tibrv.TibrvMsg;
014: import com.tibco.tibrv.TibrvMsgCallback;
015: import com.tibco.tibrv.TibrvMsgField;
016: import com.tibco.tibrv.TibrvRvdTransport;
017: import com.tibco.tibrv.TibrvXml;
018:
019: /**
020: * A message broker for Tibco Rendezvous (suitable for Tibco BusinessWorks projects)
021: *
022: * <p>
023: * Message broker config structure (sample):
024: * <pre>
025: * <message_broker_config xmlns="http://schemas.mactor.org/framework">
026: * <message_broker name="DemoTibRvBroker"
027: * archive_consumed_messages="true"
028: * archive_dead_letter_messages="true"
029: * archive_path="/tmp/archive_tibrv"
030: * message_read_interval_seconds="1" message_read_limit="40"
031: * broker_class="org.mactor.brokers.tibrv.TibcoRvMessageBroker">
032: *
033: * <value name="service">7500</value>
034: * <value name="network">localhost</value>
035: * <value name="daemon"></value>
036: *
037: * <channel name="OutgoingOrder">
038: * <value name="subject">business.Order</value>
039: * </channel>
040: * <channel name="IncomingOrderStatus">
041: * <value name="subject">business.OrderStatus</value>
042: * </channel>
043: * </message_broker>
044: * </message_broker_config>
045: * </pre>
046: *
047: * </p>
048: * (requires tibrvj.jar on the classpath)
049: *
050: * @author Lars Ivar Almli
051: * @see MessageBroker
052: */
053: public class TibcoRvMessageBroker extends AbstractMessageBroker {
054: private TibrvRvdTransport transport;
055:
056: private static final String FIELD_NAME = "xml";
057:
058: public TibcoRvMessageBroker(MessageBrokerConfig config)
059: throws MactorException {
060: super (config);
061: try {
062: Tibrv.open(Tibrv.IMPL_NATIVE);
063: this .transport = new TibrvRvdTransport(config
064: .getValue("service"), config.getValue("network"),
065: config.getValue("daemon"));
066: startDipatchThread();
067: } catch (TibrvException tre) {
068: throw new MactorException(
069: "Failed to initialize TibcoRvMessageBroker", tre);
070: }
071: }
072:
073: @Override
074: protected void finalize() throws Throwable {
075: super .finalize();
076: Tibrv.close();
077: }
078:
079: public void publish(String channel, Message message)
080: throws MactorException {
081: sendMessage(channel, message, false);
082: }
083:
084: public Message publishWithResponse(String channel, Message message)
085: throws MactorException {
086: return sendMessage(channel, message, true);
087: }
088:
089: @Override
090: protected void onFirstSubscribe(String channel)
091: throws MactorException {
092: registerListener(channel);
093: }
094:
095: private Message sendMessage(String channel, Message message,
096: boolean expectResponse) throws MactorException {
097: ChannelConfig cc = config.getRequieredChannelConfig(channel);
098: String subject = cc.getRequieredValue("subject");
099: try {
100: TibrvMsg msg = new TibrvMsg();
101: msg.setSendSubject(subject);
102: msg.update(FIELD_NAME, new TibrvXml(message.getContent()
103: .getBytes()));
104: if (expectResponse) {
105: if (log.isDebugEnabled()) {
106: log.debug("Sending message on channel: '" + channel
107: + "', subject: '" + subject
108: + "', message: '" + msg
109: + "'. Expecting reponse message..");
110: }
111: TibrvMsg responseMessage = transport
112: .sendRequest(msg, 0);
113: return messageFromMessage(responseMessage);
114: } else {
115: if (log.isDebugEnabled()) {
116: log.debug("Sending message on channel: '" + channel
117: + "', subject: '" + subject
118: + "', message: '" + msg
119: + "'. No reponse message expected");
120: }
121: transport.send(msg);
122: return null;
123: }
124: } catch (TibrvException tre) {
125: throw new MactorException(tre);
126: }
127: }
128:
129: private void startDipatchThread() {
130: Thread t = new Thread(new Runnable() {
131: public void run() {
132: while (true) {
133: try {
134: Tibrv.defaultQueue().dispatch();
135: } catch (TibrvException e) {
136: log
137: .error(
138: "Error while dispatching message from default queue",
139: e);
140: } catch (InterruptedException ie) {
141: log
142: .info("Interrupted. Terminating dispatch loop..");
143: return;
144: }
145: }
146: }
147: });
148: t.start();
149: }
150:
151: private Message messageFromMessage(TibrvMsg msg)
152: throws MactorException, TibrvException {
153: if (msg == null)
154: throw new MactorException("Received an empty message");
155: TibrvMsgField field = msg.getField(FIELD_NAME);
156: if (field == null)
157: throw new MactorException("Received a message without a '"
158: + FIELD_NAME + "' field");
159: if (!(field.data instanceof TibrvXml)) {
160: throw new MactorException("Received a message where the '"
161: + FIELD_NAME + "' field does not contain XML");
162: }
163: return Message.createMessage(new String(((TibrvXml) field.data)
164: .getBytes()));
165: }
166:
167: private synchronized void registerListener(String channel)
168: throws MactorException {
169: ChannelConfig cc = config.getRequieredChannelConfig(channel);
170: String subject = cc.getRequieredValue("subject");
171: try {
172: new TibrvListener(Tibrv.defaultQueue(),
173: new SubjectListener(channel), transport, subject,
174: null);
175: } catch (TibrvException tre) {
176: throw new MactorException(
177: "Failed to create listener for channel '" + channel
178: + "'", tre);
179: }
180: }
181:
182: private class SubjectListener implements TibrvMsgCallback {
183: String channel;
184:
185: SubjectListener(String channel) {
186: this .channel = channel;
187: }
188:
189: public void onMsg(TibrvListener listener, TibrvMsg msg) {
190: try {
191: if (log.isDebugEnabled()) {
192: log.debug("Received message on channel: '"
193: + channel + "', subject: '"
194: + msg.getSendSubject()
195: + "', with reply subject: '"
196: + msg.getReplySubject() + "', message: '"
197: + msg + "'");
198: }
199: Message m = messageFromMessage(msg);
200: Message resultMessage = raiseOnMessage(channel, m, true);
201: if (resultMessage != null
202: && msg.getReplySubject() != null) {
203: TibrvMsg reyplyMsg = new TibrvMsg();
204: reyplyMsg.update(FIELD_NAME, new TibrvXml(
205: resultMessage.getContent().getBytes()));
206: transport.sendReply(reyplyMsg, msg);
207: }
208: } catch (MactorException me) {
209: log.info(
210: "Exception while processing message from channel '"
211: + channel + "'", me);
212: } catch (TibrvException tre) {
213: log.warn(
214: "Tibco exception while processing message from channel '"
215: + channel + "'", tre);
216: }
217: }
218: }
219: }
|