Source Code Cross Referenced for AbstractConsumerEndpoint.java in  » ESB » servicemix » org » apache » servicemix » jms » endpoints » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » servicemix » org.apache.servicemix.jms.endpoints 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         *
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         *
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:        package org.apache.servicemix.jms.endpoints;
018:
019:        import java.util.Map;
020:
021:        import javax.jbi.JBIException;
022:        import javax.jbi.messaging.ExchangeStatus;
023:        import javax.jbi.messaging.InOnly;
024:        import javax.jbi.messaging.MessageExchange;
025:        import javax.jbi.servicedesc.ServiceEndpoint;
026:        import javax.jms.ConnectionFactory;
027:        import javax.jms.Destination;
028:        import javax.jms.JMSException;
029:        import javax.jms.Message;
030:        import javax.jms.MessageProducer;
031:        import javax.jms.Session;
032:        import javax.xml.namespace.QName;
033:
034:        import org.apache.servicemix.common.DefaultComponent;
035:        import org.apache.servicemix.common.ServiceUnit;
036:        import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
037:        import org.apache.servicemix.jms.endpoints.JmsConsumerMarshaler.JmsContext;
038:        import org.apache.servicemix.store.Store;
039:        import org.apache.servicemix.store.StoreFactory;
040:        import org.apache.servicemix.store.memory.MemoryStoreFactory;
041:        import org.springframework.jms.core.JmsTemplate;
042:        import org.springframework.jms.core.SessionCallback;
043:        import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
044:        import org.springframework.jms.support.JmsUtils;
045:        import org.springframework.jms.support.destination.DestinationResolver;
046:        import org.springframework.jms.support.destination.DynamicDestinationResolver;
047:
048:        public abstract class AbstractConsumerEndpoint extends ConsumerEndpoint {
049:
050:            protected static final String PROP_JMS_CONTEXT = JmsContext.class
051:                    .getName();
052:
053:            private JmsConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
054:            private boolean synchronous = true;
055:            private DestinationChooser destinationChooser;
056:            private DestinationResolver destinationResolver = new DynamicDestinationResolver();
057:            private boolean pubSubDomain;
058:            private ConnectionFactory connectionFactory;
059:            private JmsTemplate template;
060:
061:            // Reply properties
062:            private Boolean useMessageIdInResponse;
063:            private Destination replyDestination;
064:            private String replyDestinationName;
065:            private boolean replyExplicitQosEnabled;
066:            private int replyDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
067:            private int replyPriority = Message.DEFAULT_PRIORITY;
068:            private long replyTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
069:            private Map<String, Object> replyProperties;
070:
071:            private boolean stateless;
072:            private StoreFactory storeFactory;
073:            private Store store;
074:
075:            public AbstractConsumerEndpoint() {
076:                super ();
077:            }
078:
079:            public AbstractConsumerEndpoint(DefaultComponent component,
080:                    ServiceEndpoint endpoint) {
081:                super (component, endpoint);
082:            }
083:
084:            public AbstractConsumerEndpoint(ServiceUnit serviceUnit,
085:                    QName service, String endpoint) {
086:                super (serviceUnit, service, endpoint);
087:            }
088:
089:            /**
090:             * @return the destinationChooser
091:             */
092:            public DestinationChooser getDestinationChooser() {
093:                return destinationChooser;
094:            }
095:
096:            /**
097:             * @param destinationChooser the destinationChooser to set
098:             */
099:            public void setDestinationChooser(
100:                    DestinationChooser destinationChooser) {
101:                this .destinationChooser = destinationChooser;
102:            }
103:
104:            /**
105:             * @return the replyDeliveryMode
106:             */
107:            public int getReplyDeliveryMode() {
108:                return replyDeliveryMode;
109:            }
110:
111:            /**
112:             * @param replyDeliveryMode the replyDeliveryMode to set
113:             */
114:            public void setReplyDeliveryMode(int replyDeliveryMode) {
115:                this .replyDeliveryMode = replyDeliveryMode;
116:            }
117:
118:            /**
119:             * @return the replyDestination
120:             */
121:            public Destination getReplyDestination() {
122:                return replyDestination;
123:            }
124:
125:            /**
126:             * @param replyDestination the replyDestination to set
127:             */
128:            public void setReplyDestination(Destination replyDestination) {
129:                this .replyDestination = replyDestination;
130:            }
131:
132:            /**
133:             * @return the replyDestinationName
134:             */
135:            public String getReplyDestinationName() {
136:                return replyDestinationName;
137:            }
138:
139:            /**
140:             * @param replyDestinationName the replyDestinationName to set
141:             */
142:            public void setReplyDestinationName(String replyDestinationName) {
143:                this .replyDestinationName = replyDestinationName;
144:            }
145:
146:            /**
147:             * @return the replyExplicitQosEnabled
148:             */
149:            public boolean isReplyExplicitQosEnabled() {
150:                return replyExplicitQosEnabled;
151:            }
152:
153:            /**
154:             * @param replyExplicitQosEnabled the replyExplicitQosEnabled to set
155:             */
156:            public void setReplyExplicitQosEnabled(
157:                    boolean replyExplicitQosEnabled) {
158:                this .replyExplicitQosEnabled = replyExplicitQosEnabled;
159:            }
160:
161:            /**
162:             * @return the replyPriority
163:             */
164:            public int getReplyPriority() {
165:                return replyPriority;
166:            }
167:
168:            /**
169:             * @param replyPriority the replyPriority to set
170:             */
171:            public void setReplyPriority(int replyPriority) {
172:                this .replyPriority = replyPriority;
173:            }
174:
175:            /**
176:             * @return the replyProperties
177:             */
178:            public Map<String, Object> getReplyProperties() {
179:                return replyProperties;
180:            }
181:
182:            /**
183:             * @param replyProperties the replyProperties to set
184:             */
185:            public void setReplyProperties(Map<String, Object> replyProperties) {
186:                this .replyProperties = replyProperties;
187:            }
188:
189:            /**
190:             * @return the replyTimeToLive
191:             */
192:            public long getReplyTimeToLive() {
193:                return replyTimeToLive;
194:            }
195:
196:            /**
197:             * @param replyTimeToLive the replyTimeToLive to set
198:             */
199:            public void setReplyTimeToLive(long replyTimeToLive) {
200:                this .replyTimeToLive = replyTimeToLive;
201:            }
202:
203:            /**
204:             * @return the useMessageIdInResponse
205:             */
206:            public Boolean getUseMessageIdInResponse() {
207:                return useMessageIdInResponse;
208:            }
209:
210:            /**
211:             * @param useMessageIdInResponse the useMessageIdInResponse to set
212:             */
213:            public void setUseMessageIdInResponse(Boolean useMessageIdInResponse) {
214:                this .useMessageIdInResponse = useMessageIdInResponse;
215:            }
216:
217:            /**
218:             * @return the connectionFactory
219:             */
220:            public ConnectionFactory getConnectionFactory() {
221:                return connectionFactory;
222:            }
223:
224:            /**
225:             * @param connectionFactory the connectionFactory to set
226:             */
227:            public void setConnectionFactory(ConnectionFactory connectionFactory) {
228:                this .connectionFactory = connectionFactory;
229:            }
230:
231:            /**
232:             * @return the pubSubDomain
233:             */
234:            public boolean isPubSubDomain() {
235:                return pubSubDomain;
236:            }
237:
238:            /**
239:             * @param pubSubDomain the pubSubDomain to set
240:             */
241:            public void setPubSubDomain(boolean pubSubDomain) {
242:                this .pubSubDomain = pubSubDomain;
243:            }
244:
245:            /**
246:             * @return the destinationResolver
247:             */
248:            public DestinationResolver getDestinationResolver() {
249:                return destinationResolver;
250:            }
251:
252:            /**
253:             * @param destinationResolver the destinationResolver to set
254:             */
255:            public void setDestinationResolver(
256:                    DestinationResolver destinationResolver) {
257:                this .destinationResolver = destinationResolver;
258:            }
259:
260:            /**
261:             * @return the marshaler
262:             */
263:            public JmsConsumerMarshaler getMarshaler() {
264:                return marshaler;
265:            }
266:
267:            /**
268:             * @param marshaler the marshaler to set
269:             */
270:            public void setMarshaler(JmsConsumerMarshaler marshaler) {
271:                this .marshaler = marshaler;
272:            }
273:
274:            /**
275:             * @return the synchronous
276:             */
277:            public boolean isSynchronous() {
278:                return synchronous;
279:            }
280:
281:            /**
282:             * @param synchronous the synchronous to set
283:             */
284:            public void setSynchronous(boolean synchronous) {
285:                this .synchronous = synchronous;
286:            }
287:
288:            public boolean isStateless() {
289:                return stateless;
290:            }
291:
292:            public void setStateless(boolean stateless) {
293:                this .stateless = stateless;
294:            }
295:
296:            public Store getStore() {
297:                return store;
298:            }
299:
300:            public void setStore(Store store) {
301:                this .store = store;
302:            }
303:
304:            public StoreFactory getStoreFactory() {
305:                return storeFactory;
306:            }
307:
308:            public void setStoreFactory(StoreFactory storeFactory) {
309:                this .storeFactory = storeFactory;
310:            }
311:
312:            public String getLocationURI() {
313:                // TODO: Need to return a real URI
314:                return getService() + "#" + getEndpoint();
315:            }
316:
317:            public synchronized void start() throws Exception {
318:                super .start();
319:                if (template == null) {
320:                    template = new JmsTemplate(getConnectionFactory());
321:                }
322:                if (store == null && !stateless) {
323:                    if (storeFactory == null) {
324:                        storeFactory = new MemoryStoreFactory();
325:                    }
326:                    store = storeFactory.open(getService().toString()
327:                            + getEndpoint());
328:                }
329:            }
330:
331:            public synchronized void stop() throws Exception {
332:                if (store != null) {
333:                    if (storeFactory != null) {
334:                        storeFactory.close(store);
335:                    }
336:                    store = null;
337:                }
338:                super .stop();
339:            }
340:
341:            public void process(MessageExchange exchange) throws Exception {
342:                JmsContext context;
343:                if (stateless) {
344:                    context = (JmsContext) exchange
345:                            .getProperty(PROP_JMS_CONTEXT);
346:                } else {
347:                    context = (JmsContext) store.load(exchange.getExchangeId());
348:                }
349:                processExchange(exchange, null, context);
350:            }
351:
352:            protected void processExchange(final MessageExchange exchange,
353:                    final Session session, final JmsContext context)
354:                    throws Exception {
355:                // Ignore DONE exchanges
356:                if (exchange.getStatus() == ExchangeStatus.DONE) {
357:                    return;
358:                }
359:                // Create session if needed
360:                if (session == null) {
361:                    template.execute(new SessionCallback() {
362:                        public Object doInJms(Session session)
363:                                throws JMSException {
364:                            try {
365:                                processExchange(exchange, session, context);
366:                            } catch (Exception e) {
367:                                throw new ListenerExecutionFailedException(
368:                                        "Exchange processing failed", e);
369:                            }
370:                            return null;
371:                        }
372:                    });
373:                    return;
374:                }
375:                // Handle exchanges
376:                Message msg = null;
377:                Destination dest = null;
378:                if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
379:                    if (exchange.getFault() != null) {
380:                        msg = marshaler.createFault(exchange, exchange
381:                                .getFault(), session, context);
382:                        dest = getReplyDestination(exchange, exchange
383:                                .getFault(), session, context);
384:                    } else if (exchange.getMessage("out") != null) {
385:                        msg = marshaler.createOut(exchange, exchange
386:                                .getMessage("out"), session, context);
387:                        dest = getReplyDestination(exchange, exchange
388:                                .getMessage("out"), session, context);
389:                    }
390:                    if (msg == null) {
391:                        throw new IllegalStateException(
392:                                "Unable to send back answer or fault");
393:                    }
394:                    setCorrelationId(context.getMessage(), msg);
395:                    try {
396:                        send(msg, session, dest);
397:                        done(exchange);
398:                    } catch (Exception e) {
399:                        fail(exchange, e);
400:                        throw e;
401:                    }
402:                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
403:                    Exception error = exchange.getError();
404:                    if (error == null) {
405:                        error = new JBIException(
406:                                "Exchange in ERROR state, but no exception provided");
407:                    }
408:                    msg = marshaler.createError(exchange, error, session,
409:                            context);
410:                    dest = getReplyDestination(exchange, error, session,
411:                            context);
412:                    setCorrelationId(context.getMessage(), msg);
413:                    send(msg, session, dest);
414:                } else {
415:                    throw new IllegalStateException(
416:                            "Unrecognized exchange status");
417:                }
418:            }
419:
420:            protected void send(Message msg, Session session, Destination dest)
421:                    throws JMSException {
422:                MessageProducer producer = session.createProducer(dest);
423:                try {
424:                    if (replyProperties != null) {
425:                        for (Map.Entry<String, Object> e : replyProperties
426:                                .entrySet()) {
427:                            msg.setObjectProperty(e.getKey(), e.getValue());
428:                        }
429:                    }
430:                    if (replyExplicitQosEnabled) {
431:                        producer.send(msg, replyDeliveryMode, replyPriority,
432:                                replyTimeToLive);
433:                    } else {
434:                        producer.send(msg);
435:                    }
436:                } finally {
437:                    JmsUtils.closeMessageProducer(producer);
438:                }
439:            }
440:
441:            protected void onMessage(Message jmsMessage, Session session)
442:                    throws JMSException {
443:                if (logger.isTraceEnabled()) {
444:                    logger.trace("Received: " + jmsMessage);
445:                }
446:                try {
447:                    JmsContext context = marshaler.createContext(jmsMessage);
448:                    MessageExchange exchange = marshaler.createExchange(
449:                            context, getContext());
450:                    configureExchangeTarget(exchange);
451:                    if (synchronous) {
452:                        try {
453:                            sendSync(exchange);
454:                        } catch (Exception e) {
455:                            handleException(exchange, e, session, context);
456:                        }
457:                        if (exchange.getStatus() != ExchangeStatus.DONE) {
458:                            processExchange(exchange, session, context);
459:                        }
460:                    } else {
461:                        if (stateless) {
462:                            exchange.setProperty(PROP_JMS_CONTEXT, context);
463:                        } else {
464:                            store.store(exchange.getExchangeId(), context);
465:                        }
466:                        boolean success = false;
467:                        try {
468:                            send(exchange);
469:                            success = true;
470:                        } catch (Exception e) {
471:                            handleException(exchange, e, session, context);
472:                        } finally {
473:                            if (!success && !stateless) {
474:                                store.load(exchange.getExchangeId());
475:                            }
476:                        }
477:                    }
478:                } catch (JMSException e) {
479:                    throw e;
480:                } catch (Exception e) {
481:                    throw (JMSException) new JMSException(
482:                            "Error sending JBI exchange").initCause(e);
483:                }
484:            }
485:
486:            protected Destination getReplyDestination(MessageExchange exchange,
487:                    Object message, Session session, JmsContext context)
488:                    throws JMSException {
489:                // If a JMS ReplyTo property is set, use it
490:                if (context.getMessage().getJMSReplyTo() != null) {
491:                    return context.getMessage().getJMSReplyTo();
492:                }
493:                Object dest = null;
494:                // Let the destinationChooser a chance to choose the destination 
495:                if (destinationChooser != null) {
496:                    dest = destinationChooser.chooseDestination(exchange,
497:                            message);
498:                }
499:                // Default to replyDestination / replyDestinationName properties
500:                if (dest == null) {
501:                    dest = replyDestination;
502:                }
503:                if (dest == null) {
504:                    dest = replyDestinationName;
505:                }
506:                // Resolve destination if needed
507:                if (dest instanceof  Destination) {
508:                    return (Destination) dest;
509:                } else if (dest instanceof  String) {
510:                    return destinationResolver.resolveDestinationName(session,
511:                            (String) dest, isPubSubDomain());
512:                }
513:                throw new IllegalStateException(
514:                        "Unable to choose destination for exchange " + exchange);
515:            }
516:
517:            protected void setCorrelationId(Message query, Message reply)
518:                    throws Exception {
519:                if (useMessageIdInResponse == null) {
520:                    if (query.getJMSCorrelationID() != null) {
521:                        reply.setJMSCorrelationID(query.getJMSCorrelationID());
522:                    } else if (query.getJMSMessageID() != null) {
523:                        reply.setJMSCorrelationID(query.getJMSMessageID());
524:                    } else {
525:                        throw new IllegalStateException(
526:                                "No JMSCorrelationID or JMSMessageID set on query message");
527:                    }
528:                } else if (useMessageIdInResponse.booleanValue()) {
529:                    if (query.getJMSMessageID() != null) {
530:                        reply.setJMSCorrelationID(query.getJMSMessageID());
531:                    } else {
532:                        throw new IllegalStateException(
533:                                "No JMSMessageID set on query message");
534:                    }
535:                } else {
536:                    if (query.getJMSCorrelationID() != null) {
537:                        reply.setJMSCorrelationID(query.getJMSCorrelationID());
538:                    } else {
539:                        throw new IllegalStateException(
540:                                "No JMSCorrelationID set on query message");
541:                    }
542:                }
543:            }
544:
545:            protected void handleException(MessageExchange exchange,
546:                    Exception error, Session session, JmsContext context)
547:                    throws Exception {
548:                // For InOnly, the consumer does not expect any response back, so
549:                // just rethrow it and let the fault behavior
550:                if (exchange instanceof  InOnly) {
551:                    throw error;
552:                }
553:                // Check if the exception should lead to an error back
554:                if (treatExceptionAsFault(error)) {
555:                    sendError(exchange, error, session, context);
556:                } else {
557:                    throw error;
558:                }
559:            }
560:
561:            protected boolean treatExceptionAsFault(Exception error) {
562:                return error instanceof  SecurityException;
563:            }
564:
565:            protected void sendError(final MessageExchange exchange,
566:                    final Exception error, Session session,
567:                    final JmsContext context) throws Exception {
568:                // Create session if needed
569:                if (session == null) {
570:                    template.execute(new SessionCallback() {
571:                        public Object doInJms(Session session)
572:                                throws JMSException {
573:                            try {
574:                                sendError(exchange, error, session, context);
575:                            } catch (Exception e) {
576:                                throw new ListenerExecutionFailedException(
577:                                        "Exchange processing failed", e);
578:                            }
579:                            return null;
580:                        }
581:                    });
582:                    return;
583:                }
584:                Message msg = marshaler.createError(exchange, error, session,
585:                        context);
586:                Destination dest = getReplyDestination(exchange, error,
587:                        session, context);
588:                setCorrelationId(context.getMessage(), msg);
589:                send(msg, session, dest);
590:            }
591:
592:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.