Source code of ScheduledProcessHandler.java (cbesb-1.2, package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging)

/*
 * ChainBuilder ESB
 *     Visual Enterprise Integration
 *
 * Copyright (C) 2007 Bostech Corporation
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 *
 * $Id: ScheduledProcessHandler.java 12067 2008-02-22 15:07:09Z mpreston $
 */
package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging;

import java.util.Vector;

import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
import javax.jbi.servicedesc.ServiceEndpoint;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.bostechcorp.cbesb.common.constant.MetadataConstants;
import com.bostechcorp.cbesb.common.runtime.CbesbException;
import com.bostechcorp.cbesb.common.util.ErrorUtil;
import com.bostechcorp.cbesb.runtime.ccsl.lib.ErrorDb;
import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.FaultHandler;
import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.SourceHelper;
import com.bostechcorp.cbesb.runtime.component.util.wsdl.WsdlMepConstants;

/**
 * This is the base class that should be extended by concrete Binding Components
 * that support polling or schedules. This acts as the consumer rather than
 * implementing a concrete ConsumerProvider class.
 */
public abstract class ScheduledProcessHandler {

    protected final transient Log logger = LogFactory
            .getLog(getClass());

    protected ScheduledEndpointProcessor endpoint;

    protected ServiceDescriptionHandler serviceDescriptionHandler;

    protected String lastError;

    protected long startProcessTime;

    public ScheduledProcessHandler(ScheduledEndpointProcessor endpoint) {
        this.endpoint = endpoint;
    }

    /**
     * @return the lastError
     */
    public String getLastError() {
        return lastError;
    }

    /**
     * @param lastError
     *            the lastError to set
     */
    public void setLastError(String lastError) {
        this.lastError = lastError;
    }

    protected void doStart() throws Exception {
    }

    protected void doStop() throws Exception {
    }

    public void processRobustInOnly(MessageExchange exchange)
            throws JBIException {
        throw new JBIException(
                getLocalErrorMsg()
                        + "received an unexpected MessageExchange in Consumer processor: "
                        + exchange.toString());
    }

    public void processInOnly(MessageExchange exchange)
            throws JBIException {
        throw new JBIException(
                getLocalErrorMsg()
                        + "received an unexpected MessageExchange in Consumer processor: "
                        + exchange.toString());
    }

    public void processInOut(MessageExchange exchange, boolean optional)
            throws JBIException {
        throw new JBIException(
                getLocalErrorMsg()
                        + "received an unexpected MessageExchange in Consumer processor: "
                        + exchange.toString());
    }

    protected boolean doTriggerProc() throws Exception {
        // Record the start time so process() can report the elapsed time
        this.startProcessTime = System.currentTimeMillis();
        return doTrigger();
    }

    protected abstract boolean doTrigger() throws Exception;

    protected void doProcessFault(NormalizedMessage nm, String fault)
            throws Exception {

        FaultHandler fh = new FaultHandler(fault);

        logger.error(getLocalErrorMsg() + "reports error: "
                + fh.getMessage());
        if (fh.getEndpointString() != null) {
            logger.info("The error occurred at endpoint: "
                    + fh.getEndpointString());
        }

        if (fh.getRemedy() != null) {
            logger.info("Remedy: " + fh.getRemedy());
        }

        if (logger.isDebugEnabled() && fh.getDetail() != null) {
            logger.debug("Error details: " + fh.getDetail());
        }
    }

    protected abstract void doProcessOut(NormalizedMessage nm,
            String s, MessageExchange me) throws Exception;

    private String getLocalErrorMsg() {
        return "Consumer endpoint '" + endpoint.getEndpoint() + "' ";
    }

    /**
     * The process() method is called by the child consumer processor after it
     * reads data from an external connection. It turns the data into a
     * MessageExchange and routes it to the NMR.
     */
    public void process(Object data, IConsumerHandlerContext chContext) {
        // logger.debug("Endpoint's DefaultMEP :" + endpoint.getDefaultMep());
        // if (endpoint.getDefaultMep().equals(WsdlMepConstants.IN_ONLY)) {
        // handleInOnly(data);
        // } else if (endpoint.getDefaultMep().equals(WsdlMepConstants.IN_OUT))
        // {
        // handleInOut(data);
        // } else if
        // (endpoint.getDefaultMep().equals(WsdlMepConstants.ROBUST_IN_ONLY)) {
        // handleRobustInOnly(data);
        // } else
        // throw new JBIException("trying to process unknown MEP
        // \""+endpoint.getDefaultMep()+"\"");

        if (logger.isDebugEnabled()) {
            logger.debug("Endpoint's DefaultMEP: "
                    + endpoint.getDefaultMep());
        }

        // Note: the handle* methods assign the exchange they create to their
        // own parameter, so "me" is still null here and in the catch block.
        MessageExchange me = null;
        try {
            if (endpoint.getDefaultMep().equals(
                    WsdlMepConstants.IN_ONLY)) {
                handleInOnly(data, me, chContext);
            } else if (endpoint.getDefaultMep().equals(
                    WsdlMepConstants.IN_OUT)) {
                handleInOut(data, me, chContext);
            } else if (endpoint.getDefaultMep().equals(
                    WsdlMepConstants.ROBUST_IN_ONLY)) {
                handleRobustInOnly(data, me, chContext);
            } else {
                logger.warn("trying to process unknown MEP \""
                        + endpoint.getDefaultMep() + "\"");
            }
            this.endpoint.sendMessageProcessedNotification(System
                    .currentTimeMillis()
                    - this.startProcessTime);

        } catch (JBIException e) {
            ErrorUtil.printError(getLocalErrorMsg()
                    + "encountered JBI errors: ", e);
            ErrorDb.write(e, me, logger);
        }
        // catch (CbesbException e2) {
        // ErrorUtil.printWarn("Consumer endpoint '" + endpoint.getEndpoint() +
        // "' encountered with errors: ", e2);
        // }

    }

    /**
     * The process() method is called by the child consumer processor after it
     * reads data from an external connection. It turns the data into a
     * MessageExchange and routes it to the NMR.
     *
     * @deprecated Use {@link #process(Object, IConsumerHandlerContext)}.
     */
    public void process(Object data) {
        process(data, null);
    }

    /**
     * The transform() method transforms the data read from the connection
     * into a NormalizedMessage in the MessageExchange. This implementation
     * expects "data" to be an instance of ExternalInput. Subclasses can
     * override this if they have other requirements, or if they need to copy
     * metadata values into the exchange. Most classes should still be able to
     * use super.transform() to do the actual data copy.
     *
     * @param data
     *            the data read from the connection; must be an ExternalInput
     * @param me
     *            the exchange whose "in" message is populated
     * @param context
     *            the consumer handler context (not used by this implementation)
     * @throws Exception
     *             if the data cannot be copied into the exchange
     */
    protected void transform(Object data, MessageExchange me,
            IConsumerHandlerContext context) throws Exception {
        ExternalInput ext = (ExternalInput) data;
        NormalizedMessage msg = me.createMessage();
        ext.populateMessage(msg, getSvcDescHandlerInstance());
        me.setMessage(msg, "in");
    }

    protected ServiceDescriptionHandler getSvcDescHandlerInstance() {
        // Lazily create the handler from the first consumed service, if any
        if (serviceDescriptionHandler == null) {
            logger.debug("Attempting to retrieve Service Unit Descriptor");
            ServiceUnitDescriptor suDescriptor = endpoint
                    .getServiceUnit().getServiceUnitDescriptor();
            if (suDescriptor != null) {
                Vector<ServiceInfo> consumes = suDescriptor
                        .getConsumes();
                if (!consumes.isEmpty()) {
                    ServiceInfo svcInfo = consumes.elementAt(0);
                    logger.debug("Target Service Info: "
                            + svcInfo.toString());
                    ComponentContext context = endpoint
                            .getServiceUnit().getComponent()
                            .getComponentContext();
                    serviceDescriptionHandler = ServiceDescriptionHandler
                            .getInstance(svcInfo, context);
                }
            }
        }
        return serviceDescriptionHandler;
    }

    protected boolean handleInOnly(Object data, MessageExchange me,
            IConsumerHandlerContext chContext) throws JBIException {
        ComponentContext context = endpoint.getServiceUnit()
                .getComponent().getComponentContext();
        me = endpoint.getChannel().createExchangeFactory()
                .createInOnlyExchange();
        String endpointKey = "{"
                + endpoint.getService().getNamespaceURI() + "}"
                + endpoint.getService().getLocalPart() + ":"
                + endpoint.getEndpoint();
        me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
                endpointKey);
        try {
            transform(data, me, chContext);
        } catch (Exception e) {
            // Just report the error; there is not much else we can do
            ErrorUtil.printError(getLocalErrorMsg()
                    + "failed to populate the InOnly MessageExchange ", e);
            return false;
        }

        logger.debug("Consumer endpoint service="
                + endpoint.getService() + "  endpoint="
                + endpoint.getEndpoint());
        ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
                .getService(), endpoint.getEndpoint());
        logger.debug("Got target endpoint " + linkedEndpoint
                + "   service=" + linkedEndpoint.getServiceName()
                + "   endpoint=" + linkedEndpoint.getEndpointName());

        me.setEndpoint(linkedEndpoint);
        me.setService(endpoint.getService());

        // TODO LU: If we change to use send(), the ME will be received from
        // the NMR and we can use the processInOnly() method to continue the
        // flow.
        endpoint.getChannel().sendSync(me);

        // The message counts as sent only if the exchange completed
        return ExchangeStatus.DONE.equals(me.getStatus());
    }

    protected void handleInOut(Object data, MessageExchange me,
            IConsumerHandlerContext chContext) throws JBIException {

        // This was previously missing from the in-out path, causing a null
        // destination
        ComponentContext context = endpoint.getServiceUnit()
                .getComponent().getComponentContext();
        me = endpoint.getChannel().createExchangeFactory()
                .createInOutExchange();
        String endpointKey = "{"
                + endpoint.getService().getNamespaceURI() + "}"
                + endpoint.getService().getLocalPart() + ":"
                + endpoint.getEndpoint();
        me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
                endpointKey);

        logger.debug("Consumer endpoint service="
                + endpoint.getService() + "  endpoint="
                + endpoint.getEndpoint());
        ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
                .getService(), endpoint.getEndpoint());
        logger.debug("Got target endpoint " + linkedEndpoint
                + "   service=" + linkedEndpoint.getServiceName()
                + "   endpoint=" + linkedEndpoint.getEndpointName());

        me.setEndpoint(linkedEndpoint);
        me.setService(endpoint.getService());

        try {
            transform(data, me, chContext);
        } catch (Exception e) {
            // Just report the error; there is not much else we can do
            ErrorUtil.printError(getLocalErrorMsg()
                    + "failed to populate the InOut exchange ", e);
            return;
        }

        // TODO LU: If we change to use send(), the ME will be received from
        // the NMR and we can use the processInOut() method to continue the
        // flow.

        endpoint.getChannel().sendSync(me);

        try {
            // The called service may have reported an error or a fault
            if (ExchangeStatus.ERROR.equals(me.getStatus())
                    || me.getFault() != null) {
                handleErrorFault(me, data);
            } else {
                NormalizedMessage nm = me.getMessage("out");
                if (nm != null) {
                    doProcessOut(nm, me);
                } else {
                    String msg = getLocalErrorMsg()
                            + "did not get the out message returned for InOut exchange";
                    logger.error(msg);
                    ErrorDb.write(new Exception(msg), me, logger);
                }
                me.setStatus(ExchangeStatus.DONE);
                endpoint.getChannel().send(me);
            }

        } catch (CbesbException ex) {
            ErrorUtil.printError(getLocalErrorMsg()
                    + "failed to process the out message. ", ex);
            ErrorDb.write(ex, me, logger);
            // throw new JBIException(ex.getMessage());
        }
    }

    protected boolean handleRobustInOnly(Object objMsg,
            MessageExchange me, IConsumerHandlerContext chContext)
            throws JBIException {
        boolean sent = false;

        ComponentContext context = endpoint.getServiceUnit()
                .getComponent().getComponentContext();

        me = endpoint.getChannel().createExchangeFactory()
                .createRobustInOnlyExchange();
        String endpointKey = "{"
                + endpoint.getService().getNamespaceURI() + "}"
                + endpoint.getService().getLocalPart() + ":"
                + endpoint.getEndpoint();
        me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
                endpointKey);

        logger.debug("Consumer endpoint service="
                + endpoint.getService() + "  endpoint="
                + endpoint.getEndpoint());
        ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
                .getService(), endpoint.getEndpoint());
        logger.debug("Got target endpoint " + linkedEndpoint
                + "   service=" + linkedEndpoint.getServiceName()
                + "   endpoint=" + linkedEndpoint.getEndpointName());

        me.setEndpoint(linkedEndpoint);
        me.setService(endpoint.getService());

        try {
            transform(objMsg, me, chContext);
        } catch (Exception e) {
            ErrorUtil.printError(getLocalErrorMsg()
                    + "failed to populate the RobustInOnly exchange ",
                    e);
            return false;
        }
        endpoint.getChannel().sendSync(me);

        ExchangeStatus status = me.getStatus();

        if (status.equals(ExchangeStatus.ERROR)
                || me.getFault() != null) {
            // Notify error to external service
            handleErrorFault(me, objMsg);
            sent = true;
        } else if (status.equals(ExchangeStatus.DONE)) {
            sent = true;
        }
        return sent;
    }

    /**
     * Gets the fault message and processes it. Child classes should override
     * this method to take further action, such as writing the fault to a file
     * or storing it in a database.
     *
     * @param me
     *            the exchange that carries the fault or error
     * @param data
     *            the original input data
     */
    protected void handleErrorFault(MessageExchange me, Object data)
            throws JBIException {

        NormalizedMessage nm = me.getFault();

        if (nm != null) {
            try {
                doProcessFault(nm);
            } catch (CbesbException e) {
                throw new JBIException(
                        "Exception in processing fault: ", e);
            }

            ErrorDb.write(new Exception("fault:" + nm.toString()), me,
                    logger);

            me.setStatus(ExchangeStatus.DONE);
            endpoint.getChannel().send(me);

        } else {
            // No fault message, so the exchange must carry an error
            logger.error(me.getError().getMessage());
            if (logger.isDebugEnabled()) {
                logger.debug(me.getError());
            }

            ErrorDb.write(me.getError(), me, logger);

            throw new JBIException("General Exception.", me.getError());
        }

    }

    protected void doProcessFault(NormalizedMessage nm)
            throws JBIException, CbesbException {
        // Convert the fault content to a string for the subclass hook
        String fault = null;
        if (nm != null) {
            fault = SourceHelper.createString(nm.getContent());
        }
        try {
            doProcessFault(nm, fault);
        } catch (Exception e) {
            throw CbesbException.create(e);
        }
    }

    protected void doProcessOut(NormalizedMessage nm, MessageExchange me)
            throws JBIException, CbesbException {
        // Convert the response content to a string for the subclass hook
        String response = null;
        if (nm != null) {
            response = SourceHelper.createString(nm.getContent());
        }
        try {
            doProcessOut(nm, response, me);
        } catch (Exception e) {
            throw CbesbException.create(e);
        }

    }

}
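
The class above follows a template-method layout: doTriggerProc() records the start time and delegates to the abstract doTrigger(), and a subclass is expected to call process() for each item it reads. The following is only a minimal, self-contained sketch of that layout, not ChainBuilder ESB code. PollingHandlerSketch, QueuePollingHandler, deliver(), lastElapsedMillis and the in-memory queue are all hypothetical stand-ins, the JBI exchange handling is replaced by a simple list, and the checked exceptions of the original signatures are omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the base class: it mirrors only the
// doTriggerProc()/doTrigger() split and the elapsed-time bookkeeping.
abstract class PollingHandlerSketch {
    protected long startProcessTime;
    protected long lastElapsedMillis = -1;

    // Template method: record the start time, then delegate to the subclass.
    final boolean doTriggerProc() {
        this.startProcessTime = System.currentTimeMillis();
        return doTrigger();
    }

    // Subclasses poll their external source here and call process() per item.
    protected abstract boolean doTrigger();

    // Stand-in for process(): the real method builds and sends an exchange.
    protected void process(Object data) {
        deliver(data);
        lastElapsedMillis = System.currentTimeMillis() - startProcessTime;
    }

    // Hypothetical hook that replaces the NMR delivery in this sketch.
    protected abstract void deliver(Object data);
}

// Hypothetical subclass that "polls" an in-memory queue, one item per trigger.
class QueuePollingHandler extends PollingHandlerSketch {
    private final Deque<String> source;
    final List<Object> delivered = new ArrayList<Object>();

    QueuePollingHandler(Deque<String> source) {
        this.source = source;
    }

    @Override
    protected boolean doTrigger() {
        String item = source.poll();
        if (item == null) {
            return false; // nothing to read on this trigger
        }
        process(item);
        return true;
    }

    @Override
    protected void deliver(Object data) {
        delivered.add(data);
    }

    public static void main(String[] args) {
        QueuePollingHandler handler = new QueuePollingHandler(
                new ArrayDeque<String>(Arrays.asList("a", "b")));
        // Keep triggering until the source is drained.
        while (handler.doTriggerProc()) {
            System.out.println("delivered so far: " + handler.delivered);
        }
    }
}
```

In this sketch the boolean returned by doTrigger() is taken to mean "an item was processed"; the original source does not document what its return value signifies, so that reading is an assumption.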