QueuerEJB.java (wfmopen-2.1.1, package de.danet.an.workflow.ejbs.util)
/*
 * This file is part of the WfMOpen project.
 * Copyright (C) 2001-2003 Danet GmbH (www.danet.de), GS-AN.
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * $Id: QueuerEJB.java,v 1.9 2007/03/27 21:59:44 mlipp Exp $
 *
 * $Log: QueuerEJB.java,v $
 * Revision 1.9  2007/03/27 21:59:44  mlipp
 * Fixed lots of checkstyle warnings.
 *
 * Revision 1.8  2007/01/18 09:57:15  drmlipp
 * Fixed problem with J2EE 1.4 allowing only one session per JMS connection.
 *
 * Revision 1.7  2006/10/11 09:05:54  drmlipp
 * Fixed EJB naming.
 *
 * Revision 1.6  2006/10/10 09:23:58  drmlipp
 * Made queue and topic names "generic".
 *
 * Revision 1.5  2006/09/29 12:32:12  drmlipp
 * Consistently using WfMOpen as project name now.
 *
 * Revision 1.4  2005/04/08 11:28:05  drmlipp
 * Merged changes from 1.3 branch up to 1.3p6.
 *
 * Revision 1.3.2.2  2005/04/07 15:54:23  drmlipp
 * Fixed problem with process not being started. Removed fix that handled
 * special case of subprocess only.
 *
 * Revision 1.3.2.1  2005/04/04 20:09:20  drmlipp
 * Changed WLS transaction isolation.
 *
 * Revision 1.3  2005/01/07 14:58:59  drmlipp
 * Made Queuer EJB a local EJB. Not expecting significant performance
 * gains from this ;-), but it somehow makes sense.
 *
 * Revision 1.2  2004/09/10 12:44:30  drmlipp
 * Enabled call by reference for weblogic by default.
 *
 * Revision 1.1.1.3  2004/08/18 15:17:38  drmlipp
 * Update to 1.2
 *
 * Revision 1.15  2004/07/04 17:36:03  lipp
 * Added JOnAS support.
 *
 * Revision 1.14  2004/07/02 15:10:24  lipp
 * Workaround for JBoss 3.2.3/3.2.5 incompatibility.
 *
 * Revision 1.13  2004/07/02 13:43:37  lipp
 * Fixed JMS usage.
 *
 * Revision 1.12  2004/02/13 08:25:29  lipp
 * Changed channel message data type to Map which is more appropriate.
 *
 * Revision 1.11  2004/02/06 13:37:35  lipp
 * Added channel close notification.
 *
 * Revision 1.10  2004/02/06 10:25:46  lipp
 * Finished Receiver.
 *
 * Revision 1.9  2004/01/14 07:59:45  lipp
 * Added transaction isolation attribute for WLS.
 *
 * Revision 1.8  2003/11/21 14:56:21  lipp
 * Adapted wls queue names.
 *
 * Revision 1.7  2003/11/20 14:40:36  lipp
 * Using JmsXA now.
 *
 * Revision 1.6  2003/11/14 10:42:51  lipp
 * Using WLS default connection factory name.
 *
 * Revision 1.5  2003/11/06 16:30:00  lipp
 * Using proper JMS connection factory now.
 *
 * Revision 1.4  2003/11/04 10:08:22  lipp
 * Removed queuing optimization.
 *
 * Revision 1.3  2003/11/03 16:32:52  lipp
 * Fixed event saving.
 *
 * Revision 1.2  2003/10/25 22:20:46  lipp
 * Using in JVM connection factory wherever possible.
 *
 * Revision 1.1  2003/10/25 20:59:32  lipp
 * Made AuditEventQueuer the general queuer.
 *
 * Revision 1.1  2003/10/24 11:08:49  lipp
 * Made AuditEventQueuer an EJB.
 *
 */
package de.danet.an.workflow.ejbs.util;

import java.io.Serializable;

import java.util.HashMap;
import java.util.Map;

import java.rmi.RemoteException;

import javax.ejb.CreateException;
import javax.ejb.EJBException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import de.danet.an.util.EJBUtil;
import de.danet.an.util.ResourceNotAvailableException;

import de.danet.an.workflow.domain.DefaultAuditEvent;

/**
 * This class provides a method to queue an event in the internal
 * event queue or a tool invocation in the tool invocation queue. We
 * use a session bean for this to enable pooling of queue connections
 * (as a result of pooling stateless session beans) by the
 * container. Opening the queue connections in e.g. entity beans
 * results in too many connections being open.
 *
 * @author <a href="mailto:lipp@danet.de">Michael Lipp</a>
 * @version $Revision: 1.9 $
 * @ejb.bean name="Queuer" display-name="Queuer EJB"
 * local-jndi-name="ejb/@@@_JNDI_Name_Prefix_@@@QueuerLocal"
 * type="Stateless" transaction-type="Container" view-type="local"
 * @jonas.bean ejb-name="Queuer"
 * @ejb.transaction type="Required"
 * @ejb.resource-ref res-ref-name="jms/QCF"
 * res-type="javax.jms.QueueConnectionFactory" res-auth="Container"
 * @jboss.resource-ref res-ref-name="jms/QCF" jndi-name="java:/JmsXA"
 * @jonas.resource res-ref-name="jms/QCF" jndi-name="QCF"
 * @weblogic.resource-description res-ref-name="jms/QCF"
 * jndi-name="weblogic.jms.XAConnectionFactory"
 * @ejb.resource-ref res-ref-name="jms/TCF"
 * res-type="javax.jms.TopicConnectionFactory" res-auth="Container"
 * @jboss.resource-ref res-ref-name="jms/TCF" jndi-name="java:/JmsXA"
 * @jonas.resource res-ref-name="jms/TCF" jndi-name="TCF"
 * @weblogic.enable-call-by-reference True
 * @weblogic.resource-description res-ref-name="jms/TCF"
 * jndi-name="weblogic.jms.XAConnectionFactory"
 * @ejb.resource-ref res-ref-name="jms/InternalEventQueue"
 * res-type="javax.jms.Queue" res-auth="Container"
 * @jboss.resource-ref res-ref-name="jms/InternalEventQueue"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
 * @jonas.resource res-ref-name="jms/InternalEventQueue"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
 * @weblogic.resource-description res-ref-name="jms/InternalEventQueue"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@InternalEventQueue"
 * @ejb.resource-ref res-ref-name="jms/ApplicationInvocations"
 * res-type="javax.jms.Queue" res-auth="Container"
 * @jboss.resource-ref res-ref-name="jms/ApplicationInvocations"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
 * @jonas.resource res-ref-name="jms/ApplicationInvocations"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
 * @weblogic.resource-description res-ref-name="jms/ApplicationInvocations"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ApplicationInvocations"
 * @ejb.resource-ref res-ref-name="jms/ChannelIn"
 * res-type="javax.jms.Queue" res-auth="Container"
 * @jonas.resource res-ref-name="jms/ChannelIn"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
 * @jboss.resource-ref res-ref-name="jms/ChannelIn"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
 * @weblogic.resource-description res-ref-name="jms/ChannelIn"
 * jndi-name="queue/@@@_JNDI_Name_Prefix_@@@ChannelInMessages"
 * @ejb.resource-ref res-ref-name="jms/ChannelOut"
 * res-type="javax.jms.Topic" res-auth="Container"
 * @jboss.resource-ref res-ref-name="jms/ChannelOut"
 * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
 * @jonas.resource res-ref-name="jms/ChannelOut"
 * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
 * @weblogic.resource-description res-ref-name="jms/ChannelOut"
 * jndi-name="topic/@@@_JNDI_Name_Prefix_@@@ChannelOutMessages"
 * @ejb.permission role-name="WfMOpenAdmin"
 */

public class QueuerEJB implements SessionBean {

    private static final org.apache.commons.logging.Log logger = org.apache.commons.logging.LogFactory
            .getLog(QueuerEJB.class);

    /** The SessionContext interface of the instance. */
    private SessionContext ctx;

    private QueueConnection queueConnectionCache = null;
    private TopicConnection topicConnectionCache = null;
    private Queue eventQueueCache = null;
    private Queue invocQueueCache = null;
    private Queue channelInQueueCache = null;
    private Topic channelOutTopicCache = null;

    private QueueConnection queueConnection()
            throws ResourceNotAvailableException, JMSException {
        if (queueConnectionCache == null) {
            queueConnectionCache = ((QueueConnectionFactory) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/QCF"))
                    .createQueueConnection();
            try {
                // Workaround for bug in JBoss: 3.2.3 throws exception
                // if called, 3.2.5 doesn't work without this being
                // called.
                queueConnectionCache.start();
            } catch (IllegalStateException e) {
                // The message text is compared verbatim, so its spelling
                // is kept as is. Comparing from the literal avoids a
                // NullPointerException if the exception has no message.
                if (!("This method is not applicatable "
                        + "in JMS resource adapter").equals(e.getMessage())) {
                    throw e;
                }
            }
        }
        return queueConnectionCache;
    }

    private TopicConnection topicConnection()
            throws ResourceNotAvailableException, JMSException {
        if (topicConnectionCache == null) {
            topicConnectionCache = ((TopicConnectionFactory) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/TCF"))
                    .createTopicConnection();
            try {
                // Workaround for bug in JBoss: 3.2.3 throws exception
                // if called, 3.2.5 doesn't work without this being
                // called.
                topicConnectionCache.start();
            } catch (IllegalStateException e) {
                // See queueConnection() for why the text is kept verbatim.
                if (!("This method is not applicatable "
                        + "in JMS resource adapter").equals(e.getMessage())) {
                    throw e;
                }
            }
        }
        return topicConnectionCache;
    }

    private Queue eventQueue() throws ResourceNotAvailableException {
        if (eventQueueCache == null) {
            eventQueueCache = (Queue) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/InternalEventQueue");
        }
        return eventQueueCache;
    }

    private Queue invocQueue() throws ResourceNotAvailableException {
        if (invocQueueCache == null) {
            invocQueueCache = (Queue) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/ApplicationInvocations");
        }
        return invocQueueCache;
    }

    private Queue channelInQueue() throws ResourceNotAvailableException {
        if (channelInQueueCache == null) {
            channelInQueueCache = (Queue) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/ChannelIn");
        }
        return channelInQueueCache;
    }

    private Topic channelOutTopic()
            throws ResourceNotAvailableException {
        if (channelOutTopicCache == null) {
            channelOutTopicCache = (Topic) EJBUtil
                    .retrieveJNDIEntry("java:comp/env/jms/ChannelOut");
        }
        return channelOutTopicCache;
    }

    /**
     * Set the associated session context. The container calls this method
     * after the instance creation.
     * @see javax.ejb.SessionBean
     * @param context a SessionContext interface for the instance
     */
    public void setSessionContext(SessionContext context) {
        ctx = context;
    }

    /**
     * Create a new instance of the EJB.
     * @throws CreateException if the EJB cannot be created.
     * @ejb.create-method view-type="local"
     */
    public void ejbCreate() throws CreateException {
        queueConnectionCache = null;
        logger.debug("Created.");
    }

    /**
     * The activate method is called when the instance is activated from its
     * "passive" state. The instance should acquire any resource that it has
     * released earlier in the ejbPassivate() method.
     * @see javax.ejb.SessionBean
     */
    public void ejbActivate() {
        // nothing to do
    }

    /**
     * The passivate method is called before the instance enters the
     * "passive" state. The instance should release any resources that it
     * can re-acquire later in the ejbActivate() method.
     * @see javax.ejb.SessionBean
     */
    public void ejbPassivate() {
        // nothing to do
    }

    /**
     * A container invokes this method before it ends the life of the session
     * object. This happens as a result of a client's invoking a remove
     * operation, or when a container decides to terminate the session object
     * after a timeout.
     * @see javax.ejb.SessionBean
     */
    public void ejbRemove() {
        try {
            if (queueConnectionCache != null) {
                queueConnectionCache.close();
            }
        } catch (JMSException e) {
            logger.warn("Problem closing queue connection (ignored): "
                    + e.getMessage(), e);
        }
        queueConnectionCache = null;
        try {
            if (topicConnectionCache != null) {
                topicConnectionCache.close();
            }
        } catch (JMSException e) {
            logger.warn("Problem closing topic connection (ignored): "
                    + e.getMessage(), e);
        }
        topicConnectionCache = null;
        logger.debug("Removed.");
    }

    /**
     * Queue the given event.
     *
     * @param evt the <code>WfAuditEvent</code>.
     * @ejb.interface-method view-type="local"
     */
    public void queue(DefaultAuditEvent evt) {
        try {
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueSender snd = qs.createSender(eventQueue());
            snd.setDisableMessageID(true);
            snd.setDisableMessageTimestamp(true);
            Map args = new HashMap();
            args.put("event", evt.replaceSource(null));
            ObjectMessage msg = qs.createObjectMessage();
            msg.setObject((Serializable) args);
            snd.send(msg);
            snd.close();
            qs.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Queued with JMS: " + evt.toString());
            }
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Queue the given event with a requeued count.
     *
     * @param evt the <code>WfAuditEvent</code>
     * @param requeued the number of times this has been queued
     * @ejb.interface-method view-type="local"
     */
    public void queue(DefaultAuditEvent evt, int requeued) {
        try {
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueSender snd = qs.createSender(eventQueue());
            snd.setDisableMessageID(true);
            snd.setDisableMessageTimestamp(true);
            Map args = new HashMap();
            args.put("event", evt.replaceSource(null));
            ObjectMessage msg = qs.createObjectMessage();
            msg.setIntProperty("requeuedCount", requeued);
            msg.setObject((Serializable) args);
            snd.send(msg);
            snd.close();
            qs.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Queued with JMS: " + evt.toString());
            }
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Queue the given invocation.
     *
     * @param args the invocation arguments
     * @ejb.interface-method view-type="local"
     */
    public void queueToolInvocation(Map args) {
        try {
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueSender snd = qs.createSender(invocQueue());
            snd.setDisableMessageID(true);
            snd.setDisableMessageTimestamp(true);
            ObjectMessage msg = qs.createObjectMessage();
            msg.setObject((Serializable) args);
            snd.send(msg);
            snd.close();
            qs.close();
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Queue the given channel message.
     *
     * @param processKey the process key
     * @param channel the channel
     * @param message the message
     * @ejb.interface-method view-type="local"
     */
    public void queueChannelMessage(String processKey, String channel,
            Map message) {
        try {
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueSender snd = qs.createSender(channelInQueue());
            snd.setDisableMessageID(true);
            snd.setDisableMessageTimestamp(true);
            ObjectMessage msg = qs
                    .createObjectMessage((Serializable) message);
            msg.setStringProperty("processKey", processKey);
            msg.setStringProperty("channelName", channel);
            snd.send(msg);
            snd.close();
            qs.close();
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Looks for a message for the given process and channel on the
     * channel in queue and, if found, returns it.
     *
     * @param processKey the process key
     * @param channel the channel
     * @return the message or <code>null</code>
     * @ejb.interface-method view-type="local"
     */
    public Map lookForChannelMessage(String processKey, String channel) {
        try {
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueReceiver rec = qs.createReceiver(channelInQueue(),
                    "processKey = '" + processKey + "'"
                            + " AND channelName = '" + channel + "'");
            Message msg = rec.receiveNoWait();
            Map result = null;
            // instanceof is false for null, so no separate null check
            if (msg instanceof ObjectMessage) {
                result = (Map) ((ObjectMessage) msg).getObject();
            }
            rec.close();
            qs.close();
            return result;
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Broadcast the given channel message.
     *
     * @param processKey the process key
     * @param channel the channel
     * @param data the message data
     * @ejb.interface-method view-type="local"
     */
    public void broadcastChannelMessage(String processKey,
            String channel, Map data) {
        try {
            TopicSession ts = topicConnection().createTopicSession(
                    true, 0);
            TopicPublisher sndr = ts.createPublisher(channelOutTopic());
            sndr.setDisableMessageID(true);
            ObjectMessage msg = ts
                    .createObjectMessage((Serializable) data);
            msg.setStringProperty("processKey", processKey);
            msg.setStringProperty("channelName", channel);
            msg.setStringProperty("messageType", "DATA");
            sndr.publish(msg);
            sndr.close();
            ts.close();

            // Now clean up in queue
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueReceiver rec = qs.createReceiver(channelInQueue(),
                    "processKey = '" + processKey + "'");
            while (rec.receiveNoWait() != null) {
                // discard pending messages for this process
            }
            rec.close();
            qs.close();
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

    /**
     * Send a message about process completion and clean up in queue.
     *
     * @param processKey the process key
     * @ejb.interface-method view-type="local"
     */
    public void closeChannels(String processKey) {
        try {
            TopicSession ts = topicConnection().createTopicSession(
                    true, 0);
            TopicPublisher sndr = ts.createPublisher(channelOutTopic());
            sndr.setDisableMessageID(true);
            Message msg = ts.createMessage();
            msg.setStringProperty("processKey", processKey);
            msg.setStringProperty("messageType", "CLOSE_NOTIFICATION");
            sndr.publish(msg);
            sndr.close();
            ts.close();

            // Now clean up in queue
            QueueSession qs = queueConnection().createQueueSession(
                    true, 0);
            QueueReceiver rec = qs.createReceiver(channelInQueue(),
                    "processKey = '" + processKey + "'");
            while (rec.receiveNoWait() != null) {
                // discard pending messages for this process
            }
            rec.close();
            qs.close();
        } catch (JMSException e) {
            throw new EJBException(e);
        } catch (RemoteException e) {
            throw new EJBException(e);
        }
    }

}
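
Note on the message selectors: lookForChannelMessage, broadcastChannelMessage and closeChannels build their selectors by concatenating processKey and channel between single quotes. JMS selectors use SQL-92 style string literals, in which a single quote inside a literal is written as two single quotes, so a key containing a quote would produce an invalid or unintended selector. The following is a minimal, standalone sketch of how the values could be quoted first. The class SelectorQuoting and its methods are hypothetical helpers for illustration only and are not part of WfMOpen.

```java
public class SelectorQuoting {

    /**
     * Quotes a value as a selector string literal by doubling any
     * embedded single quote (hypothetical helper, not WfMOpen API).
     */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * Builds a selector of the same shape as the one used in
     * lookForChannelMessage, with both values quoted.
     */
    static String channelSelector(String processKey, String channel) {
        return "processKey = " + quote(processKey)
                + " AND channelName = " + quote(channel);
    }

    public static void main(String[] args) {
        // Plain values give the same selector as the concatenation above.
        System.out.println(channelSelector("4711", "orders"));
        // An embedded quote is doubled instead of ending the literal.
        System.out.println(channelSelector("o'brien", "in"));
    }
}
```

With such a helper, the receiver in lookForChannelMessage could be created with channelSelector(processKey, channel) as its selector argument. Whether process keys can contain quotes at all in WfMOpen is not stated in this file, so this is a defensive measure rather than a known defect fix.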