001: /*
002: * This file is part of the WfMOpen project.
003: * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
004: * All rights reserved.
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * $Id: QueuerEJB.java,v 1.9 2007/03/27 21:59:44 mlipp Exp $
021: *
022: * $Log: QueuerEJB.java,v $
023: * Revision 1.9 2007/03/27 21:59:44 mlipp
024: * Fixed lots of checkstyle warnings.
025: *
026: * Revision 1.8 2007/01/18 09:57:15 drmlipp
027: * Fixed problem with J2EE 1.4 allowing only one session per JMS connection.
028: *
029: * Revision 1.7 2006/10/11 09:05:54 drmlipp
030: * Fixed EJB naming.
031: *
032: * Revision 1.6 2006/10/10 09:23:58 drmlipp
033: * Made queue and topic names "generic".
034: *
035: * Revision 1.5 2006/09/29 12:32:12 drmlipp
036: * Consistently using WfMOpen as projct name now.
037: *
038: * Revision 1.4 2005/04/08 11:28:05 drmlipp
039: * Merged changes from 1.3 branch up to 1.3p6.
040: *
041: * Revision 1.3.2.2 2005/04/07 15:54:23 drmlipp
042: * Fixed problem with process not being started. Removed fix that handled
043: * special case of subprocess only.
044: *
045: * Revision 1.3.2.1 2005/04/04 20:09:20 drmlipp
046: * Changed WLS transaction isolation.
047: *
048: * Revision 1.3 2005/01/07 14:58:59 drmlipp
049: * Made Queuer EJB a local EJB. Not expecting significant performance
050: * gains from this ;-), but it is somehow makes sense.
051: *
052: * Revision 1.2 2004/09/10 12:44:30 drmlipp
053: * Enabled call by reference for weblogic by default.
054: *
055: * Revision 1.1.1.3 2004/08/18 15:17:38 drmlipp
056: * Update to 1.2
057: *
058: * Revision 1.15 2004/07/04 17:36:03 lipp
059: * Added JOnAS support.
060: *
061: * Revision 1.14 2004/07/02 15:10:24 lipp
062: * Workaround for JBoss 3.2.3/3.2.5 incompatibility.
063: *
064: * Revision 1.13 2004/07/02 13:43:37 lipp
065: * Fixed JMS usage.
066: *
067: * Revision 1.12 2004/02/13 08:25:29 lipp
068: * Changed channel message data type to Map which is more appropriate.
069: *
070: * Revision 1.11 2004/02/06 13:37:35 lipp
071: * Added channel close notification.
072: *
073: * Revision 1.10 2004/02/06 10:25:46 lipp
074: * Finshed Receiver.
075: *
076: * Revision 1.9 2004/01/14 07:59:45 lipp
077: * Added transaction isolation attribute for WLS.
078: *
079: * Revision 1.8 2003/11/21 14:56:21 lipp
080: * Adapted wls queue names.
081: *
082: * Revision 1.7 2003/11/20 14:40:36 lipp
083: * Using JmsXA now.
084: *
085: * Revision 1.6 2003/11/14 10:42:51 lipp
086: * Using WLS default connection factory name.
087: *
088: * Revision 1.5 2003/11/06 16:30:00 lipp
089: * Using proper JMS connection factory now.
090: *
091: * Revision 1.4 2003/11/04 10:08:22 lipp
092: * Removed queuing optimization.
093: *
094: * Revision 1.3 2003/11/03 16:32:52 lipp
095: * Fixed event saving.
096: *
097: * Revision 1.2 2003/10/25 22:20:46 lipp
098: * Using in JVM connection factory whereever possible.
099: *
100: * Revision 1.1 2003/10/25 20:59:32 lipp
101: * Made AuditEventQueuer the general queuer.
102: *
103: * Revision 1.1 2003/10/24 11:08:49 lipp
104: * Made AuditEventQueuer an EJB.
105: *
106: */
107: package de.danet.an.workflow.ejbs.util;
108:
109: import java.io.Serializable;
110:
111: import java.util.HashMap;
112: import java.util.Map;
113:
114: import java.rmi.RemoteException;
115:
116: import javax.ejb.CreateException;
117: import javax.ejb.EJBException;
118: import javax.ejb.SessionBean;
119: import javax.ejb.SessionContext;
120: import javax.jms.JMSException;
121: import javax.jms.Message;
122: import javax.jms.ObjectMessage;
123: import javax.jms.Queue;
124: import javax.jms.QueueConnection;
125: import javax.jms.QueueConnectionFactory;
126: import javax.jms.QueueReceiver;
127: import javax.jms.QueueSender;
128: import javax.jms.QueueSession;
129: import javax.jms.Topic;
130: import javax.jms.TopicConnection;
131: import javax.jms.TopicConnectionFactory;
132: import javax.jms.TopicPublisher;
133: import javax.jms.TopicSession;
134:
135: import de.danet.an.util.EJBUtil;
136: import de.danet.an.util.ResourceNotAvailableException;
137:
138: import de.danet.an.workflow.domain.DefaultAuditEvent;
139:
140: /**
141: * This class provides a method to queue an event in the internal
142: * event queue or a tool invocation in the tool invocation queue. We
143: * use a session bean for this to enable pooling of queue connections
144: * (as a result of pooling stateless session beans) by the
145: * container. Opening the queue connections in e.g. entity beans
146: * results in too many connections being open.
147: *
148: * @author <a href="mailto:lipp@danet.de">Michael Lipp</a>
149: * @version $Revision: 1.9 $
150: * @ejb.bean name="Queuer" display-name="Queuer EJB"
151: * local-jndi-name="ejb/@@@_JNDI_Name_Prefix_@@@QueuerLocal"
152: * type="Stateless" transaction-type="Container" view-type="local"
153: * @jonas.bean ejb-name="Queuer"
154: * @ejb.transaction type="Required"
155: * @ejb.resource-ref res-ref-name="jms/QCF"
156: * res-type="javax.jms.QueueConnectionFactory" res-auth="Container"
157: * @jboss.resource-ref res-ref-name="jms/QCF" jndi-name="java:/JmsXA"
158: * @jonas.resource res-ref-name="jms/QCF" jndi-name="QCF"
159: * @weblogic.resource-description res-ref-name="jms/QCF"
160: * jndi-name="weblogic.jms.XAConnectionFactory"
161: * @ejb.resource-ref res-ref-name="jms/TCF"
162: * res-type="javax.jms.TopicConnectionFactory" res-auth="Container"
163: * @jboss.resource-ref res-ref-name="jms/TCF" jndi-name="java:/JmsXA"
164: * @jonas.resource res-ref-name="jms/TCF" jndi-name="TCF"
165: * @weblogic.enable-call-by-reference True
166: * @weblogic.resource-description res-ref-name="jms/TCF"
167: * jndi-name="weblogic.jms.XAConnectionFactory"
168: * @ejb.resource-ref res-ref-name="jms/InternalEventQueue"
169: * res-type="javax.jms.Queue" res-auth="Container"
170: * @jboss.resource-ref res-ref-name="jms/InternalEventQueue"
171: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
172: * @jonas.resource res-ref-name="jms/InternalEventQueue"
173: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
174: * @weblogic.resource-description res-ref-name="jms/InternalEventQueue"
175: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
176: * @ejb.resource-ref res-ref-name="jms/ApplicationInvocations"
177: * res-type="javax.jms.Queue" res-auth="Container"
178: * @jboss.resource-ref res-ref-name="jms/ApplicationInvocations"
179: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
180: * @jonas.resource res-ref-name="jms/ApplicationInvocations"
181: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
182: * @weblogic.resource-description res-ref-name="jms/ApplicationInvocations"
183: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
184: * @ejb.resource-ref res-ref-name="jms/ChannelIn"
185: * res-type="javax.jms.Queue" res-auth="Container"
186: * @jonas.resource res-ref-name="jms/ChannelIn"
187: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
188: * @jboss.resource-ref res-ref-name="jms/ChannelIn"
189: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
190: * @weblogic.resource-description res-ref-name="jms/ChannelIn"
191: * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
192: * @ejb.resource-ref res-ref-name="jms/ChannelOut"
193: * res-type="javax.jms.Topic" res-auth="Container"
194: * @jboss.resource-ref res-ref-name="jms/ChannelOut"
195: * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
196: * @jonas.resource res-ref-name="jms/ChannelOut"
197: * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
198: * @weblogic.resource-description res-ref-name="jms/ChannelOut"
199: * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
200: * @ejb.permission role-name="WfMOpenAdmin"
201: */
202:
203: public class QueuerEJB implements SessionBean {
204:
205: private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
206: .getLog(QueuerEJB.class);
207:
208: /** The SessionContext interface of the instance. */
209: private SessionContext ctx;
210:
211: private QueueConnection queueConnectionCache = null;
212: private TopicConnection topicConnectionCache = null;
213: private Queue eventQueueCache = null;
214: private Queue invocQueueCache = null;
215: private Queue channelInQueueCache = null;
216: private Topic channelOutTopicCache = null;
217:
218: private QueueConnection queueConnection()
219: throws ResourceNotAvailableException, JMSException {
220: if (queueConnectionCache == null) {
221: queueConnectionCache = ((QueueConnectionFactory) EJBUtil
222: .retrieveJNDIEntry("java:comp/env/jms/QCF"))
223: .createQueueConnection();
224: try {
225: // Workaround for bug in JBoss: 3.2.3 throws exception
226: // if called, 3.2.5 doesn't work without this being
227: // called.
228: queueConnectionCache.start();
229: } catch (IllegalStateException e) {
230: if (!e.getMessage().equals(
231: "This method is not applicatable "
232: + "in JMS resource adapter")) {
233: throw e;
234: }
235: }
236: }
237: return queueConnectionCache;
238: }
239:
240: private TopicConnection topicConnection()
241: throws ResourceNotAvailableException, JMSException {
242: if (topicConnectionCache == null) {
243: topicConnectionCache = ((TopicConnectionFactory) EJBUtil
244: .retrieveJNDIEntry("java:comp/env/jms/TCF"))
245: .createTopicConnection();
246: try {
247: // Workaround for bug in JBoss: 3.2.3 throws exception
248: // if called, 3.2.5 doesn't work without this being
249: // called.
250: topicConnectionCache.start();
251: } catch (IllegalStateException e) {
252: if (!e.getMessage().equals(
253: "This method is not applicatable "
254: + "in JMS resource adapter")) {
255: throw e;
256: }
257: }
258: }
259: return topicConnectionCache;
260: }
261:
262: private Queue eventQueue() throws ResourceNotAvailableException {
263: if (eventQueueCache == null) {
264: eventQueueCache = (Queue) EJBUtil
265: .retrieveJNDIEntry("java:comp/env/jms/InternalEventQueue");
266: }
267: return eventQueueCache;
268: }
269:
270: private Queue invocQueue() throws ResourceNotAvailableException {
271: if (invocQueueCache == null) {
272: invocQueueCache = (Queue) EJBUtil
273: .retrieveJNDIEntry("java:comp/env/jms/ApplicationInvocations");
274: }
275: return invocQueueCache;
276: }
277:
278: private Queue channelInQueue() throws ResourceNotAvailableException {
279: if (channelInQueueCache == null) {
280: channelInQueueCache = (Queue) EJBUtil
281: .retrieveJNDIEntry("java:comp/env/jms/ChannelIn");
282: }
283: return channelInQueueCache;
284: }
285:
286: private Topic channelOutTopic()
287: throws ResourceNotAvailableException {
288: if (channelOutTopicCache == null) {
289: channelOutTopicCache = (Topic) EJBUtil
290: .retrieveJNDIEntry("java:comp/env/jms/ChannelOut");
291: }
292: return channelOutTopicCache;
293: }
294:
295: /**
296: * Set the associated session context. The container calls this method
297: * after the instance creation.
298: * @see javax.ejb.SessionBean
299: * @param context a SessionContext interface for the instance
300: */
301: public void setSessionContext(SessionContext context) {
302: ctx = context;
303: }
304:
305: /**
306: * Create an new instance of the EJB.
307: * @throws CreateException if the EJB can not be create.
308: * @ejb.create-method view-type="local"
309: */
310: public void ejbCreate() throws CreateException {
311: queueConnectionCache = null;
312: logger.debug("Created.");
313: }
314:
315: /**
316: * The activate method is called when the instance is activated from its
317: * "passive" state. The instance should acquire any resource that it has
318: * released earlier in the ejbPassivate() method.
319: * @see javax.ejb.SessionBean
320: */
321: public void ejbActivate() {
322: // nothing to do
323: }
324:
325: /**
326: * The passivate method is called before the instance enters the
327: * "passive" state. The instance should release any resources that it
328: * can re-acquire later in the ejbActivate() method.
329: * @see javax.ejb.SessionBean
330: */
331: public void ejbPassivate() {
332: // nothing to do
333: }
334:
335: /**
336: * A container invokes this method before it ends the life of the session
337: * object. This happens as a result of a client's invoking a remove
338: * operation, or when a container decides to terminate the session object
339: * after a timeout.
340: * @see javax.ejb.SessionBean
341: */
342: public void ejbRemove() {
343: try {
344: if (queueConnectionCache != null) {
345: queueConnectionCache.close();
346: }
347: } catch (JMSException e) {
348: logger.warn("Problem closing queue connection (ignored): "
349: + e.getMessage(), e);
350: }
351: queueConnectionCache = null;
352: try {
353: if (topicConnectionCache != null) {
354: topicConnectionCache.close();
355: }
356: } catch (JMSException e) {
357: logger.warn("Problem closing queue connection (ignored): "
358: + e.getMessage(), e);
359: }
360: topicConnectionCache = null;
361: logger.debug("Removed.");
362: }
363:
364: /**
365: * Queue the given event.
366: *
367: * @param evt the <code>WfAuditEvent</code>.
368: * @ejb.interface-method view-type="local"
369: */
370: public void queue(DefaultAuditEvent evt) {
371: try {
372: QueueSession qs = queueConnection().createQueueSession(
373: true, 0);
374: QueueSender snd = qs.createSender(eventQueue());
375: snd.setDisableMessageID(true);
376: snd.setDisableMessageTimestamp(true);
377: Map args = new HashMap();
378: args.put("event", evt.replaceSource(null));
379: ObjectMessage msg = qs.createObjectMessage();
380: msg.setObject((Serializable) args);
381: snd.send(msg);
382: snd.close();
383: qs.close();
384: if (logger.isDebugEnabled()) {
385: logger.debug("Queued with JMS: " + evt.toString());
386: }
387: } catch (JMSException e) {
388: throw new EJBException(e);
389: } catch (RemoteException e) {
390: throw new EJBException(e);
391: }
392: }
393:
394: /**
395: * Queue the given event with a requeued count.
396: *
397: * @param evt the <code>WfAuditEvent</code>
398: * @param requeued the number of times this has been queued
399: * @ejb.interface-method view-type="local"
400: */
401: public void queue(DefaultAuditEvent evt, int requeued) {
402: try {
403: QueueSession qs = queueConnection().createQueueSession(
404: true, 0);
405: QueueSender snd = qs.createSender(eventQueue());
406: snd.setDisableMessageID(true);
407: snd.setDisableMessageTimestamp(true);
408: Map args = new HashMap();
409: args.put("event", evt.replaceSource(null));
410: ObjectMessage msg = qs.createObjectMessage();
411: msg.setIntProperty("requeuedCount", requeued);
412: msg.setObject((Serializable) args);
413: snd.send(msg);
414: snd.close();
415: qs.close();
416: if (logger.isDebugEnabled()) {
417: logger.debug("Queued with JMS: " + evt.toString());
418: }
419: } catch (JMSException e) {
420: throw new EJBException(e);
421: } catch (RemoteException e) {
422: throw new EJBException(e);
423: }
424: }
425:
426: /**
427: * Queue the given invocation.
428: *
429: * @param args the invocation arguments
430: * @ejb.interface-method view-type="local"
431: */
432: public void queueToolInvocation(Map args) {
433: try {
434: QueueSession qs = queueConnection().createQueueSession(
435: true, 0);
436: QueueSender snd = qs.createSender(invocQueue());
437: snd.setDisableMessageID(true);
438: snd.setDisableMessageTimestamp(true);
439: ObjectMessage msg = qs.createObjectMessage();
440: msg.setObject((Serializable) args);
441: snd.send(msg);
442: snd.close();
443: qs.close();
444: } catch (JMSException e) {
445: throw new EJBException(e);
446: } catch (RemoteException e) {
447: throw new EJBException(e);
448: }
449: }
450:
451: /**
452: * Queue the given channel message.
453: *
454: * @param processKey the process key
455: * @param channel the channel
456: * @param message the message
457: * @ejb.interface-method view-type="local"
458: */
459: public void queueChannelMessage(String processKey, String channel,
460: Map message) {
461: try {
462: QueueSession qs = queueConnection().createQueueSession(
463: true, 0);
464: QueueSender snd = qs.createSender(channelInQueue());
465: snd.setDisableMessageID(true);
466: snd.setDisableMessageTimestamp(true);
467: ObjectMessage msg = qs
468: .createObjectMessage((Serializable) message);
469: msg.setStringProperty("processKey", processKey);
470: msg.setStringProperty("channelName", channel);
471: snd.send(msg);
472: snd.close();
473: qs.close();
474: } catch (JMSException e) {
475: throw new EJBException(e);
476: } catch (RemoteException e) {
477: throw new EJBException(e);
478: }
479: }
480:
481: /**
482: * Looks for a message for the given process and channel on the
483: * channel in queue and if found return it.
484: *
485: * @param processKey the process key
486: * @param channel the channel
487: * @return the message or <code>null</code>
488: * @ejb.interface-method view-type="local"
489: */
490: public Map lookForChannelMessage(String processKey, String channel) {
491: try {
492: QueueSession qs = queueConnection().createQueueSession(
493: true, 0);
494: QueueReceiver rec = qs.createReceiver(channelInQueue(),
495: "processKey = '" + processKey + "'"
496: + " AND channelName = '" + channel + "'");
497: Message msg = rec.receiveNoWait();
498: Map result = null;
499: if (msg != null && (msg instanceof ObjectMessage)) {
500: result = (Map) ((ObjectMessage) msg).getObject();
501: }
502: rec.close();
503: qs.close();
504: return result;
505: } catch (JMSException e) {
506: throw new EJBException(e);
507: } catch (RemoteException e) {
508: throw new EJBException(e);
509: }
510: }
511:
512: /**
513: * Broadcast the given channel message.
514: *
515: * @param processKey the process key
516: * @param channel the channel
517: * @param message the message
518: * @ejb.interface-method view-type="local"
519: */
520: public void broadcastChannelMessage(String processKey,
521: String channel, Map data) {
522: try {
523: TopicSession ts = topicConnection().createTopicSession(
524: true, 0);
525: TopicPublisher sndr = ts.createPublisher(channelOutTopic());
526: sndr.setDisableMessageID(true);
527: ObjectMessage msg = ts
528: .createObjectMessage((Serializable) data);
529: msg.setStringProperty("processKey", processKey);
530: msg.setStringProperty("channelName", channel);
531: msg.setStringProperty("messageType", "DATA");
532: sndr.publish(msg);
533: sndr.close();
534: ts.close();
535:
536: // Now clean up in queue
537: QueueSession qs = queueConnection().createQueueSession(
538: true, 0);
539: QueueReceiver rec = qs.createReceiver(channelInQueue(),
540: "processKey = '" + processKey + "'");
541: while (rec.receiveNoWait() != null) {
542: }
543: rec.close();
544: qs.close();
545: } catch (JMSException e) {
546: throw new EJBException(e);
547: } catch (RemoteException e) {
548: throw new EJBException(e);
549: }
550: }
551:
552: /**
553: * Send a message about process completion and cleanup in queue.
554: *
555: * @param processKey the process key
556: * @ejb.interface-method view-type="local"
557: */
558: public void closeChannels(String processKey) {
559: try {
560: TopicSession ts = topicConnection().createTopicSession(
561: true, 0);
562: TopicPublisher sndr = ts.createPublisher(channelOutTopic());
563: sndr.setDisableMessageID(true);
564: Message msg = ts.createMessage();
565: msg.setStringProperty("processKey", processKey);
566: msg.setStringProperty("messageType", "CLOSE_NOTIFICATION");
567: sndr.publish(msg);
568: sndr.close();
569: ts.close();
570:
571: // Now clean up in queue
572: QueueSession qs = queueConnection().createQueueSession(
573: true, 0);
574: QueueReceiver rec = qs.createReceiver(channelInQueue(),
575: "processKey = '" + processKey + "'");
576: while (rec.receiveNoWait() != null) {
577: }
578: rec.close();
579: qs.close();
580: } catch (JMSException e) {
581: throw new EJBException(e);
582: } catch (RemoteException e) {
583: throw new EJBException(e);
584: }
585: }
586:
587: }
|