001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers.mqseries;
020:
021: import java.io.ByteArrayInputStream;
022: import java.io.IOException;
023: import java.util.HashMap;
024: import java.util.Hashtable;
025: import java.util.LinkedList;
026: import java.util.List;
027: import java.util.Map;
028: import org.mactor.brokers.Message;
029: import org.mactor.brokers.PollingMessageBrokerTemplate;
030: import org.mactor.framework.MactorException;
031: import org.mactor.framework.ParseUtil;
032: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
033: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
034: import com.ibm.mq.MQC;
035: import com.ibm.mq.MQException;
036: import com.ibm.mq.MQGetMessageOptions;
037: import com.ibm.mq.MQMessage;
038: import com.ibm.mq.MQMsg2;
039: import com.ibm.mq.MQPutMessageOptions;
040: import com.ibm.mq.MQQueue;
041: import com.ibm.mq.MQQueueManager;
042:
043: /**
044: * A message broker for IBM MQ Series (requieres com.ibm.mq.jar and j2ee.jar in
045: * classpath)
046: * <p>
047: * Message broker config structure (sample):
048: *
049: * <pre>
050: * <message_broker_config xmlns="http://schemas.mactor.org/framework">
051: * <message_broker name="DemoMQSeriesBroker"
052: * archive_consumed_messages="true"
053: * archive_dead_letter_messages="true"
054: * archive_path="/tmp/archive_mq"
055: * message_read_interval_seconds="1" message_read_limit="40"
056: * broker_class="org.mactor.brokers.mqseries.MqSeriesMessageBroker">
057: *
058: * <channel name="OutgoingOrder">
059: * <value name="queue_manager">MQUTV</value>
060: * <value name="host">10.0.0.10</value>
061: * <value name="port">1414</value>
062: * <value name="queue">QA.UTVK.Order</value>
063: * <value name="channel">UTV.ORDER</value>
064: * </channel>
065: * <channel name="IncomingOrderStatus">
066: * <value name="queue_manager">MQUTV</value>
067: * <value name="host">10.0.0.10</value>
068: * <value name="port">1414</value>
069: * <value name="queue">QA.UTVK.OrderStatus</value>
070: * <value name="channel">UTV.ORDER</value>
071: * </channel>
072: * </message_broker>
073: * </message_broker_config>
074: * </pre>
075: *
076: * @author Lars Ivar Almli
077: */
078: public class MqSeriesMessageBroker extends PollingMessageBrokerTemplate {
079: private Map<String, MqConnectionWrapper> channelMap = new HashMap<String, MqConnectionWrapper>();
080:
081: private MessageBrokerConfig config;
082: static {
083: MQException.log = null;
084: }
085:
086: public MqSeriesMessageBroker(MessageBrokerConfig config) {
087: super (config);
088: this .config = config;
089: }
090:
091: private MqConnectionWrapper getMqConnectionWrapper(String channel)
092: throws MactorException {
093: MqConnectionWrapper w = channelMap.get(channel);
094: if (w == null) {
095: w = new MqConnectionWrapper(config
096: .getRequieredChannelConfig(channel));
097: channelMap.put(channel, w);
098: }
099: return w;
100: }
101:
102: protected List<Message> doGetMessages(String channel,
103: int maxMessageCount) throws MactorException {
104: List<Message> messages = new LinkedList<Message>();
105: MqConnectionWrapper cw = getMqConnectionWrapper(channel);
106: for (int i = 0; i < maxMessageCount; i++) {
107: Message m = cw.getMessage();
108: if (m == null)
109: break;
110: messages.add(m);
111: }
112: return messages;
113: }
114:
115: protected void doPublishMessage(String channel, Message message)
116: throws MactorException {
117: getMqConnectionWrapper(channel).sendMessage(message);
118: }
119:
120: private class MqConnectionWrapper {
121: private String channel;
122:
123: private String mqQueueManagerName;
124:
125: private String mqHost;
126:
127: private int mqPort;
128:
129: private String mqQueue;
130:
131: private String mqChannel;
132:
133: private MQQueue mqOutgoingQueue;
134:
135: private MQQueue mqIncomingQueue;
136:
137: private MQQueueManager mqQueueManager;
138:
139: public MqConnectionWrapper(ChannelConfig cf)
140: throws MactorException {
141: this .mqQueueManagerName = cf
142: .getRequieredValue("queue_manager");
143: this .mqHost = cf.getRequieredValue("host");
144: this .mqPort = ParseUtil.tryParseIntVal(cf
145: .getRequieredValue("port"));
146: this .mqQueue = cf.getRequieredValue("queue");
147: this .mqChannel = cf.getRequieredValue("channel");
148: this .channel = cf.getName();
149: }
150:
151: public void sendMessage(Message message) throws MactorException {
152: try {
153: MQMsg2 msg = new MQMsg2();
154: msg.setMessageData(message.getContent().trim()
155: .getBytes());
156: MQPutMessageOptions pmo = new MQPutMessageOptions();
157: getOutgoingQueue().putMsg2(msg, pmo);
158: } catch (MQException me) {
159: throw new MactorException(
160: "MQ Series error occured with completion code '"
161: + me.completionCode
162: + "' and reeason code '"
163: + me.reasonCode
164: + "' while attempting to send to channel '"
165: + channel + "'");
166: }
167: }
168:
169: public Message getMessage() throws MactorException {
170: try {
171: MQMessage rcvMessage = new MQMessage();
172: MQGetMessageOptions gmo = new MQGetMessageOptions();
173: gmo.options = MQC.MQGMO_WAIT;
174: gmo.waitInterval = 500;// 500ms
175: getIncomingQueue().get(rcvMessage, gmo);
176: int len = rcvMessage.getTotalMessageLength();
177: if (len > 0) {
178: byte[] buffer = new byte[rcvMessage
179: .getTotalMessageLength()];
180: rcvMessage.readFully(buffer);
181: return Message
182: .createMessage(new ByteArrayInputStream(
183: buffer));
184: }
185: } catch (MQException me) {
186: if (me.reasonCode == 2033) {// no messages
187: } else
188: throw new MactorException(
189: "MQ Series error occured with completion code '"
190: + me.completionCode
191: + "' and reeason code '"
192: + me.reasonCode
193: + "' while attempting to read from channel '"
194: + channel + "'");
195: } catch (IOException ioe) {
196: ioe.printStackTrace();
197: throw new MactorException(
198: "An IOException occured while trying to read a message from channel '"
199: + channel + "'", ioe);
200: }
201: return null;
202: }
203:
204: private Hashtable buildProperties() {
205: Hashtable props = new Hashtable();
206: props.put(MQC.TRANSPORT_PROPERTY,
207: MQC.TRANSPORT_MQSERIES_CLIENT);
208: props.put(MQC.HOST_NAME_PROPERTY, mqHost);
209: props.put(MQC.PORT_PROPERTY, new Integer(mqPort));
210: props.put(MQC.CHANNEL_PROPERTY, mqChannel);
211: return props;
212: }
213:
214: private MQQueueManager getManager() throws MQException {
215: if (mqQueueManager == null) {
216: mqQueueManager = new MQQueueManager(mqQueueManagerName,
217: buildProperties());
218: }
219: return mqQueueManager;
220: }
221:
222: private MQQueue getOutgoingQueue() throws MQException {
223: if (mqOutgoingQueue == null) {
224: mqOutgoingQueue = getManager().accessQueue(mqQueue,
225: MQC.MQOO_OUTPUT);
226: }
227: return mqOutgoingQueue;
228: }
229:
230: private MQQueue getIncomingQueue() throws MQException {
231: if (mqIncomingQueue == null) {
232: mqIncomingQueue = getManager().accessQueue(mqQueue,
233: MQC.MQOO_INPUT_AS_Q_DEF);
234: }
235: return mqIncomingQueue;
236: }
237:
238: @Override
239: protected void finalize() throws Throwable {
240: super .finalize();
241: close();
242: }
243:
244: public void close() {
245: try {
246: if (mqOutgoingQueue != null)
247: mqOutgoingQueue.close();
248: if (mqIncomingQueue != null)
249: mqIncomingQueue.close();
250: mqQueueManager.disconnect();
251: } catch (MQException ex) {
252: log
253: .info("A WebSphere MQ Error occured : Completion Code "
254: + ex.completionCode
255: + " Reason Code "
256: + ex.reasonCode);
257: } catch (Exception ex) {
258: log
259: .info("An IOException occured whilst writing to the message buffer: "
260: + ex);
261: }
262: }
263: }
264: }
|