001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.jdbc;
018:
019: import java.sql.Connection;
020: import java.sql.PreparedStatement;
021: import java.sql.ResultSet;
022: import java.sql.ResultSetMetaData;
023: import java.sql.SQLException;
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import java.util.List;
027:
028: import org.compass.core.CompassException;
029: import org.compass.core.CompassSession;
030: import org.compass.core.Resource;
031: import org.compass.core.config.CommonMetaDataLookup;
032: import org.compass.core.mapping.CascadeMapping;
033: import org.compass.core.spi.InternalCompass;
034: import org.compass.core.spi.InternalCompassSession;
035: import org.compass.gps.CompassGpsException;
036: import org.compass.gps.device.jdbc.mapping.AutoGenerateMapping;
037: import org.compass.gps.device.jdbc.mapping.ColumnMapping;
038: import org.compass.gps.device.jdbc.mapping.ColumnToPropertyMapping;
039: import org.compass.gps.device.jdbc.mapping.ResultSetToResourceMapping;
040: import org.compass.gps.device.jdbc.mapping.VersionColumnMapping;
041: import org.compass.gps.device.jdbc.snapshot.ConfigureSnapshotEvent;
042: import org.compass.gps.device.jdbc.snapshot.CreateAndUpdateSnapshotEvent;
043: import org.compass.gps.device.jdbc.snapshot.DeleteSnapshotEvent;
044: import org.compass.gps.device.jdbc.snapshot.JdbcAliasRowSnapshot;
045: import org.compass.gps.device.jdbc.snapshot.JdbcAliasSnapshot;
046: import org.compass.gps.device.jdbc.snapshot.JdbcSnapshot;
047:
048: /**
049: * A gps device that index a jdbc <code>ResultSet</code> to a set of Compass
050: * <code>Resource</code>s. Each <code>Resource</code> maps to a
051: * <code>ResultSet</code> row. The device can handle multiple
052: * <code>ResultSet</code>s.
053: * <p>
054: * The device holds a list of
055: * {@link org.compass.gps.device.jdbc.mapping.ResultSetToResourceMapping}s
056: * (or derived classes like
057: * {@link org.compass.gps.device.jdbc.mapping.TableToResourceMapping}).
058: * Each one has all the required mappings setting to map the
059: * <code>ResultSet</code> with all it's rows to the set of corresponding
060: * <code>Resource</code>s.
061: * <p>
062: * The device can perform active data base mirroring. The mirror operation is
063: * enabled only if the mirror flag is enabled, and will execute against each
064: * mapping that
065: * {@link org.compass.gps.device.jdbc.mapping.ResultSetToResourceMapping#supportsVersioning()}.
066: * <p>
067: * The <code>autoDetectVersionColumnSqlType</code> setting (which defauls to
068: * <code>true</code>) will automatically set the version column jdbc type for
069: * mappings that support versioning.
070: *
071: * @author kimchy
072: * @see org.compass.gps.device.jdbc.mapping.ResultSetToResourceMapping
073: * @see org.compass.gps.device.jdbc.mapping.TableToResourceMapping
074: */
075: public class ResultSetJdbcGpsDevice extends
076: AbstractJdbcActiveMirrorGpsDevice {
077:
078: protected List mappings = new ArrayList();
079:
080: private JdbcSnapshot snapshot;
081:
082: private boolean autoDetectVersionColumnSqlType = true;
083:
084: /**
085: * performs operations on startup, such as auto generation of mappings for
086: * mappings that implement the {@link AutoGenerateMapping}, auto detection
087: * of version column jdbc type, and {@link JdbcSnapshot} loading (using the
088: * {@link org.compass.gps.device.jdbc.snapshot.JdbcSnapshotPersister}).
089: */
090: protected void doStart() throws CompassGpsException {
091: super .doStart();
092: // call auto generate for mappings that implement the AutoGenerate
093: // interface
094: for (Iterator it = mappings.iterator(); it.hasNext();) {
095: ResultSetToResourceMapping rsMapping = (ResultSetToResourceMapping) it
096: .next();
097: if (rsMapping instanceof AutoGenerateMapping) {
098: ((AutoGenerateMapping) rsMapping)
099: .generateMappings(dataSource);
100: }
101: }
102: // support for meta data lookup
103: CommonMetaDataLookup commonMetaDataLookup = new CommonMetaDataLookup(
104: ((InternalCompass) compassGps.getIndexCompass())
105: .getMetaData());
106: for (Iterator it = mappings.iterator(); it.hasNext();) {
107: ResultSetToResourceMapping rsMapping = (ResultSetToResourceMapping) it
108: .next();
109: rsMapping.setAlias(commonMetaDataLookup
110: .lookupAliasName(rsMapping.getAlias()));
111: for (Iterator it1 = rsMapping.mappingsIt(); it1.hasNext();) {
112: List columns = (List) it1.next();
113: for (Iterator it2 = columns.iterator(); it2.hasNext();) {
114: ColumnMapping columnMapping = (ColumnMapping) it2
115: .next();
116: if (columnMapping instanceof ColumnToPropertyMapping) {
117: ColumnToPropertyMapping columnToPropertyMapping = (ColumnToPropertyMapping) columnMapping;
118: columnToPropertyMapping
119: .setPropertyName(commonMetaDataLookup
120: .lookupMetaDataName(columnToPropertyMapping
121: .getPropertyName()));
122: }
123: }
124: }
125: }
126: // double check that all the result set mapping have Compass::Core
127: // resource mapping
128: for (Iterator it = mappings.iterator(); it.hasNext();) {
129: ResultSetToResourceMapping rsMapping = (ResultSetToResourceMapping) it
130: .next();
131: if (!compassGps.hasMappingForEntityForMirror(rsMapping
132: .getAlias(), CascadeMapping.Cascade.ALL)) {
133: throw new IllegalStateException(
134: buildMessage("No resource mapping defined in gps mirror compass for alias ["
135: + rsMapping.getAlias()
136: + "]. Did you defined a jdbc mapping builder?"));
137: }
138: if (!compassGps.hasMappingForEntityForIndex(rsMapping
139: .getAlias())) {
140: throw new IllegalStateException(
141: buildMessage("No resource mapping defined in gps index compass for alias ["
142: + rsMapping.getAlias()
143: + "]. Did you defined a jdbc mapping builder?"));
144: }
145: }
146: if (isAutoDetectVersionColumnSqlType()) {
147: if (log.isInfoEnabled()) {
148: log
149: .info(buildMessage("Auto detecting version column sql types"));
150: }
151: // set the version databse type
152: Connection connection = JdbcUtils.getConnection(dataSource);
153: PreparedStatement ps = null;
154: ResultSet rs = null;
155: try {
156: for (Iterator it = mappings.iterator(); it.hasNext();) {
157: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
158: .next();
159: if (!mapping.supportsVersioning()) {
160: continue;
161: }
162: ps = connection.prepareStatement(mapping
163: .getVersionQuery());
164: ps.setFetchSize(1);
165: rs = ps.executeQuery();
166: ResultSetMetaData metaData = rs.getMetaData();
167: for (Iterator verIt = mapping.versionMappingsIt(); verIt
168: .hasNext();) {
169: VersionColumnMapping versionMapping = (VersionColumnMapping) verIt
170: .next();
171: int columnIndex;
172: if (versionMapping.isUsingColumnIndex()) {
173: columnIndex = versionMapping
174: .getColumnIndex();
175: } else {
176: columnIndex = JdbcUtils
177: .getColumnIndexFromColumnName(
178: metaData, versionMapping
179: .getColumnName());
180: }
181: versionMapping.setSqlType(metaData
182: .getColumnType(columnIndex));
183: }
184: }
185: } catch (SQLException e) {
186: throw new JdbcGpsDeviceException(
187: buildMessage("Failed to find version column type"),
188: e);
189: } finally {
190: JdbcUtils.closeResultSet(rs);
191: JdbcUtils.closeStatement(ps);
192: JdbcUtils.closeConnection(connection);
193: }
194: }
195: if (isMirrorDataChanges()) {
196: if (log.isInfoEnabled()) {
197: log
198: .info(buildMessage("Using mirroring, loading snapshot data"));
199: }
200: // set up the snapshot
201: snapshot = getSnapshotPersister().load();
202: for (Iterator it = mappings.iterator(); it.hasNext();) {
203: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
204: .next();
205: if (mapping.supportsVersioning()
206: && snapshot
207: .getAliasSnapshot(mapping.getAlias()) == null) {
208: if (log.isDebugEnabled()) {
209: log
210: .debug(buildMessage("Alias ["
211: + mapping.getAlias()
212: + "] not found in snapshot data, creating..."));
213: }
214: JdbcAliasSnapshot aliasSnapshot = new JdbcAliasSnapshot(
215: mapping.getAlias());
216: snapshot.putAliasSnapshot(aliasSnapshot);
217: }
218: }
219: // configure the snapshot event listener
220: Connection connection = JdbcUtils.getConnection(dataSource);
221: try {
222: getSnapshotEventListener().configure(
223: new ConfigureSnapshotEvent(connection, dialect,
224: mappings));
225: } finally {
226: JdbcUtils.closeConnection(connection);
227: }
228: }
229: if (log.isDebugEnabled()) {
230: for (Iterator it = mappings.iterator(); it.hasNext();) {
231: log
232: .debug(buildMessage("Using DB Mapping "
233: + it.next()));
234: }
235: }
236: }
237:
238: /**
239: * Saves the {@link JdbcSnapshot}.
240: */
241: protected void doStop() throws CompassGpsException {
242: getSnapshotPersister().save(snapshot);
243: super .doStop();
244: }
245:
246: protected void doIndex(CompassSession session)
247: throws CompassGpsException {
248: // reset the snapshot data before we perform the index operation
249: snapshot = new JdbcSnapshot();
250: for (Iterator it = mappings.iterator(); it.hasNext();) {
251: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
252: .next();
253: if (mapping.supportsVersioning()) {
254: JdbcAliasSnapshot aliasSnapshot = new JdbcAliasSnapshot(
255: mapping.getAlias());
256: snapshot.putAliasSnapshot(aliasSnapshot);
257: }
258: }
259: super .doIndex(session);
260: // save the sanpshot data
261: getSnapshotPersister().save(snapshot);
262: }
263:
264: /**
265: * Returns the array of index execution with a size of the number of
266: * mappings.
267: */
268: protected IndexExecution[] doGetIndexExecutions(
269: Connection connection) throws SQLException,
270: JdbcGpsDeviceException {
271: IndexExecution[] indexExecutions = new IndexExecution[mappings
272: .size()];
273: for (int i = 0; i < indexExecutions.length; i++) {
274: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) mappings
275: .get(i);
276: indexExecutions[i] = new IndexExecution(mapping, mapping
277: .getSelectQuery());
278: }
279: return indexExecutions;
280: }
281:
282: /**
283: * Index the given <code>ResultSet</code> row into a Compass
284: * <code>Resource</code>.
285: */
286: protected Object processRowValue(Object description, ResultSet rs,
287: CompassSession session) throws SQLException,
288: CompassException {
289:
290: if (log.isDebugEnabled()) {
291: StringBuffer sb = new StringBuffer();
292: sb.append(buildMessage("Indexing data row with values "));
293: ResultSetMetaData metaData = rs.getMetaData();
294: for (int i = 1; i <= metaData.getColumnCount(); i++) {
295: sb.append("[").append(metaData.getColumnName(i))
296: .append(":");
297: String value = rs.getString(i);
298: if (rs.wasNull()) {
299: value = "(null)";
300: }
301: sb.append(value);
302: sb.append("] ");
303: }
304: log.debug(sb.toString());
305: }
306:
307: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) description;
308:
309: JdbcAliasRowSnapshot rowSnapshot = null;
310: if (shouldMirrorDataChanges() && mapping.supportsVersioning()) {
311: rowSnapshot = new JdbcAliasRowSnapshot();
312: }
313: Resource resource = ((InternalCompassSession) session)
314: .getCompass().getResourceFactory().createResource(
315: mapping.getAlias());
316: ResultSetRowMarshallHelper marshallHelper = new ResultSetRowMarshallHelper(
317: mapping, session, dialect, resource, rowSnapshot);
318: marshallHelper.marshallResultSet(rs);
319:
320: if (shouldMirrorDataChanges() && mapping.supportsVersioning()) {
321: snapshot.getAliasSnapshot(mapping.getAlias()).putRow(
322: rowSnapshot);
323: }
324:
325: return resource;
326: }
327:
328: /**
329: * Performs the data change mirroring operation.
330: */
331: public synchronized void performMirroring()
332: throws JdbcGpsDeviceException {
333: if (!shouldMirrorDataChanges() || isPerformingIndexOperation()) {
334: return;
335: }
336: if (snapshot == null) {
337: throw new IllegalStateException(
338: buildMessage("Versioning data was not properly initialized, did you index the device or loaded the data?"));
339: }
340: Connection connection = JdbcUtils.getConnection(dataSource);
341: PreparedStatement ps = null;
342: ResultSet rs = null;
343: try {
344: for (Iterator it = mappings.iterator(); it.hasNext();) {
345: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
346: .next();
347: if (!mapping.supportsVersioning()) {
348: continue;
349: }
350: JdbcAliasSnapshot oldAliasSnapshot = snapshot
351: .getAliasSnapshot(mapping.getAlias());
352: if (oldAliasSnapshot == null) {
353: log
354: .warn(buildMessage("No snapshot for alias ["
355: + mapping.getAlias()
356: + "] even though there should be support for versioning ignoring the alias"));
357: continue;
358: }
359: JdbcAliasSnapshot newAliasSnapshot = new JdbcAliasSnapshot(
360: mapping.getAlias());
361: ArrayList createdRows = new ArrayList();
362: ArrayList updatedRows = new ArrayList();
363: ArrayList deletedRows = new ArrayList();
364: if (log.isDebugEnabled()) {
365: log.debug(buildMessage("Executing version query ["
366: + mapping.getVersionQuery() + "]"));
367: }
368: ps = connection.prepareStatement(mapping
369: .getVersionQuery());
370: if (getFetchSize() > 0) {
371: ps.setFetchSize(getFetchSize());
372: }
373: rs = ps.executeQuery();
374: while (rs.next()) {
375:
376: if (log.isDebugEnabled()) {
377: StringBuffer sb = new StringBuffer();
378: sb
379: .append(buildMessage("Version row with values "));
380: ResultSetMetaData metaData = rs.getMetaData();
381: for (int i = 1; i <= metaData.getColumnCount(); i++) {
382: sb.append("[").append(
383: metaData.getColumnName(i)).append(
384: ":");
385: String value = rs.getString(i);
386: if (rs.wasNull()) {
387: value = "(null)";
388: }
389: sb.append(value);
390: sb.append("] ");
391: }
392: log.debug(sb.toString());
393: }
394:
395: JdbcAliasRowSnapshot newRowSnapshot = new JdbcAliasRowSnapshot();
396: ResultSetRowMarshallHelper marshallHelper = new ResultSetRowMarshallHelper(
397: mapping, dialect, newRowSnapshot,
398: compassGps.getMirrorCompass());
399: marshallHelper.marshallResultSet(rs);
400:
401: // new and old have the same ids
402: JdbcAliasRowSnapshot oldRowSnapshot = oldAliasSnapshot
403: .getRow(newRowSnapshot);
404:
405: // new row or updated row
406: if (oldRowSnapshot == null) {
407: createdRows.add(newRowSnapshot);
408: } else if (oldRowSnapshot
409: .isOlderThan(newRowSnapshot)) {
410: updatedRows.add(newRowSnapshot);
411: }
412:
413: newAliasSnapshot.putRow(newRowSnapshot);
414: }
415: for (Iterator oldRowIt = oldAliasSnapshot
416: .rowSnapshotIt(); oldRowIt.hasNext();) {
417: JdbcAliasRowSnapshot tmpRow = (JdbcAliasRowSnapshot) oldRowIt
418: .next();
419: // deleted row
420: if (newAliasSnapshot.getRow(tmpRow) == null) {
421: deletedRows.add(tmpRow);
422: }
423: }
424: if (!createdRows.isEmpty() || !updatedRows.isEmpty()) {
425: getSnapshotEventListener().onCreateAndUpdate(
426: new CreateAndUpdateSnapshotEvent(
427: connection, dialect, mapping,
428: createdRows, updatedRows,
429: compassGps));
430: }
431: if (!deletedRows.isEmpty()) {
432: getSnapshotEventListener().onDelete(
433: new DeleteSnapshotEvent(connection,
434: dialect, mapping, deletedRows,
435: compassGps));
436: }
437: snapshot.putAliasSnapshot(newAliasSnapshot);
438: }
439: } catch (SQLException e) {
440: throw new JdbcGpsDeviceException(
441: buildMessage("Failed while mirroring data changes"),
442: e);
443: } finally {
444: JdbcUtils.closeResultSet(rs);
445: JdbcUtils.closeStatement(ps);
446: JdbcUtils.closeConnection(connection);
447: }
448: if (isSaveSnapshotAfterMirror()) {
449: getSnapshotPersister().save(snapshot);
450: }
451: }
452:
453: /**
454: * Adds a mapping to be indexed and mirrored.
455: */
456: public void addMapping(ResultSetToResourceMapping mapping) {
457: this .mappings.add(mapping);
458: }
459:
460: /**
461: * Adds an array of mappings to be indexed and mirrored.
462: */
463: public void setMappings(ResultSetToResourceMapping[] mappingsArr) {
464: for (int i = 0; i < mappingsArr.length; i++) {
465: addMapping(mappingsArr[i]);
466: }
467: }
468:
469: /**
470: * Should the device auto detect the version columns jdbc type.
471: */
472: public boolean isAutoDetectVersionColumnSqlType() {
473: return autoDetectVersionColumnSqlType;
474: }
475:
476: /**
477: * Sets if the device auto detect the version columns jdbc type.
478: */
479: public void setAutoDetectVersionColumnSqlType(
480: boolean autoDetectVersionColumnSqlType) {
481: this.autoDetectVersionColumnSqlType = autoDetectVersionColumnSqlType;
482: }
483: }
|