001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.service.jms;
019:
020: import java.util.ArrayList;
021: import java.util.HashMap;
022: import java.util.Iterator;
023: import java.util.List;
024: import java.util.Map;
025:
026: import javax.jms.JMSException;
027: import javax.jms.MapMessage;
028: import javax.jms.Message;
029: import javax.jms.Queue;
030: import javax.jms.QueueConnection;
031: import javax.jms.QueueConnectionFactory;
032: import javax.jms.QueueSender;
033: import javax.jms.QueueSession;
034: import javax.jms.Session;
035: import javax.jms.Topic;
036: import javax.jms.TopicConnection;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.TopicPublisher;
039: import javax.jms.TopicSession;
040: import javax.jms.XAQueueConnection;
041: import javax.jms.XAQueueConnectionFactory;
042: import javax.jms.XAQueueSession;
043: import javax.naming.InitialContext;
044: import javax.naming.NamingException;
045: import javax.transaction.xa.XAResource;
046:
047: import org.ofbiz.base.config.GenericConfigException;
048: import org.ofbiz.base.util.Debug;
049: import org.ofbiz.base.util.GeneralException;
050: import org.ofbiz.base.util.JNDIContextFactory;
051: import org.ofbiz.base.util.UtilXml;
052: import org.ofbiz.entity.serialize.XmlSerializer;
053: import org.ofbiz.entity.transaction.GenericTransactionException;
054: import org.ofbiz.entity.transaction.TransactionUtil;
055: import org.ofbiz.service.GenericRequester;
056: import org.ofbiz.service.GenericServiceException;
057: import org.ofbiz.service.ModelService;
058: import org.ofbiz.service.ServiceDispatcher;
059: import org.ofbiz.service.ServiceUtil;
060: import org.ofbiz.service.config.ServiceConfigUtil;
061: import org.ofbiz.service.engine.AbstractEngine;
062: import org.w3c.dom.Element;
063:
064: /**
065: * AbstractJMSEngine
066: */
067: public class JmsServiceEngine extends AbstractEngine {
068:
069: public static final String module = JmsServiceEngine.class
070: .getName();
071:
072: public JmsServiceEngine(ServiceDispatcher dispatcher) {
073: super (dispatcher);
074: }
075:
076: protected Element getServiceElement(ModelService modelService)
077: throws GenericServiceException {
078: Element rootElement = null;
079:
080: try {
081: rootElement = ServiceConfigUtil.getXmlRootElement();
082: } catch (GenericConfigException e) {
083: throw new GenericServiceException(
084: "Error getting JMS Service element", e);
085: }
086:
087: String location = this .getLocation(modelService);
088:
089: Element serviceElement = UtilXml.firstChildElement(rootElement,
090: "jms-service", "name", location);
091:
092: if (serviceElement == null) {
093: throw new GenericServiceException(
094: "Cannot find an JMS service definition for the name ["
095: + location
096: + "] in the serviceengine.xml file");
097: }
098: return serviceElement;
099: }
100:
101: protected Message makeMessage(Session session,
102: ModelService modelService, Map context)
103: throws GenericServiceException, JMSException {
104: List outParams = modelService.getParameterNames(
105: ModelService.OUT_PARAM, false);
106:
107: if (outParams != null && outParams.size() > 0)
108: throw new GenericServiceException(
109: "JMS service cannot have required OUT parameters; no parameters will be returned.");
110: String xmlContext = null;
111:
112: try {
113: if (Debug.verboseOn())
114: Debug.logVerbose("Serializing Context --> " + context,
115: module);
116: xmlContext = XmlSerializer.serialize(context);
117: } catch (Exception e) {
118: throw new GenericServiceException(
119: "Cannot serialize context.", e);
120: }
121: MapMessage message = session.createMapMessage();
122:
123: message.setString("serviceName", modelService.invoke);
124: message.setString("serviceContext", xmlContext);
125: return message;
126: }
127:
128: protected List serverList(Element serviceElement)
129: throws GenericServiceException {
130: String sendMode = serviceElement.getAttribute("send-mode");
131: List serverList = UtilXml.childElementList(serviceElement,
132: "server");
133:
134: if (sendMode.equals("none")) {
135: return new ArrayList();
136: } else if (sendMode.equals("all")) {
137: return serverList;
138: } else {
139: throw new GenericServiceException(
140: "Requested send mode not supported.");
141: }
142: }
143:
144: protected Map runTopic(ModelService modelService, Map context,
145: Element server) throws GenericServiceException {
146: String serverName = server.getAttribute("jndi-server-name");
147: String jndiName = server.getAttribute("jndi-name");
148: String topicName = server.getAttribute("topic-queue");
149: String userName = server.getAttribute("username");
150: String password = server.getAttribute("password");
151: String clientId = server.getAttribute("client-id");
152:
153: InitialContext jndi = null;
154: TopicConnectionFactory factory = null;
155: TopicConnection con = null;
156:
157: try {
158: jndi = JNDIContextFactory.getInitialContext(serverName);
159: factory = (TopicConnectionFactory) jndi.lookup(jndiName);
160: } catch (GeneralException ge) {
161: throw new GenericServiceException(
162: "Problems getting JNDI InitialContext.", ge
163: .getNested());
164: } catch (NamingException ne) {
165: JNDIContextFactory.clearInitialContext(serverName);
166: try {
167: jndi = JNDIContextFactory.getInitialContext(serverName);
168: factory = (TopicConnectionFactory) jndi
169: .lookup(jndiName);
170: } catch (GeneralException ge2) {
171: throw new GenericServiceException(
172: "Problems getting JNDI InitialContext.", ge2
173: .getNested());
174: } catch (NamingException ne2) {
175: throw new GenericServiceException(
176: "JNDI lookup problems.", ne);
177: }
178: }
179:
180: try {
181: con = factory.createTopicConnection(userName, password);
182:
183: if (clientId != null && clientId.length() > 1)
184: con.setClientID(clientId);
185: con.start();
186:
187: TopicSession session = con.createTopicSession(false,
188: Session.AUTO_ACKNOWLEDGE);
189: Topic topic = (Topic) jndi.lookup(topicName);
190: TopicPublisher publisher = session.createPublisher(topic);
191:
192: // create/send the message
193: Message message = makeMessage(session, modelService,
194: context);
195:
196: publisher.publish(message);
197: if (Debug.verboseOn())
198: Debug.logVerbose("Sent JMS Message to " + topicName,
199: module);
200:
201: // close the connections
202: publisher.close();
203: session.close();
204: con.close();
205: } catch (NamingException ne) {
206: throw new GenericServiceException(
207: "Problems with JNDI lookup.", ne);
208: } catch (JMSException je) {
209: throw new GenericServiceException("JMS Internal Error.", je);
210: }
211: return ServiceUtil.returnSuccess();
212:
213: }
214:
215: protected Map runQueue(ModelService modelService, Map context,
216: Element server) throws GenericServiceException {
217: String serverName = server.getAttribute("jndi-server-name");
218: String jndiName = server.getAttribute("jndi-name");
219: String queueName = server.getAttribute("topic-queue");
220: String userName = server.getAttribute("username");
221: String password = server.getAttribute("password");
222: String clientId = server.getAttribute("client-id");
223:
224: InitialContext jndi = null;
225: QueueConnectionFactory factory = null;
226: QueueConnection con = null;
227:
228: try {
229: jndi = JNDIContextFactory.getInitialContext(serverName);
230: factory = (QueueConnectionFactory) jndi.lookup(jndiName);
231: } catch (GeneralException ge) {
232: throw new GenericServiceException(
233: "Problems getting JNDI InitialContext.", ge
234: .getNested());
235: } catch (NamingException ne) {
236: JNDIContextFactory.clearInitialContext(serverName);
237: try {
238: jndi = JNDIContextFactory.getInitialContext(serverName);
239: factory = (QueueConnectionFactory) jndi
240: .lookup(jndiName);
241: } catch (GeneralException ge2) {
242: throw new GenericServiceException(
243: "Problems getting JNDI InitialContext.", ge2
244: .getNested());
245: } catch (NamingException ne2) {
246: throw new GenericServiceException(
247: "JNDI lookup problem.", ne2);
248: }
249: }
250:
251: try {
252: con = factory.createQueueConnection(userName, password);
253:
254: if (clientId != null && clientId.length() > 1)
255: con.setClientID(clientId);
256: con.start();
257:
258: QueueSession session = con.createQueueSession(false,
259: Session.AUTO_ACKNOWLEDGE);
260: Queue queue = (Queue) jndi.lookup(queueName);
261: QueueSender sender = session.createSender(queue);
262:
263: // create/send the message
264: Message message = makeMessage(session, modelService,
265: context);
266:
267: sender.send(message);
268: if (Debug.verboseOn())
269: Debug.logVerbose("Sent JMS Message to " + queueName,
270: module);
271:
272: // close the connections
273: sender.close();
274: session.close();
275: con.close();
276: } catch (NamingException ne) {
277: throw new GenericServiceException(
278: "Problems with JNDI lookup.", ne);
279: } catch (JMSException je) {
280: throw new GenericServiceException("JMS Internal Error.", je);
281: }
282: return ServiceUtil.returnSuccess();
283: }
284:
285: protected Map runXaQueue(ModelService modelService, Map context,
286: Element server) throws GenericServiceException {
287: String serverName = server.getAttribute("jndi-server-name");
288: String jndiName = server.getAttribute("jndi-name");
289: String queueName = server.getAttribute("topic-queue");
290: String userName = server.getAttribute("username");
291: String password = server.getAttribute("password");
292: String clientId = server.getAttribute("client-id");
293:
294: InitialContext jndi = null;
295: XAQueueConnectionFactory factory = null;
296: XAQueueConnection con = null;
297:
298: try {
299: jndi = JNDIContextFactory.getInitialContext(serverName);
300: factory = (XAQueueConnectionFactory) jndi.lookup(jndiName);
301: } catch (GeneralException ge) {
302: throw new GenericServiceException(
303: "Problems getting JNDI InitialContext.", ge
304: .getNested());
305: } catch (NamingException ne) {
306: JNDIContextFactory.clearInitialContext(serverName);
307: try {
308: jndi = JNDIContextFactory.getInitialContext(serverName);
309: factory = (XAQueueConnectionFactory) jndi
310: .lookup(jndiName);
311: } catch (GeneralException ge2) {
312: throw new GenericServiceException(
313: "Problems getting JNDI InitialContext.", ge2
314: .getNested());
315: } catch (NamingException ne2) {
316: throw new GenericServiceException(
317: "JNDI lookup problems.", ne2);
318: }
319: }
320:
321: try {
322: con = factory.createXAQueueConnection(userName, password);
323:
324: if (clientId != null && clientId.length() > 1)
325: con.setClientID(userName);
326: con.start();
327:
328: // enlist the XAResource
329: XAQueueSession session = con.createXAQueueSession();
330: XAResource resource = session.getXAResource();
331:
332: if (TransactionUtil.getStatus() == TransactionUtil.STATUS_ACTIVE)
333: TransactionUtil.enlistResource(resource);
334:
335: Queue queue = (Queue) jndi.lookup(queueName);
336: QueueSession qSession = session.getQueueSession();
337: QueueSender sender = qSession.createSender(queue);
338:
339: // create/send the message
340: Message message = makeMessage(session, modelService,
341: context);
342:
343: sender.send(message);
344:
345: if (TransactionUtil.getStatus() != TransactionUtil.STATUS_ACTIVE)
346: session.commit();
347:
348: Debug.logInfo("Message sent.", module);
349:
350: // close the connections
351: sender.close();
352: session.close();
353: con.close();
354: } catch (GenericTransactionException gte) {
355: throw new GenericServiceException(
356: "Problems enlisting resource w/ transaction manager.",
357: gte.getNested());
358: } catch (NamingException ne) {
359: throw new GenericServiceException(
360: "Problems with JNDI lookup.", ne);
361: } catch (JMSException je) {
362: throw new GenericServiceException("JMS Internal Error.", je);
363: }
364: return ServiceUtil.returnSuccess();
365: }
366:
367: protected Map run(ModelService modelService, Map context)
368: throws GenericServiceException {
369: Element serviceElement = getServiceElement(modelService);
370: List serverList = serverList(serviceElement);
371:
372: Map result = new HashMap();
373: Iterator i = serverList.iterator();
374:
375: while (i.hasNext()) {
376: Element server = (Element) i.next();
377: String serverType = server.getAttribute("type");
378:
379: if (serverType.equals("topic"))
380: result.putAll(runTopic(modelService, context, server));
381: else if (serverType.equals("queue"))
382: result.putAll(runQueue(modelService, context, server));
383: else
384: throw new GenericServiceException(
385: "Illegal server messaging type.");
386: }
387: return result;
388: }
389:
390: /**
391: * @see org.ofbiz.service.engine.GenericEngine#runSync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map)
392: */
393: public Map runSync(String localName, ModelService modelService,
394: Map context) throws GenericServiceException {
395: return run(modelService, context);
396: }
397:
398: /**
399: * @see org.ofbiz.service.engine.GenericEngine#runSyncIgnore(java.lang.String, org.ofbiz.service.ModelService, java.util.Map)
400: */
401: public void runSyncIgnore(String localName,
402: ModelService modelService, Map context)
403: throws GenericServiceException {
404: run(modelService, context);
405: }
406:
407: /**
408: * @see org.ofbiz.service.engine.GenericEngine#runAsync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map, org.ofbiz.service.GenericRequester, boolean)
409: */
410: public void runAsync(String localName, ModelService modelService,
411: Map context, GenericRequester requester, boolean persist)
412: throws GenericServiceException {
413: Map result = run(modelService, context);
414:
415: requester.receiveResult(result);
416: }
417:
418: /**
419: * @see org.ofbiz.service.engine.GenericEngine#runAsync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map, boolean)
420: */
421: public void runAsync(String localName, ModelService modelService,
422: Map context, boolean persist)
423: throws GenericServiceException {
424: run(modelService, context);
425: }
426:
427: }
|