001: /*
002: * Copyright 2006-2007 The Scriptella Project Team.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package scriptella.execution;
017:
018: import scriptella.configuration.ConfigurationEl;
019: import scriptella.configuration.ConfigurationFactory;
020: import scriptella.core.Session;
021: import scriptella.core.SystemException;
022: import scriptella.core.ThreadSafe;
023: import scriptella.interactive.ProgressCallback;
024: import scriptella.interactive.ProgressIndicator;
025: import scriptella.util.CollectionUtils;
026: import scriptella.util.IOUtils;
027:
028: import java.io.File;
029: import java.net.MalformedURLException;
030: import java.net.URL;
031: import java.util.Map;
032: import java.util.concurrent.Callable;
033: import java.util.logging.Level;
034: import java.util.logging.Logger;
035:
036: /**
037: * Executor for ETL files.
038: * <p>Use {@link ConfigurationFactory} to parse script files and to configure
039: * the executor.
040: * <p>The usage scenario of this class may be described using the following steps:
041: * <ul>
042: * <li>{@link #EtlExecutor(scriptella.configuration.ConfigurationEl)} Create an instance of this class
043: * and pass a {@link scriptella.configuration.ConfigurationFactory#createConfiguration() script file configuration}.
044: * <li>{@link #execute() Execute} the script
045: * </ul>
046: * </pre></code>
047: * <p>Additionally simplified helper methods are declared in this class:
048: * <ul>
049: * <li>{@link #newExecutor(java.io.File)}
050: * <li>{@link #newExecutor(java.net.URL)}
051: * <li>{@link #newExecutor(java.net.URL, java.util.Map)}
052: * </ul>
053: * <p/>
054: * <h3>ETL Cancellation</h3>
055: * Scriptella execution model relies on a standard Java {@link Thread#interrupt()} mechanism.
056: * <p>To interrupt the ETL execution invoke {@link Thread#interrupt()} on a thread
057: * which {@link #execute() started} ETL operation. As a part of interruption process
058: * the engine tries to roll back all changes made during the ETL operation.
059: * <p>{@link java.util.concurrent.ExecutorService} and {@link java.util.concurrent.Future}
060: * can also be used to control ETL execution.
061: * <h3>Integration with third-party systems</h3>
062: * For convenience EtlExecutor implements {@link Runnable} and {@link java.util.concurrent.Callable}.
063: * This feature simplifies integration of Scriptella executors with {@link java.util.concurrent.Executors}
064: * or other systems like Spring/Quartz etc. It also minimizes application code dependency on Scriptella.
065: *
066: * @author Fyodor Kupolov
067: * @version 1.0
068: */
069: public class EtlExecutor implements Runnable,
070: Callable<ExecutionStatistics> {
071: private static final Logger LOG = Logger
072: .getLogger(EtlExecutor.class.getName());
073: private ConfigurationEl configuration;
074: private boolean jmxEnabled;
075:
076: /**
077: * Creates ETL executor.
078: */
079: public EtlExecutor() {
080: }
081:
082: /**
083: * Creates an ETL executor for specified configuration file.
084: *
085: * @param configuration ETL configuration.
086: */
087: public EtlExecutor(ConfigurationEl configuration) {
088: this .configuration = configuration;
089: }
090:
091: /**
092: * Returns ETL configuration for this executor.
093: *
094: * @return ETL configuration.
095: */
096: public ConfigurationEl getConfiguration() {
097: return configuration;
098: }
099:
100: /**
101: * Sets ETL configuration.
102: *
103: * @param configuration ETL configuration.
104: */
105: public void setConfiguration(final ConfigurationEl configuration) {
106: this .configuration = configuration;
107: }
108:
109: /**
110: * Returns true if monitoring/management via JMX is enabled.
111: * <p>If jmxEnabled=true the executor registers MBeans for executed ETL files.
112: * The object names of the mbeans have the following form:
113: * <code>scriptella: type=etl,url="ETL_FILE_URL"</code>
114: *
115: * @return true if monitoring/management via JMX is enabled.
116: */
117: public boolean isJmxEnabled() {
118: return jmxEnabled;
119: }
120:
121: /**
122: * Enables or disables ETL monitoring/management via JMX.
123: * <p>If jmxEnabled=true the executor registers MBeans for executed ETL files.
124: * The object names of the mbeans have the following form:
125: * <code>scriptella: type=etl,url="ETL_FILE_URL"</code>
126: *
127: * @param jmxEnabled true if monitoring/management via JMX is enabled.
128: * @see scriptella.execution.JmxEtlManagerMBean
129: */
130: public void setJmxEnabled(boolean jmxEnabled) {
131: this .jmxEnabled = jmxEnabled;
132: }
133:
134: /**
135: * Executes ETL based on a specified configuration.
136: *
137: * @return execution statistics for ETL execution.
138: * @throws EtlExecutorException if ETL fails.
139: * @see #execute(scriptella.interactive.ProgressIndicator)
140: */
141: @ThreadSafe
142: public ExecutionStatistics execute() throws EtlExecutorException {
143: return execute((ProgressIndicator) null);
144: }
145:
146: /**
147: * Executes ETL based on a specified configuration.
148: *
149: * @param indicator progress indicator to use.
150: * @return execution statistics for ETL execution.
151: * @throws EtlExecutorException if ETL fails.
152: */
153: @ThreadSafe
154: public ExecutionStatistics execute(final ProgressIndicator indicator)
155: throws EtlExecutorException {
156: EtlContext ctx = null;
157: JmxEtlManager etlManager = null;
158:
159: try {
160: ctx = prepare(indicator);
161: if (jmxEnabled) {
162: etlManager = new JmxEtlManager(ctx);
163: etlManager.register();
164: }
165: execute(ctx);
166: ctx.getProgressCallback().step(5, "Commiting transactions");
167: commitAll(ctx);
168: } catch (Throwable e) {
169: if (ctx != null) {
170: rollbackAll(ctx);
171: }
172: throw new EtlExecutorException(e);
173: } finally {
174: if (ctx != null) {
175: closeAll(ctx);
176: ctx.getStatisticsBuilder().etlComplete();
177: ctx.getProgressCallback().complete();
178: }
179: if (etlManager != null) {
180: etlManager.unregister();
181: }
182: }
183:
184: return ctx.getStatisticsBuilder().getStatistics();
185: }
186:
187: void rollbackAll(final EtlContext ctx) {
188: try {
189: ctx.session.rollback();
190: } catch (Exception e) {
191: LOG.log(Level.SEVERE, "Unable to rollback script", e);
192: }
193: }
194:
195: void commitAll(final EtlContext ctx) {
196: ctx.session.commit();
197: }
198:
199: void closeAll(final EtlContext ctx) {
200: ctx.session.close();
201: }
202:
203: private void execute(final EtlContext ctx) {
204: final ProgressCallback oldProgress = ctx.getProgressCallback();
205:
206: final ProgressCallback p = oldProgress.fork(85, 100);
207: final ProgressCallback p2 = p.fork(100);
208: ctx.setProgressCallback(p2);
209: ctx.session.execute(ctx);
210: p.complete();
211: ctx.setProgressCallback(oldProgress);
212: }
213:
214: /**
215: * Prepares the scripts context.
216: *
217: * @param indicator progress indicator to use.
218: * @return prepared scripts context.
219: */
220: protected EtlContext prepare(final ProgressIndicator indicator) {
221: EtlContext ctx = new EtlContext();
222: ctx.getStatisticsBuilder().etlStarted();
223: ctx.setBaseURL(configuration.getDocumentUrl());
224: ctx.setProgressCallback(new ProgressCallback(100, indicator));
225:
226: final ProgressCallback progress = ctx.getProgressCallback();
227: progress.step(1, "Initializing properties");
228: ctx.setProperties(configuration.getParameters());
229: ctx.setProgressCallback(progress.fork(9, 100));
230: ctx.session = new Session(configuration, ctx);
231: ctx.getProgressCallback().complete();
232: ctx.setProgressCallback(progress); //Restoring
233:
234: return ctx;
235: }
236:
237: /**
238: * Converts file to URL and invokes {@link #newExecutor(java.net.URL)}.
239: *
240: * @param scriptFile ETL file.
241: * @return configured instance of script executor.
242: * @see #newExecutor(java.net.URL)
243: */
244: public static EtlExecutor newExecutor(final File scriptFile) {
245: try {
246: return newExecutor(IOUtils.toUrl(scriptFile));
247: } catch (MalformedURLException e) {
248: throw new IllegalArgumentException(e.getMessage(), e);
249: }
250: }
251:
252: /**
253: * Helper method to create a new ScriptExecutor for specified script URL.
254: * <p>Calls {@link #newExecutor(java.net.URL, java.util.Map)} and passes {@link System#getProperties() System properties}
255: * as external properties.
256: *
257: * @param scriptFileUrl URL of script file.
258: * @return configured instance of script executor.
259: */
260: @ThreadSafe
261: public static EtlExecutor newExecutor(final URL scriptFileUrl) {
262: return newExecutor(scriptFileUrl, CollectionUtils.asMap(System
263: .getProperties()));
264: }
265:
266: /**
267: * Helper method to create a new ScriptExecutor for specified script URL.
268: *
269: * @param scriptFileUrl URL of script file.
270: * @param externalProperties see {@link ConfigurationFactory#setExternalParameters(java.util.Map)}
271: * @return configured instance of script executor.
272: * @see ConfigurationFactory
273: */
274: @ThreadSafe
275: public static EtlExecutor newExecutor(final URL scriptFileUrl,
276: final Map<String, ?> externalProperties) {
277: ConfigurationFactory cf = new ConfigurationFactory();
278: cf.setResourceURL(scriptFileUrl);
279: if (externalProperties != null) {
280: cf.setExternalParameters(externalProperties);
281: }
282: return new EtlExecutor(cf.createConfiguration());
283: }
284:
285: //Runnable/Callable convenience interfaces
286:
287: /**
288: * A runnable adapter for {@link #execute()} method.
289: * <p>Please note that due to a checked
290: * exceptions limitation a {@link scriptella.core.SystemException} is thrown instead of
291: * the {@link scriptella.execution.EtlExecutorException}.
292: *
293: * @throws SystemException a wrapped {@link scriptella.execution.EtlExecutorException}.
294: * @see #execute()
295: */
296: public void run() throws SystemException {
297: try {
298: execute();
299: } catch (EtlExecutorException e) {
300: throw new SystemException(e.getMessage(), e);
301: }
302: }
303:
304: /**
305: * A synonym for {@link #execute()}.
306: */
307: public ExecutionStatistics call() throws EtlExecutorException {
308: return execute();
309: }
310: }
|