001: package org.obe.util;
002:
003: import org.apache.commons.cli.BasicParser;
004: import org.apache.commons.cli.CommandLine;
005: import org.apache.commons.cli.Options;
006: import org.apache.commons.cli.ParseException;
007: import org.obe.client.api.ClientConfig;
008:
009: import javax.jms.*;
010: import javax.naming.Context;
011: import javax.naming.InitialContext;
012: import javax.naming.NamingException;
013: import java.io.*;
014: import java.util.*;
015:
016: public class JMSClient implements MessageListener {
017: private static final String JMS_QUEUE_CON_FACTORY = ClientConfig
018: .getJMSQueueConnectionFactory();
019: private static final String JMS_TOPIC_CON_FACTORY = ClientConfig
020: .getJMSTopicConnectionFactory();
021: private static final String NEWLINE = System
022: .getProperty("line.separator");
023: private static final int TEXT_TYPE = 0;
024: private static final int OBJECT_TYPE = 1;
025: private static final int BYTES_TYPE = 2;
026: private static final int MAP_TYPE = 3;
027: private static final int BUF_SIZE = 4096;
028:
029: public static void main(String[] args) {
030: try {
031: CommandLine cl = parseArgs(args);
032: args = cl.getArgs();
033:
034: String action = args[0];
035: String destination = args[1];
036: String hostURL = args.length == 3 ? args[2] : ClientConfig
037: .getServerHostURL();
038:
039: JMSClient jmstest = new JMSClient();
040: if (action.equalsIgnoreCase("publish")
041: || action.equalsIgnoreCase("send")) {
042:
043: String s;
044:
045: int msgType = TEXT_TYPE;
046: if ((s = cl.getOptionValue('t')) != null) {
047: if (s.equalsIgnoreCase("text")) {
048: msgType = TEXT_TYPE;
049: } else if (s.equalsIgnoreCase("object")) {
050: msgType = OBJECT_TYPE;
051: } else if (s.equalsIgnoreCase("bytes")) {
052: msgType = BYTES_TYPE;
053: } else if (s.equalsIgnoreCase("map")) {
054: msgType = MAP_TYPE;
055: } else {
056: throw new IllegalArgumentException(
057: "invalid message type");
058: }
059: }
060:
061: Object content = null;
062: if ((s = cl.getOptionValue('f')) != null) {
063: FileInputStream in = new FileInputStream(s);
064: switch (msgType) {
065: case TEXT_TYPE:
066: content = readText(in);
067: break;
068: case OBJECT_TYPE:
069: content = readObject(in);
070: break;
071: case BYTES_TYPE:
072: content = readBytes(in);
073: break;
074: case MAP_TYPE:
075: content = readProperties(in);
076: break;
077: }
078: }
079:
080: Properties props = null;
081: if ((s = cl.getOptionValue('p')) != null) {
082: FileInputStream in = new FileInputStream(s);
083: props = readProperties(in);
084: }
085:
086: if (action.equalsIgnoreCase("send"))
087: jmstest.send(hostURL, destination, content,
088: msgType, props);
089: else if (action.equalsIgnoreCase("publish"))
090: jmstest.publish(hostURL, destination, content,
091: msgType, props);
092: } else if (action.equalsIgnoreCase("subscribe")) {
093: jmstest.subscribe(hostURL, destination);
094: } else if (action.equalsIgnoreCase("receive")) {
095: jmstest.receive(hostURL, destination);
096: } else {
097: usage();
098: }
099: } catch (Exception e) {
100: e.printStackTrace();
101: System.exit(2);
102: }
103: }
104:
105: private static String readText(InputStream in) throws IOException {
106: try {
107: // N.B. This technique assumes the default charset. It may not work
108: // correctly with, for example, XML files that declare (and actually
109: // use) a different encoding.
110: Reader rdr = new InputStreamReader(in);
111: StringWriter out = new StringWriter();
112: char[] buf = new char[BUF_SIZE];
113: int i;
114: while ((i = rdr.read(buf)) > 0)
115: out.write(buf, 0, i);
116: out.close();
117: return out.toString();
118: } finally {
119: in.close();
120: }
121: }
122:
123: private static byte[] readBytes(InputStream in) throws IOException {
124: try {
125: ByteArrayOutputStream out = new ByteArrayOutputStream();
126: byte[] buf = new byte[BUF_SIZE];
127: int i;
128: while ((i = in.read(buf)) > 0)
129: out.write(buf, 0, i);
130: out.close();
131: return out.toByteArray();
132: } finally {
133: in.close();
134: }
135: }
136:
137: private static Properties readProperties(InputStream in)
138: throws IOException {
139:
140: try {
141: Properties props = new Properties();
142: props.load(in);
143: return props;
144: } finally {
145: in.close();
146: }
147: }
148:
149: private static Object readObject(InputStream in)
150: throws IOException, ClassNotFoundException {
151:
152: try {
153: ObjectInputStream ois = new ObjectInputStream(in);
154: Object obj = ois.readObject();
155: ois.close();
156: return obj;
157: } finally {
158: in.close();
159: }
160: }
161:
162: private static CommandLine parseArgs(String[] args) {
163: CommandLine cl = null;
164: try {
165: BasicParser parser = new BasicParser();
166: Options options = new Options();
167: options.addOption("f", true, "message file");
168: options.addOption("t", true,
169: "message type: text|object|bytes|map");
170: options.addOption("p", true, "message properties file");
171: cl = parser.parse(options, args, true);
172: int argc = cl.getArgs().length;
173: if (argc < 2 || argc > 3)
174: usage();
175: } catch (ParseException e) {
176: usage();
177: }
178: return cl;
179: }
180:
181: private static void usage() {
182: System.out
183: .println("usage: org.obe.util.JMSClient [-f <msg-file>] [-t <msg-type>] [-p <props-file>] <action> <destination> [<host-url>]");
184: System.out.println(" send/publish only:");
185: System.out
186: .println(" msg-file ::= file from which to read the message");
187: System.out
188: .println(" msg-type ::= text | object | bytes | map");
189: System.out
190: .println(" props-file ::= message properties file");
191: System.out.println();
192: System.out
193: .println(" action ::= send|receive | publish|subscribe");
194: System.out
195: .println(" destination ::= queue-name | topic-name");
196: System.out
197: .println(" (Specify JMS connection factory JNDI names in obe.properties)");
198: System.exit(1);
199: }
200:
201: private static Message createMessage(int msgType, Session session,
202: Object content, Map properties) throws JMSException {
203:
204: Message msg = null;
205: switch (msgType) {
206: case TEXT_TYPE:
207: msg = session.createTextMessage((String) content);
208: break;
209: case OBJECT_TYPE:
210: msg = session.createObjectMessage((Serializable) content);
211: break;
212: case BYTES_TYPE:
213: BytesMessage bytesMsg = session.createBytesMessage();
214: bytesMsg.writeBytes((byte[]) content);
215: msg = bytesMsg;
216: break;
217: case MAP_TYPE:
218: MapMessage mapMsg = session.createMapMessage();
219: for (Iterator i = ((Map) content).entrySet().iterator(); i
220: .hasNext();) {
221:
222: Map.Entry entry = (Map.Entry) i.next();
223: mapMsg.setObject((String) entry.getKey(), entry
224: .getValue());
225: }
226: msg = mapMsg;
227: break;
228: }
229: if (properties != null) {
230: for (Iterator iter = properties.entrySet().iterator(); iter
231: .hasNext();) {
232: Map.Entry entry = (Map.Entry) iter.next();
233: msg.setObjectProperty((String) entry.getKey(), entry
234: .getValue());
235: }
236: }
237: return msg;
238: }
239:
240: private static void publish(String url, String destJndiName,
241: Object content, int msgType, Map properties)
242: throws NamingException, JMSException, IOException {
243:
244: Context ctx = getInitialContext(url);
245: TopicConnectionFactory tcf = (TopicConnectionFactory) ctx
246: .lookup(JMS_TOPIC_CON_FACTORY);
247: Topic topic = (Topic) ctx.lookup(destJndiName);
248: ctx.close();
249: TopicConnection tcon = tcf.createTopicConnection();
250: TopicSession tsession = tcon.createTopicSession(false, 1);
251: TopicPublisher tpub = tsession.createPublisher(topic);
252:
253: if (content != null) {
254: Message msg = createMessage(msgType, tsession, content,
255: properties);
256: System.out.println("Publishing message");
257: tpub.publish(msg);
258: } else {
259: TextMessage msg = tsession.createTextMessage();
260: System.out.println("Publishing to JMS Topic: " + topic);
261: System.out
262: .println("Enter text<\\n><\\n> to publish message, or ! to terminate");
263: System.out.flush();
264: BufferedReader in = new BufferedReader(
265: new InputStreamReader(System.in));
266: StringBuffer sb = new StringBuffer();
267: String line;
268: while (!"!".equals(line = in.readLine())) {
269: if (line != null && line.length() != 0) {
270: sb.append(line);
271: sb.append(NEWLINE);
272: } else {
273: if (sb.length() > 0) {
274: String msgText = sb.toString();
275: System.out.println("Publishing message:");
276: System.out.println(msgText);
277: msg.setText(msgText);
278: tpub.publish(msg);
279: sb.setLength(0);
280: }
281:
282: // If we've reached the end of input, get out of the loop.
283: if (line == null)
284: break;
285: }
286: }
287: }
288:
289: tpub.close();
290: tsession.close();
291: tcon.stop();
292: tcon.close();
293: }
294:
295: private static void send(String url, String destJndiName,
296: Object content, int msgType, Map properties)
297: throws NamingException, JMSException, IOException {
298:
299: Context ctx = getInitialContext(url);
300: QueueConnectionFactory tcf = (QueueConnectionFactory) ctx
301: .lookup(JMS_QUEUE_CON_FACTORY);
302: Queue queue = (Queue) ctx.lookup(destJndiName);
303: ctx.close();
304: QueueConnection qcon = tcf.createQueueConnection();
305: QueueSession qsession = qcon.createQueueSession(false, 1);
306: QueueSender qsend = qsession.createSender(queue);
307:
308: if (content != null) {
309: Message msg = createMessage(msgType, qsession, content,
310: properties);
311: System.out.println("Sending message");
312: qsend.send(msg);
313: } else {
314: TextMessage msg = qsession.createTextMessage();
315: System.out.println("Sending to JMS Queue: " + queue);
316: System.out
317: .println("Enter text<\\n><\\n> to send message, or ! to terminate");
318: System.out.flush();
319: BufferedReader in = new BufferedReader(
320: new InputStreamReader(System.in));
321: StringBuffer sb = new StringBuffer();
322: String line;
323: while (!"!".equals(line = in.readLine())) {
324: if (line != null && line.length() != 0) {
325: sb.append(line);
326: sb.append(NEWLINE);
327: } else {
328: if (sb.length() > 0) {
329: String msgText = sb.toString();
330: System.out.println("Sending message:");
331: System.out.println(msgText);
332: msg.setText(msgText);
333: qsend.send(msg);
334: sb.setLength(0);
335: }
336:
337: // If we've reached the end of input, get out of the loop.
338: if (line == null)
339: break;
340: }
341: }
342: }
343:
344: qsend.close();
345: qsession.close();
346: qcon.stop();
347: qcon.close();
348: }
349:
350: private static Context getInitialContext(String s)
351: throws NamingException {
352: Hashtable hashtable = new Hashtable();
353: hashtable.put(Context.PROVIDER_URL, s);
354: return new InitialContext(hashtable);
355: }
356:
357: private JMSClient() {
358: }
359:
360: private void subscribe(String url, String destJndiName)
361: throws NamingException, JMSException, IOException {
362:
363: Context ctx = getInitialContext(url);
364: TopicConnectionFactory tcf = (TopicConnectionFactory) ctx
365: .lookup(JMS_TOPIC_CON_FACTORY);
366: Topic topic = (Topic) ctx.lookup(destJndiName);
367: ctx.close();
368: TopicConnection tcon = tcf.createTopicConnection();
369: TopicSession tsession = tcon.createTopicSession(false, 1);
370: TopicSubscriber tsub = tsession.createSubscriber(topic);
371: tsub.setMessageListener(this );
372: tcon.start();
373: System.out.println("Subscribed to JMS Topic: " + topic);
374: System.out.println("Press any key to terminate");
375: System.in.read();
376: tsession.close();
377: tcon.stop();
378: tcon.close();
379: }
380:
381: private void receive(String url, String destJndiName)
382: throws NamingException, JMSException, IOException {
383:
384: Context ctx = getInitialContext(url);
385: QueueConnectionFactory qcf = (QueueConnectionFactory) ctx
386: .lookup(JMS_QUEUE_CON_FACTORY);
387: Queue queue = (Queue) ctx.lookup(destJndiName);
388: ctx.close();
389: QueueConnection qcon = qcf.createQueueConnection();
390: QueueSession qsession = qcon.createQueueSession(false, 1);
391: QueueReceiver qrcvr = qsession.createReceiver(queue);
392: qrcvr.setMessageListener(this );
393: qcon.start();
394: System.out.println("Receiving from JMS Queue: " + queue);
395: System.out.println("Press any key to terminate");
396: System.in.read();
397: qsession.close();
398: qcon.stop();
399: qcon.close();
400: }
401:
402: public void onMessage(Message msg) {
403: try {
404: String s = msg instanceof TextMessage ? ((TextMessage) msg)
405: .getText() : msg.toString();
406: String s1 = msg.getClass().getName();
407: s1 = s1.substring(s1.lastIndexOf('.') + 1);
408: System.out.println("Received " + s1);
409: Enumeration enumeration = msg.getPropertyNames();
410: if (enumeration.hasMoreElements()) {
411: System.out.println("Message header: ");
412: while (enumeration.hasMoreElements()) {
413: String s2 = (String) enumeration.nextElement();
414: System.out.println('\t' + s2 + '='
415: + msg.getObjectProperty(s2));
416: }
417:
418: }
419: System.out.println("Message body: " + s);
420: System.out
421: .println("----------------------------------------");
422: } catch (JMSException e) {
423: e.printStackTrace();
424: }
425: }
426: }
|