001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.needle.gigaspaces.datasource;
018:
019: import java.util.List;
020: import java.util.Properties;
021:
022: import com.gigaspaces.datasource.BulkDataPersister;
023: import com.gigaspaces.datasource.BulkItem;
024: import com.gigaspaces.datasource.DataIterator;
025: import com.gigaspaces.datasource.DataPersister;
026: import com.gigaspaces.datasource.DataSourceException;
027: import org.compass.core.Compass;
028: import org.compass.core.CompassCallbackWithoutResult;
029: import org.compass.core.CompassException;
030: import org.compass.core.CompassSession;
031: import org.compass.core.CompassTemplate;
032: import org.compass.core.config.CompassConfigurationFactory;
033: import org.compass.core.spi.InternalCompass;
034:
035: /**
036: * A Compass data source to be used with GigaSpaces mirror.
037: *
038: * <p>GigaSpaces mirror allows to mirror changes done to the Space into an external data source
039: * in a reliable async manner. The Compass data source allows to mirror changes done to the
040: * Space into the search engine using Compass.
041: *
042: * <p>This extenal data source can be injected with a {@link Compass} instance, or it can be
043: * configured using the extenal data source proeprties configuration to create a Compass instance.
044: * The proeprty <code>compass-config-file</code> will point to Compass configuration file.
045: *
046: * @author kimchy
047: */
048: public class CompassDataSource implements DataPersister<Object>,
049: BulkDataPersister {
050:
051: public static final String COMPASS_CFG_PROPERTY = "compass-config-file";
052:
053: private Compass compass;
054:
055: private CompassTemplate compassTemplate;
056:
057: public void setCompass(Compass compass) {
058: this .compass = compass;
059: }
060:
061: public void init(Properties properties) throws DataSourceException {
062: if (compass == null) {
063: String compassConfig = properties
064: .getProperty(COMPASS_CFG_PROPERTY);
065: compass = CompassConfigurationFactory.newConfiguration()
066: .configure(compassConfig).buildCompass();
067: }
068: compassTemplate = new CompassTemplate(compass);
069: }
070:
071: public void shutdown() throws DataSourceException {
072: if (compass != null) {
073: compass.close();
074: compass = null;
075: }
076: }
077:
078: /**
079: * Write given new object to the data store
080: */
081: public void write(Object object) throws DataSourceException {
082: if (hasMapping(object)) {
083: compassTemplate.save(object);
084: }
085: }
086:
087: /**
088: * Update the given object in the data store
089: */
090: public void update(Object object) throws DataSourceException {
091: if (hasMapping(object)) {
092: compassTemplate.save(object);
093: }
094: }
095:
096: /**
097: * Remove the given object from the data store
098: */
099: public void remove(Object object) throws DataSourceException {
100: if (hasMapping(object)) {
101: compassTemplate.delete(object);
102: }
103: }
104:
105: /**
106: * Write given new objects to the data store.
107: *
108: * <p>If the implementation uses transactions, all the objects must be written
109: * in one transaction.
110: */
111: public void writeBatch(final List<Object> objects)
112: throws DataSourceException {
113: compassTemplate.execute(new CompassCallbackWithoutResult() {
114: protected void doInCompassWithoutResult(
115: CompassSession session) throws CompassException {
116: for (Object object : objects) {
117: if (hasMapping(object)) {
118: session.save(object);
119: }
120: }
121: }
122: });
123: }
124:
125: /**
126: * Update given objects in the data store.
127: *
128: * <p>If the implementation uses transactions, all the objects must be updated
129: * in one transaction.
130: *
131: * <p>* This operation is not currently supported by the space.
132: */
133: public void updateBatch(final List<Object> objects)
134: throws DataSourceException {
135: compassTemplate.execute(new CompassCallbackWithoutResult() {
136: protected void doInCompassWithoutResult(
137: CompassSession session) throws CompassException {
138: for (Object object : objects) {
139: if (hasMapping(object)) {
140: session.save(object);
141: }
142: }
143: }
144: });
145: }
146:
147: /**
148: * Remove given objects from the data store.
149: *
150: * <p>If the implementation uses transactions, All the objects must be removed
151: * in one transaction.
152: *
153: * <p>This operation is not currently supported by the space.<br>
154: */
155: public void removeBatch(final List<Object> objects)
156: throws DataSourceException {
157: compassTemplate.execute(new CompassCallbackWithoutResult() {
158: protected void doInCompassWithoutResult(
159: CompassSession session) throws CompassException {
160: for (Object object : objects) {
161: if (hasMapping(object)) {
162: session.delete(object);
163: }
164: }
165: }
166: });
167: }
168:
169: /**
170: * Execute given bulk of operations. Each {@link BulkItem} contains one of
171: * the following operation -<br>
172: *
173: * <p>
174: * WRITE - given object should be inserted to the data store,<br>
175: * UPDATE - given object should be updated in the data store,<br>
176: * REMOVE - given object should be deleted from the data store<br>
177: * <br>
178: * If the implementation uses transactions,<br>
179: * all the bulk operations must be executed in one transaction.<br>
180: */
181: public void executeBulk(final List<BulkItem> bulkItems)
182: throws DataSourceException {
183: compassTemplate.execute(new CompassCallbackWithoutResult() {
184: protected void doInCompassWithoutResult(
185: CompassSession session) throws CompassException {
186: for (BulkItem bulkItem : bulkItems) {
187: switch (bulkItem.getOperation()) {
188: case BulkItem.REMOVE:
189: if (hasMapping(bulkItem.getItem())) {
190: session.delete(bulkItem.getItem());
191: }
192: break;
193: case BulkItem.WRITE:
194: if (hasMapping(bulkItem.getItem())) {
195: session.save(bulkItem.getItem());
196: }
197: break;
198: case BulkItem.UPDATE:
199: if (hasMapping(bulkItem.getItem())) {
200: session.save(bulkItem.getItem());
201: }
202: break;
203: default:
204: break;
205: }
206: }
207: }
208: });
209: }
210:
211: /**
212: * Read one object that matches the given template. <br>
213: * Used by the space for read templates with UID.<br>
214: */
215: public Object read(Object template) throws DataSourceException {
216: throw new UnsupportedOperationException();
217: }
218:
219: /**
220: * Create an iterator over all objects that match the given template.<br>
221: * Note: null value can be passed - in case of a null template or at initial
222: * space load
223: */
224: public DataIterator<Object> iterator(Object template)
225: throws DataSourceException {
226: throw new UnsupportedOperationException();
227: }
228:
229: /**
230: * Count number of object in the data source that match given template.<br>
231: * Note: null value can be passed - in case of a null template.
232: */
233: public int count(Object template) throws DataSourceException {
234: throw new UnsupportedOperationException();
235: }
236:
237: /**
238: * Creates and returns an iterator over all the entries that should be
239: * loaded into space.
240: *
241: * @return a {@link DataIterator} or null if no data should be loaded into
242: * space
243: * @throws DataSourceException
244: */
245: public DataIterator<Object> initialLoad()
246: throws DataSourceException {
247: throw new UnsupportedOperationException();
248: }
249:
250: private boolean hasMapping(Object object) {
251: return ((InternalCompass) compass).getMapping()
252: .getRootMappingByClass(object.getClass()) != null;
253: }
254: }
|