001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: CbEmbeddedSchedulerConsumerProcessor.java 11077 2007-12-20 15:30:49Z mpreston $
023: */
024: package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging;
025:
026: import java.io.File;
027: import java.util.Iterator;
028:
029: import javax.jbi.JBIException;
030: import javax.jbi.messaging.MessageExchange;
031: import javax.jbi.messaging.NormalizedMessage;
032:
033: import com.bostechcorp.cbesb.common.util.EsbPathHelper;
034: import com.bostechcorp.cbesb.runtime.scheduler.AutoRetrySchedule;
035: import com.bostechcorp.cbesb.runtime.scheduler.ComponentSchedule;
036: import com.bostechcorp.cbesb.runtime.scheduler.HolidaySchedule;
037: import com.bostechcorp.cbesb.runtime.scheduler.ISchedule;
038: import com.bostechcorp.cbesb.runtime.scheduler.SchedulerCore;
039: import com.bostechcorp.cbesb.runtime.scheduler.StandardSchedule;
040:
041: /**
042: * This class is used for the Consumer Processor in binding components that
043: * can support a poll interval or schedule. This consumer uses an embedded
044: * scheduler runtime instead of relying on a separated scheduler component
045: * and message bases triggers. The concrete component's endpoint class
046: * should return an instance of this class for the consumer processor.
047: */
048: public class CbEmbeddedSchedulerConsumerProcessor extends
049: CbConsumerProcessor {
050:
051: protected ScheduledEndpointProcessor endpoint;
052: protected boolean isRunning;
053: protected SchedulerCore scheduler;
054:
055: /**
056: * @param endpoint
057: */
058: public CbEmbeddedSchedulerConsumerProcessor(
059: ScheduledEndpointProcessor endpoint) {
060: super (endpoint);
061: this .endpoint = endpoint;
062: if (endpoint.getMode() == ScheduledEndpointProcessor.MODE_SCHEDULE) {
063: scheduler = new SchedulerCore();
064: }
065: }
066:
067: /* (non-Javadoc)
068: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor#doStart()
069: */
070: @Override
071: protected void doStart() throws Exception {
072: logger.debug("Scheduled Consumer Processor - doStart");
073: endpoint.getHandler().doStart();
074: if (endpoint.getMode() == ScheduledEndpointProcessor.MODE_POLL) {
075: isRunning = true;
076: poll();
077: } else {
078: ComponentSchedule schedule = loadSchedule(endpoint
079: .getScheduleFile());
080: submitSchedule(schedule);
081: scheduler.start();
082: }
083: }
084:
085: /* (non-Javadoc)
086: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor#doStop()
087: */
088: @Override
089: protected void doStop() throws Exception {
090: if (endpoint.getMode() == ScheduledEndpointProcessor.MODE_POLL) {
091: isRunning = false;
092: } else {
093: String jobGroup = "{"
094: + endpoint.getService().getNamespaceURI() + "}"
095: + endpoint.getService().getLocalPart();
096: String jobName = endpoint.getEndpoint();
097: scheduler.removeJob(jobName, jobGroup);
098: //bug 338 - this will stop all scheduled jobs for all Service Units,
099: // don't shutdown the entire scheduler
100: // scheduler.shutdown();
101: }
102: endpoint.getHandler().doStop();
103: }
104:
105: private ComponentSchedule loadSchedule(String scheduleName)
106: throws Exception {
107: String rootPath = endpoint.getServiceUnit().getRootPath();
108: File schedFile = new File(rootPath + File.separator
109: + scheduleName);
110: ComponentSchedule compSched = new ComponentSchedule();
111: compSched.load(schedFile);
112: return compSched;
113: }
114:
115: private void submitSchedule(ComponentSchedule schedule)
116: throws Exception {
117: String jobGroup = "{" + endpoint.getService().getNamespaceURI()
118: + "}" + endpoint.getService().getLocalPart();
119: String jobName = endpoint.getEndpoint();
120:
121: for (Iterator iter = schedule.getSchedules().iterator(); iter
122: .hasNext();) {
123: ISchedule sched = (ISchedule) iter.next();
124: if (sched instanceof StandardSchedule) {
125: StandardScheduleProcessor schedProcessor = new StandardScheduleProcessor(
126: endpoint);
127: String holSched = ((StandardSchedule) sched)
128: .getHolidaySchedule();
129: if (holSched != null) {
130: loadHolidaySchedule(holSched);
131: }
132: scheduler.submitStandardJob(jobName, jobGroup,
133: schedProcessor, (StandardSchedule) sched);
134:
135: } else if (sched instanceof AutoRetrySchedule) {
136: AutoRetryScheduleProcessor schedProcessor = new AutoRetryScheduleProcessor(
137: endpoint);
138: String holSched = ((AutoRetrySchedule) sched)
139: .getHolidaySchedule();
140: if (holSched != null) {
141: loadHolidaySchedule(holSched);
142: }
143: schedProcessor
144: .setSuccessEndpointServiceName(((AutoRetrySchedule) sched)
145: .getSuccessNotificationService());
146: schedProcessor
147: .setSuccessEndpointEPName(((AutoRetrySchedule) sched)
148: .getSuccessNotificationEndpoint());
149: schedProcessor
150: .setFailureEndpointServiceName(((AutoRetrySchedule) sched)
151: .getFailureNotificationService());
152: schedProcessor
153: .setFailureEndpointEPName(((AutoRetrySchedule) sched)
154: .getFailureNotificationEndpoint());
155: scheduler.submitAutoRetryJob(jobName, jobGroup,
156: schedProcessor, (AutoRetrySchedule) sched);
157: }
158: }
159:
160: }
161:
162: public void loadHolidaySchedule(String holidayFile)
163: throws Exception {
164: String fullPath = EsbPathHelper.getFullPathForDef(holidayFile);
165:
166: HolidaySchedule holSched = new HolidaySchedule();
167: if (holSched.load(new File(fullPath))) {
168: try {
169: scheduler.submitHolidaySchedule(holidayFile, holSched);
170: logger.debug("Holiday Schedule '" + holidayFile
171: + "' loaded.");
172:
173: } catch (Exception e) {
174: logger.error("Unable to load Holiday Schedule '"
175: + holidayFile + "'", e);
176: }
177:
178: } else {
179: logger.error("Unable to load Holiday Schedule '"
180: + holidayFile + "'");
181: }
182: }
183:
184: protected void poll() throws Exception {
185: while (isRunning) {
186: try {
187: endpoint.getHandler().doTriggerProc();
188: } catch (Exception e) {
189: logger
190: .error(
191: "ScheduledConsumer caught exception during poll interval.",
192: e);
193: }
194: try {
195: Thread.sleep(endpoint.getPollInterval());
196: } catch (InterruptedException e) {
197: logger.error("Sleep was interrupted", e);
198: }
199: }
200: }
201:
202: /* (non-Javadoc)
203: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor#doProcessFault(javax.jbi.messaging.NormalizedMessage, java.lang.String)
204: */
205: @Override
206: protected void doProcessFault(NormalizedMessage nm, String fault)
207: throws JBIException {
208: //Not used here, handled by ScheduledProcessHandler
209: }
210:
211: /* (non-Javadoc)
212: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbConsumerProcessor#doProcessOut(javax.jbi.messaging.NormalizedMessage, java.lang.String, javax.jbi.messaging.MessageExchange)
213: */
214: @Override
215: protected void doProcessOut(NormalizedMessage nm, String s,
216: MessageExchange me) throws JBIException {
217: //Not used here, handled by ScheduledProcessHandler
218: }
219:
220: }
|