Source Code Cross Referenced for DeliveryChannelImpl.java in  » ESB » servicemix » org » apache » servicemix » jbi » messaging » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » servicemix » org.apache.servicemix.jbi.messaging 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         *
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         *
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:        package org.apache.servicemix.jbi.messaging;
018:
019:        import java.util.ArrayList;
020:        import java.util.List;
021:        import java.util.Map;
022:        import java.util.concurrent.ArrayBlockingQueue;
023:        import java.util.concurrent.BlockingQueue;
024:        import java.util.concurrent.ConcurrentHashMap;
025:        import java.util.concurrent.TimeUnit;
026:        import java.util.concurrent.atomic.AtomicBoolean;
027:
028:        import javax.jbi.JBIException;
029:        import javax.jbi.component.Component;
030:        import javax.jbi.component.ComponentLifeCycle;
031:        import javax.jbi.messaging.DeliveryChannel;
032:        import javax.jbi.messaging.ExchangeStatus;
033:        import javax.jbi.messaging.MessageExchange;
034:        import javax.jbi.messaging.MessageExchange.Role;
035:        import javax.jbi.messaging.MessageExchangeFactory;
036:        import javax.jbi.messaging.MessagingException;
037:        import javax.jbi.servicedesc.ServiceEndpoint;
038:        import javax.transaction.Transaction;
039:        import javax.transaction.TransactionManager;
040:        import javax.xml.namespace.QName;
041:
042:        import org.apache.commons.logging.Log;
043:        import org.apache.commons.logging.LogFactory;
044:        import org.apache.servicemix.JbiConstants;
045:        import org.apache.servicemix.MessageExchangeListener;
046:        import org.apache.servicemix.id.IdGenerator;
047:        import org.apache.servicemix.jbi.ExchangeTimeoutException;
048:        import org.apache.servicemix.jbi.container.ActivationSpec;
049:        import org.apache.servicemix.jbi.container.JBIContainer;
050:        import org.apache.servicemix.jbi.event.ExchangeEvent;
051:        import org.apache.servicemix.jbi.event.ExchangeListener;
052:        import org.apache.servicemix.jbi.framework.ComponentContextImpl;
053:        import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
054:
055:        /**
056:         * DeliveryChannel implementation
057:         * 
058:         * @version $Revision: 564607 $
059:         */
060:        public class DeliveryChannelImpl implements  DeliveryChannel {
061:
/** Logger for this class. */
private static final Log LOG = LogFactory
        .getLog(DeliveryChannelImpl.class);

/** Container taken from the owning component in the constructor; replaceable via setContainer. */
private JBIContainer container;

/** Component context; not set by the constructor, supplied through setContext. */
private ComponentContextImpl context;

/** Component that owns this delivery channel. */
private ComponentMBeanImpl component;

/** Inbound exchanges waiting for accept(); sized from the component's inbound queue capacity. */
private BlockingQueue<MessageExchangeImpl> queue;

/** Id generator handed to every exchange factory created by this channel. */
private IdGenerator idGenerator = new IdGenerator();

// NOTE(review): not referenced in the visible part of this file - confirm it is still used
private MessageExchangeFactory inboundFactory;

/** Counter maintained by throttle(). */
private int intervalCount;

/** Flipped to true by close(); read by checkNotClosed() and passed to created factories. */
private AtomicBoolean closed = new AtomicBoolean(false);

/** Threads that close() interrupts; the keys are the threads. */
private Map<Thread, Boolean> waiters = new ConcurrentHashMap<Thread, Boolean>();

/** Transaction manager obtained from the container in the constructor. */
private TransactionManager transactionManager;

/**
 * When using clustering and sendSync, the exchange received will not be the
 * same as the one sent (because it has been serialized/deserialized). We
 * thus need to keep the original exchange in a map and override its state.
 */
private Map<String, MessageExchangeImpl> exchangesById = new ConcurrentHashMap<String, MessageExchangeImpl>();
091:
092:            /**
093:             * Constructor
094:             */
095:            public DeliveryChannelImpl(ComponentMBeanImpl component) {
096:                this .component = component;
097:                this .container = component.getContainer();
098:                this .queue = new ArrayBlockingQueue<MessageExchangeImpl>(
099:                        component.getInboundQueueCapacity());
100:                this .transactionManager = (TransactionManager) this .container
101:                        .getTransactionManager();
102:            }
103:
104:            /**
105:             * @return size of the inbound Queue
106:             */
107:            public int getQueueSize() {
108:                return queue.size();
109:            }
110:
111:            /**
112:             * close the delivery channel
113:             * 
114:             * @throws MessagingException
115:             */
116:            public void close() throws MessagingException {
117:                if (this .closed.compareAndSet(false, true)) {
118:                    if (LOG.isDebugEnabled()) {
119:                        LOG.debug("Closing DeliveryChannel " + this );
120:                    }
121:                    List<MessageExchangeImpl> pending = new ArrayList<MessageExchangeImpl>(
122:                            queue.size());
123:                    queue.drainTo(pending);
124:                    for (MessageExchangeImpl messageExchange : pending) {
125:                        if (messageExchange.getTransactionContext() != null
126:                                && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
127:                            notifyExchange(messageExchange.getMirror(),
128:                                    messageExchange.getMirror(), "close");
129:                        }
130:                    }
131:                    // Interrupt all blocked thread
132:                    Thread[] threads = waiters.keySet().toArray(
133:                            new Thread[waiters.size()]);
134:                    for (int i = 0; i < threads.length; i++) {
135:                        threads[i].interrupt();
136:                    }
137:                    // deactivate all endpoints from this component
138:                    ServiceEndpoint[] endpoints = container.getRegistry()
139:                            .getEndpointsForComponent(
140:                                    component.getComponentNameSpace());
141:                    for (int i = 0; i < endpoints.length; i++) {
142:                        try {
143:                            component.getContext().deactivateEndpoint(
144:                                    endpoints[i]);
145:                        } catch (JBIException e) {
146:                            LOG.error("Error deactivating endpoint", e);
147:                        }
148:                    }
149:                    // TODO: Cause all accepts to return null
150:                    // TODO: Abort all pending exchanges
151:                }
152:            }
153:
154:            protected void checkNotClosed() throws MessagingException {
155:                if (closed.get()) {
156:                    throw new MessagingException(this  + " has been closed.");
157:                }
158:            }
159:
160:            /**
161:             * Create a message exchange factory. This factory will create exchange
162:             * instances with all appropriate properties set to null.
163:             * 
164:             * @return a message exchange factory
165:             */
166:            public MessageExchangeFactory createExchangeFactory() {
167:                MessageExchangeFactoryImpl result = createMessageExchangeFactory();
168:                result.setContext(context);
169:                ActivationSpec activationSpec = context.getActivationSpec();
170:                if (activationSpec != null) {
171:                    String componentName = context.getComponentNameSpace()
172:                            .getName();
173:                    // lets auto-default the container-routing information
174:                    QName serviceName = activationSpec.getDestinationService();
175:                    if (serviceName != null) {
176:                        result.setServiceName(serviceName);
177:                        LOG.debug("default destination serviceName for "
178:                                + componentName + " = " + serviceName);
179:                    }
180:                    QName interfaceName = activationSpec
181:                            .getDestinationInterface();
182:                    if (interfaceName != null) {
183:                        result.setInterfaceName(interfaceName);
184:                        LOG.debug("default destination interfaceName for "
185:                                + componentName + " = " + interfaceName);
186:                    }
187:                    QName operationName = activationSpec
188:                            .getDestinationOperation();
189:                    if (operationName != null) {
190:                        result.setOperationName(operationName);
191:                        LOG.debug("default destination operationName for "
192:                                + componentName + " = " + operationName);
193:                    }
194:                    String endpointName = activationSpec
195:                            .getDestinationEndpoint();
196:                    if (endpointName != null) {
197:                        boolean endpointSet = false;
198:                        LOG.debug("default destination endpointName for "
199:                                + componentName + " = " + endpointName);
200:                        if (serviceName != null && endpointName != null) {
201:                            endpointName = endpointName.trim();
202:                            ServiceEndpoint endpoint = container.getRegistry()
203:                                    .getEndpoint(serviceName, endpointName);
204:                            if (endpoint != null) {
205:                                result.setEndpoint(endpoint);
206:                                LOG
207:                                        .info("Set default destination endpoint for "
208:                                                + componentName
209:                                                + " to "
210:                                                + endpoint);
211:                                endpointSet = true;
212:                            }
213:                        }
214:                        if (!endpointSet) {
215:                            LOG.warn("Could not find destination endpoint for "
216:                                    + componentName + " service(" + serviceName
217:                                    + ") with endpointName " + endpointName);
218:                        }
219:                    }
220:                }
221:                return result;
222:            }
223:
224:            /**
225:             * Create a message exchange factory for the given interface name.
226:             * 
227:             * @param interfaceName
228:             *            name of the interface for which all exchanges created by the
229:             *            returned factory will be set
230:             * @return an exchange factory that will create exchanges for the given
231:             *         interface; must be non-null
232:             */
233:            public MessageExchangeFactory createExchangeFactory(
234:                    QName interfaceName) {
235:                MessageExchangeFactoryImpl result = createMessageExchangeFactory();
236:                result.setInterfaceName(interfaceName);
237:                return result;
238:            }
239:
240:            /**
241:             * Create a message exchange factory for the given service name.
242:             * 
243:             * @param serviceName
244:             *            name of the service for which all exchanges created by the
245:             *            returned factory will be set
246:             * @return an exchange factory that will create exchanges for the given
247:             *         service; must be non-null
248:             */
249:            public MessageExchangeFactory createExchangeFactoryForService(
250:                    QName serviceName) {
251:                MessageExchangeFactoryImpl result = createMessageExchangeFactory();
252:                result.setServiceName(serviceName);
253:                return result;
254:            }
255:
256:            /**
257:             * Create a message exchange factory for the given endpoint.
258:             * 
259:             * @param endpoint
260:             *            endpoint for which all exchanges created by the returned
261:             *            factory will be set for
262:             * @return an exchange factory that will create exchanges for the given
263:             *         endpoint
264:             */
265:            public MessageExchangeFactory createExchangeFactory(
266:                    ServiceEndpoint endpoint) {
267:                MessageExchangeFactoryImpl result = createMessageExchangeFactory();
268:                result.setEndpoint(endpoint);
269:                return result;
270:            }
271:
272:            protected MessageExchangeFactoryImpl createMessageExchangeFactory() {
273:                MessageExchangeFactoryImpl messageExchangeFactory = new MessageExchangeFactoryImpl(
274:                        idGenerator, closed);
275:                messageExchangeFactory.setContext(context);
276:                return messageExchangeFactory;
277:            }
278:
279:            /**
280:             * @return a MessageExchange - blocking call
281:             * @throws MessagingException
282:             */
283:            public MessageExchange accept() throws MessagingException {
284:                return accept(Long.MAX_VALUE);
285:            }
286:
287:            /**
288:             * return a MessageExchange
289:             * 
290:             * @param timeoutMS
291:             * @return Message Exchange
292:             * @throws MessagingException
293:             */
294:            public MessageExchange accept(long timeoutMS)
295:                    throws MessagingException {
296:                try {
297:                    checkNotClosed();
298:                    MessageExchangeImpl me = queue.poll(timeoutMS,
299:                            TimeUnit.MILLISECONDS);
300:                    if (me != null) {
301:                        // If the exchange has already timed out,
302:                        // do not give it to the component
303:                        if (me.getPacket().isAborted()) {
304:                            if (LOG.isDebugEnabled()) {
305:                                LOG.debug("Aborted " + me.getExchangeId()
306:                                        + " in " + this );
307:                            }
308:                            me = null;
309:                        } else {
310:                            if (LOG.isDebugEnabled()) {
311:                                LOG.debug("Accepting " + me.getExchangeId()
312:                                        + " in " + this );
313:                            }
314:                            // If we have a tx lock and the exchange is not active, we
315:                            // need
316:                            // to notify here without resuming transaction
317:                            if (me.getTxLock() != null
318:                                    && me.getStatus() != ExchangeStatus.ACTIVE) {
319:                                notifyExchange(me.getMirror(), me.getTxLock(),
320:                                        "acceptFinishedExchangeWithTxLock");
321:                                me.handleAccept();
322:                                if (LOG.isTraceEnabled()) {
323:                                    LOG.trace("Accepted: " + me);
324:                                }
325:                                // We transactionnaly deliver a finished exchange
326:                            } else if (me.isTransacted()
327:                                    && me.getStatus() != ExchangeStatus.ACTIVE) {
328:                                // Do not resume transaction
329:                                me.handleAccept();
330:                                if (LOG.isTraceEnabled()) {
331:                                    LOG.trace("Accepted: " + me);
332:                                }
333:                            } else {
334:                                resumeTx(me);
335:                                me.handleAccept();
336:                                if (LOG.isTraceEnabled()) {
337:                                    LOG.trace("Accepted: " + me);
338:                                }
339:                            }
340:                        }
341:                    }
342:                    if (me != null) {
343:                        // Call input listeners
344:                        ExchangeListener[] l = (ExchangeListener[]) container
345:                                .getListeners(ExchangeListener.class);
346:                        ExchangeEvent event = new ExchangeEvent(me,
347:                                ExchangeEvent.EXCHANGE_ACCEPTED);
348:                        for (int i = 0; i < l.length; i++) {
349:                            try {
350:                                l[i].exchangeAccepted(event);
351:                            } catch (Exception e) {
352:                                LOG.warn("Error calling listener: "
353:                                        + e.getMessage(), e);
354:                            }
355:                        }
356:                    }
357:                    return me;
358:                } catch (InterruptedException e) {
359:                    throw new MessagingException("accept failed", e);
360:                }
361:            }
362:
363:            protected void autoSetPersistent(MessageExchangeImpl me) {
364:                Boolean persistent = me.getPersistent();
365:                if (persistent == null) {
366:                    if (context.getActivationSpec().getPersistent() != null) {
367:                        persistent = context.getActivationSpec()
368:                                .getPersistent();
369:                    } else {
370:                        persistent = Boolean.valueOf(context.getContainer()
371:                                .isPersistent());
372:                    }
373:                    me.setPersistent(persistent);
374:                }
375:            }
376:
377:            protected void throttle() {
378:                if (component.isExchangeThrottling()) {
379:                    if (component.getThrottlingInterval() > intervalCount) {
380:                        intervalCount = 0;
381:                        try {
382:                            Thread.sleep(component.getThrottlingTimeout());
383:                        } catch (InterruptedException e) {
384:                            LOG.warn("throttling failed", e);
385:                        }
386:                    }
387:                    intervalCount++;
388:                }
389:            }
390:
391:            protected void doSend(MessageExchangeImpl me, boolean sync)
392:                    throws MessagingException {
393:                MessageExchangeImpl mirror = me.getMirror();
394:                boolean finished = me.getStatus() != ExchangeStatus.ACTIVE;
395:                try {
396:                    if (LOG.isTraceEnabled()) {
397:                        LOG.trace("Sent: " + me);
398:                    }
399:                    // If the message has timed out
400:                    if (me.getPacket().isAborted()) {
401:                        throw new ExchangeTimeoutException(me);
402:                    }
403:                    // Auto enlist exchange in transaction
404:                    autoEnlistInTx(me);
405:                    // Update persistence info
406:                    autoSetPersistent(me);
407:                    // Throttle if needed
408:                    throttle();
409:                    // Store the consumer component
410:                    if (me.getRole() == Role.CONSUMER) {
411:                        me.setSourceId(component.getComponentNameSpace());
412:                    }
413:                    // Call the listeners before the ownership changes
414:                    // Call input listeners
415:                    ExchangeListener[] l = (ExchangeListener[]) container
416:                            .getListeners(ExchangeListener.class);
417:                    ExchangeEvent event = new ExchangeEvent(me,
418:                            ExchangeEvent.EXCHANGE_SENT);
419:                    for (int i = 0; i < l.length; i++) {
420:                        try {
421:                            l[i].exchangeSent(event);
422:                        } catch (Exception e) {
423:                            LOG.warn("Error calling listener: "
424:                                    + e.getMessage(), e);
425:                        }
426:                    }
427:                    // Change ownership
428:                    me.handleSend(sync);
429:                    mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE);
430:                    // If this is the DONE or ERROR status from a synchronous
431:                    // transactional exchange,
432:                    // it should not be part of the transaction, so remove the tx
433:                    // context
434:                    if (finished
435:                            && me.getTxLock() == null
436:                            && me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED
437:                            && !me.isPushDelivery()
438:                            && me.getRole() == Role.CONSUMER) {
439:                        me.setTransactionContext(null);
440:                    }
441:                    container.sendExchange(mirror);
442:                } catch (MessagingException e) {
443:                    if (LOG.isDebugEnabled()) {
444:                        LOG.debug("Exception processing: " + me.getExchangeId()
445:                                + " in " + this );
446:                    }
447:                    throw e;
448:                } finally {
449:                    // If there is a tx lock, we need to suspend and notify
450:                    if (me.getTxLock() != null) {
451:                        if (mirror.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) {
452:                            suspendTx(mirror);
453:                        }
454:                        synchronized (me.getTxLock()) {
455:                            notifyExchange(me, me.getTxLock(),
456:                                    "doSendWithTxLock");
457:                        }
458:                    }
459:                }
460:            }
461:
462:            /**
463:             * routes a MessageExchange
464:             * 
465:             * @param messageExchange
466:             * @throws MessagingException
467:             */
468:            public void send(MessageExchange messageExchange)
469:                    throws MessagingException {
470:                // If the delivery channel has been closed
471:                checkNotClosed();
472:                // Log call
473:                if (LOG.isDebugEnabled()) {
474:                    LOG.debug("Send " + messageExchange.getExchangeId()
475:                            + " in " + this );
476:                }
477:                // // JBI 5.5.2.1.3: remove sync property
478:                messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
479:                // Call doSend
480:                MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
481:                doSend(me, false);
482:            }
483:
484:            /**
485:             * routes a MessageExchange
486:             * 
487:             * @param messageExchange
488:             * @return true if processed
489:             * @throws MessagingException
490:             */
491:            public boolean sendSync(MessageExchange messageExchange)
492:                    throws MessagingException {
493:                return sendSync(messageExchange, 0);
494:            }
495:
496:            /**
497:             * routes a MessageExchange
498:             * 
499:             * @param messageExchange
500:             * @param timeout
501:             * @return true if processed
502:             * @throws MessagingException
503:             */
504:            public boolean sendSync(MessageExchange messageExchange,
505:                    long timeout) throws MessagingException {
506:                // If the delivery channel has been closed
507:                checkNotClosed();
508:                // Log call
509:                if (LOG.isDebugEnabled()) {
510:                    LOG.debug("SendSync " + messageExchange.getExchangeId()
511:                            + " in " + this );
512:                }
513:                boolean result = false;
514:                // JBI 5.5.2.1.3: set the sendSync property
515:                messageExchange.setProperty(JbiConstants.SEND_SYNC,
516:                        Boolean.TRUE);
517:                // Call doSend
518:                MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
519:                String exchangeKey = me.getKey();
520:                try {
521:                    exchangesById.put(exchangeKey, me);
522:                    // Synchronously send a message and wait for the response
523:                    synchronized (me) {
524:                        doSend(me, true);
525:                        if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
526:                            waitForExchange(me, me, timeout, "sendSync");
527:                        } else {
528:                            if (LOG.isDebugEnabled()) {
529:                                LOG
530:                                        .debug("Exchange "
531:                                                + messageExchange
532:                                                        .getExchangeId()
533:                                                + " has already been answered (no need to wait)");
534:                            }
535:                        }
536:                    }
537:                    if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
538:                        me.handleAccept();
539:                        // If the sender flag has been removed, it means
540:                        // the message has been delivered in the same thread
541:                        // so there is no need to resume the transaction
542:                        // See processInBound
543:                        // if (messageExchangeImpl.getSyncSenderThread() != null) {
544:                        resumeTx(me);
545:                        // }
546:                        result = true;
547:                    } else {
548:                        // JBI 5.5.2.1.3: the exchange should be set to ERROR status
549:                        if (LOG.isDebugEnabled()) {
550:                            LOG.debug("Exchange "
551:                                    + messageExchange.getExchangeId()
552:                                    + " has been aborted");
553:                        }
554:                        me.getPacket().setAborted(true);
555:                        result = false;
556:                    }
557:                } catch (InterruptedException e) {
558:                    throw new MessagingException(e);
559:                } catch (RuntimeException e) {
560:                    // e.printStackTrace();
561:                    throw e;
562:                } finally {
563:                    exchangesById.remove(exchangeKey);
564:                }
565:                return result;
566:            }
567:
568:            /**
569:             * @return Returns the container.
570:             */
571:            public JBIContainer getContainer() {
572:                return container;
573:            }
574:
575:            /**
576:             * @param container
577:             *            The container to set.
578:             */
579:            public void setContainer(JBIContainer container) {
580:                this .container = container;
581:            }
582:
583:            /**
584:             * @return Returns the componentConnector.
585:             */
586:            public ComponentMBeanImpl getComponent() {
587:                return component;
588:            }
589:
590:            /**
591:             * Get the context
592:             * 
593:             * @return the context
594:             */
595:            public ComponentContextImpl getContext() {
596:                return context;
597:            }
598:
599:            /**
600:             * set the context
601:             * 
602:             * @param context
603:             */
604:            public void setContext(ComponentContextImpl context) {
605:                this .context = context;
606:            }
607:
608:            /**
609:             * Used internally for passing in a MessageExchange
610:             * 
611:             * @param me
612:             * @throws MessagingException
613:             */
614:            public void processInBound(MessageExchangeImpl me)
615:                    throws MessagingException {
616:                if (LOG.isTraceEnabled()) {
617:                    LOG.trace("Processing inbound exchange: " + me);
618:                }
619:                // Check if the delivery channel has been closed
620:                checkNotClosed();
621:                // Retrieve the original exchange sent
622:                MessageExchangeImpl original = exchangesById.get(me.getKey());
623:                if (original != null && me != original) {
624:                    original.copyFrom(me);
625:                    me = original;
626:                }
627:                // Check if the incoming exchange is a response to a synchronous
628:                // exchange previously sent
629:                // In this case, we do not have to queue it, but rather notify the
630:                // waiting thread.
631:                if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
632:                    // If the mirror has been delivered using push, better wait until
633:                    // the push call return. This can only work if not using clustered
634:                    // flows,
635:                    // but the flag is transient so we do not care.
636:                    // Ensure that data is uptodate with the incoming exchange (in
637:                    // case the exchange has
638:                    // been serialized / deserialized by a clustered flow)
639:                    suspendTx(original);
640:                    me
641:                            .setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
642:                    notifyExchange(original, original,
643:                            "processInboundSynchronousExchange");
644:                    return;
645:                }
646:
647:                // If the component implements the MessageExchangeListener,
648:                // the delivery can be made synchronously, so we don't need
649:                // to bother with transactions
650:                MessageExchangeListener listener = getExchangeListener();
651:                if (listener != null) {
652:                    me.handleAccept();
653:                    if (LOG.isTraceEnabled()) {
654:                        LOG.trace("Received: " + me);
655:                    }
656:                    // Call input listeners
657:                    ExchangeListener[] l = (ExchangeListener[]) container
658:                            .getListeners(ExchangeListener.class);
659:                    ExchangeEvent event = new ExchangeEvent(me,
660:                            ExchangeEvent.EXCHANGE_ACCEPTED);
661:                    for (int i = 0; i < l.length; i++) {
662:                        try {
663:                            l[i].exchangeAccepted(event);
664:                        } catch (Exception e) {
665:                            LOG.warn("Error calling listener: "
666:                                    + e.getMessage(), e);
667:                        }
668:                    }
669:                    // Set the flag the the exchange was delivered using push mode
670:                    // This is important for transaction boundaries
671:                    me.setPushDeliver(true);
672:                    // Deliver the exchange
673:                    ClassLoader old = Thread.currentThread()
674:                            .getContextClassLoader();
675:                    try {
676:                        Thread.currentThread().setContextClassLoader(
677:                                component.getComponent().getClass()
678:                                        .getClassLoader());
679:                        listener.onMessageExchange(me);
680:                    } finally {
681:                        Thread.currentThread().setContextClassLoader(old);
682:                    }
683:                    // TODO: handle delayed exchange notifications
684:                    return;
685:                }
686:
687:                // Component uses pull delivery.
688:
689:                // If the exchange is transacted, special care should be taken.
690:                // But if the exchange is no more ACTIVE, just queue it, as
691:                // we will never have an answer back.
692:                if (me.isTransacted()
693:                        && me.getStatus() == ExchangeStatus.ACTIVE) {
694:                    // If the transaction is conveyed by the exchange
695:                    // We do not need to resume the transaction in this thread
696:                    if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
697:                        try {
698:                            suspendTx(me);
699:                            queue.put(me);
700:                        } catch (InterruptedException e) {
701:                            LOG.debug("Exchange " + me.getExchangeId()
702:                                    + " aborted due to thread interruption", e);
703:                            me.getPacket().setAborted(true);
704:                        }
705:                        // Else the delivery / send are enlisted in the current tx.
706:                        // We must suspend the transaction, queue it, and wait for the
707:                        // answer
708:                        // to be sent, at which time the tx should be suspended and resumed
709:                        // in
710:                        // this thread.
711:                    } else {
712:                        Object lock = new Object();
713:                        synchronized (lock) {
714:                            try {
715:                                me.setTxLock(lock);
716:                                suspendTx(me);
717:                                queue.put(me);
718:                                waitForExchange(me, lock, 0,
719:                                        "processInboundTransactionalExchange");
720:                            } catch (InterruptedException e) {
721:                                LOG
722:                                        .debug(
723:                                                "Exchange "
724:                                                        + me.getExchangeId()
725:                                                        + " aborted due to thread interruption",
726:                                                e);
727:                                me.getPacket().setAborted(true);
728:                            } finally {
729:                                me.setTxLock(null);
730:                                resumeTx(me);
731:                            }
732:                        }
733:                    }
734:                    // If the exchange is ACTIVE, the transaction boundary will suspended
735:                    // when the
736:                    // answer is sent
737:                    // Else just queue the exchange
738:                } else {
739:                    try {
740:                        queue.put(me);
741:                    } catch (InterruptedException e) {
742:                        LOG.debug("Exchange " + me.getExchangeId()
743:                                + " aborted due to thread interruption", e);
744:                        me.getPacket().setAborted(true);
745:                    }
746:                }
747:            }
748:
749:            protected MessageExchangeListener getExchangeListener() {
750:                Component comp = this .component.getComponent();
751:                if (comp instanceof  MessageExchangeListener) {
752:                    return (MessageExchangeListener) comp;
753:                }
754:                ComponentLifeCycle lifecycle = this .component.getLifeCycle();
755:                if (lifecycle instanceof  MessageExchangeListener) {
756:                    return (MessageExchangeListener) lifecycle;
757:                }
758:                return null;
759:            }
760:
761:            /**
762:             * Synchronization must be performed on the given exchange when calling this
763:             * method
764:             * 
765:             * @param me
766:             * @throws InterruptedException
767:             */
768:            protected void waitForExchange(MessageExchangeImpl me, Object lock,
769:                    long timeout, String from) throws InterruptedException {
770:                // If the channel is closed while here, we must abort
771:                if (LOG.isDebugEnabled()) {
772:                    LOG.debug("Waiting for exchange " + me.getExchangeId()
773:                            + " (" + Integer.toHexString(me.hashCode())
774:                            + ") to be answered in " + this  + " from " + from);
775:                }
776:                Thread th = Thread.currentThread();
777:                try {
778:                    waiters.put(th, Boolean.TRUE);
779:                    lock.wait(timeout);
780:                } finally {
781:                    waiters.remove(th);
782:                }
783:                if (LOG.isDebugEnabled()) {
784:                    LOG.debug("Notified: " + me.getExchangeId() + "("
785:                            + Integer.toHexString(me.hashCode()) + ") in "
786:                            + this  + " from " + from);
787:                }
788:            }
789:
790:            protected void notifyExchange(MessageExchangeImpl me, Object lock,
791:                    String from) {
792:                if (LOG.isDebugEnabled()) {
793:                    LOG.debug("Notifying exchange " + me.getExchangeId() + "("
794:                            + Integer.toHexString(me.hashCode()) + ") in "
795:                            + this  + " from " + from);
796:                }
797:                synchronized (lock) {
798:                    lock.notify();
799:                }
800:            }
801:
802:            /**
803:             * Get Inbound Factory
804:             * 
805:             * @return the inbound message factory
806:             */
807:            public MessageExchangeFactory getInboundFactory() {
808:                if (inboundFactory == null) {
809:                    inboundFactory = createExchangeFactory();
810:                }
811:                return inboundFactory;
812:            }
813:
814:            protected void suspendTx(MessageExchangeImpl me) {
815:                if (transactionManager != null) {
816:                    try {
817:                        Transaction oldTx = me.getTransactionContext();
818:                        if (oldTx != null) {
819:                            if (LOG.isDebugEnabled()) {
820:                                LOG.debug("Suspending transaction for "
821:                                        + me.getExchangeId() + " in " + this );
822:                            }
823:                            Transaction tx = transactionManager.suspend();
824:                            if (tx != oldTx) {
825:                                throw new IllegalStateException(
826:                                        "the transaction context set in the messageExchange is not bound to the current thread");
827:                            }
828:                        }
829:                    } catch (Exception e) {
830:                        LOG.info("Exchange " + me.getExchangeId()
831:                                + " aborted due to transaction exception", e);
832:                        me.getPacket().setAborted(true);
833:                    }
834:                }
835:            }
836:
837:            protected void resumeTx(MessageExchangeImpl me)
838:                    throws MessagingException {
839:                if (transactionManager != null) {
840:                    try {
841:                        Transaction oldTx = me.getTransactionContext();
842:                        if (oldTx != null) {
843:                            if (LOG.isDebugEnabled()) {
844:                                LOG.debug("Resuming transaction for "
845:                                        + me.getExchangeId() + " in " + this );
846:                            }
847:                            transactionManager.resume(oldTx);
848:                        }
849:                    } catch (Exception e) {
850:                        throw new MessagingException(e);
851:                    }
852:                }
853:            }
854:
855:            /**
856:             * If the jbi container configured to do so, the message exchange will
857:             * automatically be enlisted in the current transaction, if exists.
858:             * 
859:             * @throws MessagingException
860:             */
861:            protected void autoEnlistInTx(MessageExchangeImpl me)
862:                    throws MessagingException {
863:                if (transactionManager != null
864:                        && container.isAutoEnlistInTransaction()) {
865:                    try {
866:                        Transaction tx = transactionManager.getTransaction();
867:                        if (tx != null) {
868:                            Object oldTx = me.getTransactionContext();
869:                            if (oldTx == null) {
870:                                me.setTransactionContext(tx);
871:                            } else if (oldTx != tx) {
872:                                throw new IllegalStateException(
873:                                        "the transaction context set in the messageExchange is not bound to the current thread");
874:                            }
875:                        }
876:                    } catch (Exception e) {
877:                        throw new MessagingException(e);
878:                    }
879:                }
880:            }
881:
882:            /**
883:             * @return pretty print
884:             */
885:            public String toString() {
886:                return "DeliveryChannel{" + component.getName() + "}";
887:            }
888:
889:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.