001: /*
002: * GeoTools - OpenSource mapping toolkit
003: * http://geotools.org
004: * (C) 2002-2006, GeoTools Project Managment Committee (PMC)
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation;
009: * version 2.1 of the License.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: */
016:
017: package org.geotools.data.postgis;
018:
019: import java.io.IOException;
020: import java.sql.Types;
021: import java.util.Arrays;
022: import java.util.Collections;
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.Set;
026: import java.util.SortedSet;
027: import java.util.TreeSet;
028:
029: import org.geotools.data.AbstractFeatureStore;
030: import org.geotools.data.DataSourceException;
031: import org.geotools.data.DataStore;
032: import org.geotools.data.DataUtilities;
033: import org.geotools.data.DefaultQuery;
034: import org.geotools.data.DefaultTransaction;
035: import org.geotools.data.FeatureListener;
036: import org.geotools.data.FeatureLocking;
037: import org.geotools.data.FeatureReader;
038: import org.geotools.data.FeatureSource;
039: import org.geotools.data.FeatureWriter;
040: import org.geotools.data.Query;
041: import org.geotools.data.ReTypeFeatureReader;
042: import org.geotools.data.Transaction;
043: import org.geotools.data.VersioningFeatureStore;
044: import org.geotools.data.postgis.fidmapper.VersionedFIDMapper;
045: import org.geotools.data.store.EmptyFeatureCollection;
046: import org.geotools.data.store.ReTypingFeatureCollection;
047: import org.geotools.factory.CommonFactoryFinder;
048: import org.geotools.factory.Hints;
049: import org.geotools.feature.AttributeType;
050: import org.geotools.feature.Feature;
051: import org.geotools.feature.FeatureCollection;
052: import org.geotools.feature.FeatureType;
053: import org.geotools.feature.FeatureTypes;
054: import org.geotools.feature.IllegalAttributeException;
055: import org.geotools.feature.SchemaException;
056: import org.opengis.filter.Filter;
057: import org.opengis.filter.FilterFactory;
058: import org.opengis.filter.sort.SortBy;
059: import org.opengis.filter.sort.SortOrder;
060:
061: import com.vividsolutions.jts.geom.Envelope;
062:
063: /**
064: * A cheap implementation of a feature locking.
065: * <p>
066: * Implementation wise, for all locking needs, tries to leverage the wrapped datastore feature
067: * locking. If an optimization is possible (mass updates come to mind), we try to use the feature
068: * locking, otherwiser we fall back on the implementation inherited from AbstractFeatureSource.
069: * <p>
070: * {@link #modifyFeatures(AttributeType[], Object[], Filter)} is an example of things that cannot be
071: * optimized. Theoretically, one could mass expire current feature, but he should have first read
072: * into memory all of them to rewrite them as new (which may not be possible).
073: *
074: * @author aaime
075: * @since 2.4
076: *
077: */
078: public class VersionedPostgisFeatureStore extends AbstractFeatureStore
079: implements VersioningFeatureStore {
080:
081: private VersionedPostgisDataStore store;
082:
083: private FeatureLocking locking;
084:
085: private FeatureType schema;
086:
087: public VersionedPostgisFeatureStore(FeatureType schema,
088: VersionedPostgisDataStore store) throws IOException {
089: this .store = store;
090: this .schema = schema;
091: this .locking = (FeatureLocking) store.wrapped
092: .getFeatureSource(schema.getTypeName());
093: }
094:
095: // -----------------------------------------------------------------------------------------------
096: // STANDARD FEATURE STORE METHODS
097: // -----------------------------------------------------------------------------------------------
098:
099: public Transaction getTransaction() {
100: return locking.getTransaction();
101: }
102:
103: public void setTransaction(Transaction transaction) {
104: locking.setTransaction(transaction);
105: }
106:
107: public Envelope getBounds() throws IOException {
108: return getBounds(Query.ALL);
109: }
110:
111: public Envelope getBounds(Query query) throws IOException {
112: DefaultQuery versionedQuery = store
113: .buildVersionedQuery(getTypedQuery(query));
114: return locking.getBounds(versionedQuery);
115: }
116:
117: public int getCount(Query query) throws IOException {
118: DefaultQuery versionedQuery = store
119: .buildVersionedQuery(getTypedQuery(query));
120: return locking.getCount(versionedQuery);
121: }
122:
123: public DataStore getDataStore() {
124: return store;
125: }
126:
127: public void addFeatureListener(FeatureListener listener) {
128: store.listenerManager.addFeatureListener(this , listener);
129: }
130:
131: public FeatureType getSchema() {
132: return schema;
133: }
134:
135: public void removeFeatureListener(FeatureListener listener) {
136: store.listenerManager.removeFeatureListener(this , listener);
137: }
138:
139: public void modifyFeatures(AttributeType type, Object value,
140: Filter filter) throws IOException {
141: super .modifyFeatures(type, value, filter);
142: }
143:
144: public void modifyFeatures(AttributeType[] type, Object[] value,
145: Filter filter) throws IOException {
146: super .modifyFeatures(type, value, filter);
147: }
148:
149: public void removeFeatures(Filter filter) throws IOException {
150: // this we can optimize, it's a matter of mass updating the last
151: // revisions (and before that, we have to compute the modified envelope)
152: Filter versionedFilter = (Filter) store.buildVersionedFilter(
153: schema.getTypeName(), filter, new RevisionInfo());
154: Envelope bounds = locking.getBounds(new DefaultQuery(schema
155: .getTypeName(), versionedFilter));
156: Transaction t = getTransaction();
157: boolean autoCommit = false;
158: if (Transaction.AUTO_COMMIT.equals(t)) {
159: t = new DefaultTransaction();
160: autoCommit = true;
161: }
162: VersionedJdbcTransactionState state = store.wrapped
163: .getVersionedJdbcTransactionState(t);
164: locking.modifyFeatures(locking.getSchema().getAttributeType(
165: "expired"), new Long(state.getRevision()),
166: versionedFilter);
167: if (autoCommit) {
168: t.commit();
169: t.close();
170: }
171: store.listenerManager.fireFeaturesRemoved(schema.getTypeName(),
172: t, bounds, false);
173: }
174:
175: public void setFeatures(FeatureReader reader) throws IOException {
176: // remove everything, then add back
177: removeFeatures(Filter.INCLUDE);
178: addFeatures(reader);
179: }
180:
181: public FeatureCollection getFeatures(Query query)
182: throws IOException {
183: // feature collection is writable unfortunately, we have to rely on the
184: // default behaviour otherwise writes won't be versioned
185: // TODO: build a versioned feature collection that can do better, if possible at all
186: return super .getFeatures(query);
187: }
188:
189: public FeatureCollection getFeatures(Filter filter)
190: throws IOException {
191: // feature collection is writable unfortunately, we have to rely on the
192: // default behaviour otherwise writes won't be versioned
193: return super .getFeatures(filter);
194: }
195:
196: public FeatureCollection getFeatures() throws IOException {
197: // feature collection is writable unfortunately, we have to rely on the
198: // default behaviour otherwise writes won't be versioned
199: return super .getFeatures();
200: }
201:
202: public FeatureCollection getVersionedFeatures(Query query)
203: throws IOException {
204: final FeatureType ft = getSchema();
205:
206: // check the feature type is the right one
207: final String typeName = ft.getTypeName();
208: if (query.getTypeName() != null
209: && !query.getTypeName().equals(typeName))
210: throw new IOException(
211: "Incompatible type, this class can access only "
212: + typeName);
213:
214: // make sure the view is around
215: if (!Arrays.asList(store.wrapped.getTypeNames()).contains(
216: store.getVFCViewName(typeName)))
217: store.createVersionedFeatureCollectionView(typeName);
218:
219: // we have to hit the view
220: DefaultQuery vq = new DefaultQuery(query);
221: vq.setTypeName(VersionedPostgisDataStore
222: .getVFCViewName(typeName));
223: vq = store.buildVersionedQuery(vq);
224: FeatureCollection fc = store.wrapped.getFeatureSource(
225: VersionedPostgisDataStore.getVFCViewName(typeName))
226: .getFeatures(vq);
227: try {
228: final FeatureType fcSchema = fc.getSchema();
229: FeatureType renamedFt = FeatureTypes.newFeatureType(
230: fcSchema.getAttributeTypes(), ft.getTypeName(), ft
231: .getNamespace(), ft.isAbstract(), ft
232: .getAncestors(), fcSchema
233: .getDefaultGeometry());
234: return new ReTypingFeatureCollection(fc, renamedFt);
235: } catch (SchemaException e) {
236: throw new DataSourceException("Error ranming feature type",
237: e);
238: }
239: }
240:
241: public FeatureCollection getVersionedFeatures(Filter filter)
242: throws IOException {
243: return getVersionedFeatures(new DefaultQuery(null, filter));
244: }
245:
246: public FeatureCollection getVersionedFeatures() throws IOException {
247: return getVersionedFeatures(new DefaultQuery(getSchema()
248: .getTypeName()));
249: }
250:
251: // ---------------------------------------------------------------------------------------------
252: // VERSIONING EXTENSIONS
253: // ---------------------------------------------------------------------------------------------
254:
255: public void rollback(String toVersion, Filter filter,
256: String[] userIds) throws IOException {
257: // TODO: build an optimized version of this that can do the same work with a couple
258: // of queries assuming the filter is fully encodable
259:
260: // Gather feature modified after toVersion
261: ModifiedFeatureIds mfids = store.getModifiedFeatureFIDs(schema
262: .getTypeName(), toVersion, null, filter, userIds,
263: getTransaction());
264: FilterFactory ff = CommonFactoryFinder.getFilterFactory(null);
265:
266: // remove all features that have been created and not deleted
267: Set fidsToRemove = new HashSet(mfids.getCreated());
268: fidsToRemove.removeAll(mfids.getDeleted());
269: if (!fidsToRemove.isEmpty())
270: removeFeatures(store.buildFidFilter(ff, fidsToRemove));
271:
272: // reinstate all features that were there before toVersion and that
273: // have been deleted after it. Notice this is an insertion, so to preserve
274: // the fids I have to use low level writers where I can set all attributes manually
275: // (we work on the assumption the wrapped data store maps all attributes of the primary
276: // key in the feature itself)
277: Set fidsToRecreate = new HashSet(mfids.getDeleted());
278: fidsToRecreate.removeAll(mfids.getCreated());
279: if (!fidsToRecreate.isEmpty()) {
280: long revision = store.wrapped
281: .getVersionedJdbcTransactionState(getTransaction())
282: .getRevision();
283: Filter recreateFilter = store.buildVersionedFilter(schema
284: .getTypeName(), store.buildFidFilter(ff,
285: fidsToRecreate), new RevisionInfo(toVersion));
286: FeatureReader fr = null;
287: FeatureWriter fw = null;
288: try {
289: DefaultQuery q = new DefaultQuery(schema.getTypeName(),
290: recreateFilter);
291: fr = store.wrapped
292: .getFeatureReader(q, getTransaction());
293: fw = store.wrapped.getFeatureWriterAppend(schema
294: .getTypeName(), getTransaction());
295: while (fr.hasNext()) {
296: Feature original = fr.next();
297: Feature restored = fw.next();
298: for (int i = 0; i < original.getFeatureType()
299: .getAttributeCount(); i++) {
300: restored.setAttribute(i, original
301: .getAttribute(i));
302: }
303: restored.setAttribute("revision",
304: new Long(revision));
305: restored.setAttribute("expired", new Long(
306: Long.MAX_VALUE));
307: fw.write();
308: }
309: } catch (IllegalAttributeException iae) {
310: throw new DataSourceException(
311: "Unexpected error occurred while "
312: + "restoring deleted featues", iae);
313: } finally {
314: if (fr != null)
315: fr.close();
316: if (fw != null)
317: fw.close();
318: }
319: }
320:
321: // Now onto the modified features, that were there, and still are there.
322: // Since we cannot get a sorted writer we have to do a kind of inner loop scan
323: // (note, a parellel scan of similarly sorted reader and writer would be more
324: // efficient, but writer sorting is not there...)
325: // Here it's possible to work against the external API, thought it would be more
326: // efficient (but more complex) to work against the wrapped one.
327: if (!mfids.getModified().isEmpty()) {
328: Filter modifiedIdFilter = store.buildFidFilter(ff, mfids
329: .getModified());
330: Filter mifCurrent = store.buildVersionedFilter(schema
331: .getTypeName(), modifiedIdFilter,
332: new RevisionInfo());
333: FeatureReader fr = null;
334: FeatureWriter fw = null;
335: try {
336: fw = store.getFeatureWriter(schema.getTypeName(),
337: mifCurrent, getTransaction());
338: while (fw.hasNext()) {
339: Feature current = fw.next();
340: Filter currIdFilter = ff.id(Collections
341: .singleton(ff.featureId(current.getID())));
342: Filter cidToVersion = store.buildVersionedFilter(
343: schema.getTypeName(), currIdFilter,
344: new RevisionInfo(toVersion));
345: DefaultQuery q = new DefaultQuery(schema
346: .getTypeName(), cidToVersion);
347: q.setVersion(toVersion);
348: fr = store.getFeatureReader(q, getTransaction());
349: Feature original = fr.next();
350: for (int i = 0; i < original.getFeatureType()
351: .getAttributeCount(); i++) {
352: current.setAttribute(i, original
353: .getAttribute(i));
354: }
355: fr.close();
356: fw.write();
357: }
358: } catch (IllegalAttributeException iae) {
359: throw new DataSourceException(
360: "Unexpected error occurred while "
361: + "restoring deleted featues", iae);
362: } finally {
363: if (fr != null)
364: fr.close();
365: if (fw != null)
366: fw.close();
367: }
368: }
369:
370: }
371:
372: public FeatureCollection getLog(String fromVersion,
373: String toVersion, Filter filter, String[] userIds,
374: int maxRows) throws IOException {
375: if (filter == null)
376: filter = Filter.INCLUDE;
377: RevisionInfo r1 = new RevisionInfo(fromVersion);
378: RevisionInfo r2 = new RevisionInfo(toVersion);
379:
380: boolean swapped = false;
381: if (r1.revision > r2.revision) {
382: // swap them
383: RevisionInfo tmpr = r1;
384: r1 = r2;
385: r2 = tmpr;
386: String tmps = toVersion;
387: toVersion = fromVersion;
388: fromVersion = tmps;
389: swapped = true;
390: }
391:
392: // We implement this exactly as described. Happily, it seems Postgis does not have
393: // sql lentgh limitations. Yet, if would be a lot better if we could encode this
394: // as a single sql query with subqueries... (but not all filters are encodable...)
395: ModifiedFeatureIds mfids = store.getModifiedFeatureFIDs(schema
396: .getTypeName(), fromVersion, toVersion, filter,
397: userIds, getTransaction());
398: Set ids = new HashSet(mfids.getCreated());
399: ids.addAll(mfids.getDeleted());
400: ids.addAll(mfids.getModified());
401:
402: // no changes?
403: if (ids.isEmpty())
404: return new EmptyFeatureCollection(schema);
405:
406: // Create a filter that sounds like:
407: // (revision > r1 and revision <= r2) or (expired > r1 and expired <= r2) and fid in
408: // (fidlist)
409: FilterFactory ff = CommonFactoryFinder.getFilterFactory(null);
410: Filter fidFilter = store.buildFidFilter(ff, ids);
411: Filter transformedFidFilter = store.transformFidFilter(schema
412: .getTypeName(), fidFilter);
413: Filter revGrR1 = ff.greater(ff.property("revision"), ff
414: .literal(r1.revision));
415: Filter revLeR2 = ff.lessOrEqual(ff.property("revision"), ff
416: .literal(r2.revision));
417: Filter expGrR1 = ff.greater(ff.property("expired"), ff
418: .literal(r1.revision));
419: Filter expLeR2 = ff.lessOrEqual(ff.property("expired"), ff
420: .literal(r2.revision));
421: Filter versionFilter = ff.and(transformedFidFilter, ff.or(ff
422: .and(revGrR1, revLeR2), ff.and(expGrR1, expLeR2)));
423:
424: // We just want the revision and expired, build a query against the real feature type
425: DefaultQuery q = new DefaultQuery(schema.getTypeName(),
426: versionFilter, new String[] { "revision", "expired" });
427: FeatureReader fr = null;
428: SortedSet revisions = new TreeSet();
429: try {
430: fr = store.wrapped.getFeatureReader(q, getTransaction());
431: while (fr.hasNext()) {
432: Feature f = fr.next();
433: Long revision = (Long) f.getAttribute(0);
434: if (revision.longValue() > r1.revision)
435: revisions.add(revision);
436: Long expired = (Long) f.getAttribute(1);
437: if (expired.longValue() != Long.MAX_VALUE
438: && expired.longValue() > r1.revision)
439: revisions.add(expired);
440: }
441: } catch (Exception e) {
442: throw new DataSourceException(
443: "Error reading modified revisions from datastore",
444: e);
445: } finally {
446: if (fr != null)
447: fr.close();
448: }
449:
450: // now, we have a list of revisions between a min and a max
451: // let's try to build a fid filter with revisions from the biggest to the smallest
452: Set revisionIdSet = new HashSet();
453: for (Iterator it = revisions.iterator(); it.hasNext();) {
454: Long rev = (Long) it.next();
455: revisionIdSet.add(ff.featureId(rev.toString()));
456: }
457: Filter revisionFilter = ff.id(revisionIdSet);
458:
459: // return the changelog
460: // TODO: sort on revision descending. Unfortunately, to do so we have to fix fid mappers,
461: // so that auto-increment can return revision among the attributes, and at the same
462: // time simply allow not include fid attributes in the insert queries (or provide a
463: // "default"
464: // value for them).
465: FeatureSource changesets = (FeatureSource) store
466: .getFeatureSource(VersionedPostgisDataStore.TBL_CHANGESETS);
467: DefaultQuery sq = new DefaultQuery();
468: sq.setFilter(revisionFilter);
469: final SortOrder order = swapped ? SortOrder.ASCENDING
470: : SortOrder.DESCENDING;
471: sq.setSortBy(new SortBy[] { ff.sort("revision", order) });
472: if (maxRows > 0)
473: sq.setMaxFeatures(maxRows);
474: return changesets.getFeatures(sq);
475: }
476:
477: public FeatureDiffReader getDifferences(String fromVersion,
478: String toVersion, Filter filter, String[] userIds)
479: throws IOException {
480: if (filter == null)
481: filter = Filter.INCLUDE;
482:
483: RevisionInfo r1 = new RevisionInfo(fromVersion);
484: RevisionInfo r2 = new RevisionInfo(toVersion);
485:
486: // gather modified ids
487: ModifiedFeatureIds mfids = store.getModifiedFeatureFIDs(schema
488: .getTypeName(), fromVersion, toVersion, filter,
489: userIds, getTransaction());
490:
491: // build all the filters to gather created, deleted and modified features at the appropriate
492: // revisions, depending also on wheter creation/deletion should be swapped or not
493: FilterFactory ff = CommonFactoryFinder.getFilterFactory(null);
494: VersionedFIDMapper mapper = (VersionedFIDMapper) store
495: .getFIDMapper(schema.getTypeName());
496:
497: return new FeatureDiffReader(store, getTransaction(), schema,
498: r1, r2, mapper, mfids);
499: }
500:
501: // ----------------------------------------------------------------------------------------------
502: // INTERNAL SUPPORT METHODS
503: // ----------------------------------------------------------------------------------------------
504:
505: /**
506: * Clones the query and sets the proper type name into it
507: *
508: * @param query
509: * @return
510: */
511: private Query getTypedQuery(Query query) {
512: DefaultQuery q = new DefaultQuery(query);
513: q.setTypeName(schema.getTypeName());
514: return q;
515: }
516:
517: public Set getSupportedHints() {
518: VersionedPostgisDataStore ds = (VersionedPostgisDataStore) getDataStore();
519: if (ds.wrapped.isWKBEnabled()) {
520: HashSet set = new HashSet();
521: set.add(Hints.JTS_COORDINATE_SEQUENCE_FACTORY);
522: set.add(Hints.JTS_GEOMETRY_FACTORY);
523: return set;
524: } else {
525: return Collections.EMPTY_SET;
526: }
527: }
528:
529: }
|