001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.hibernate.scrollable;
018:
019: import java.lang.reflect.Array;
020: import java.util.ArrayList;
021: import java.util.Iterator;
022: import java.util.List;
023:
024: import org.compass.core.CompassException;
025: import org.compass.core.CompassSession;
026: import org.compass.core.Resource;
027: import org.compass.core.config.CommonMetaDataLookup;
028: import org.compass.core.mapping.CascadeMapping;
029: import org.compass.core.spi.InternalCompass;
030: import org.compass.core.spi.InternalCompassSession;
031: import org.compass.gps.ActiveMirrorGpsDevice;
032: import org.compass.gps.CompassGpsException;
033: import org.compass.gps.device.AbstractGpsDevice;
034: import org.compass.gps.device.hibernate.HibernateGpsDeviceException;
035: import org.compass.gps.device.hibernate.scrollable.snapshot.ConfigureSnapshotEvent;
036: import org.compass.gps.device.hibernate.scrollable.snapshot.CreateAndUpdateSnapshotEvent;
037: import org.compass.gps.device.hibernate.scrollable.snapshot.DeleteSnapshotEvent;
038: import org.compass.gps.device.hibernate.scrollable.snapshot.HibernateAliasRowSnapshot;
039: import org.compass.gps.device.hibernate.scrollable.snapshot.HibernateAliasSnapshot;
040: import org.compass.gps.device.hibernate.scrollable.snapshot.HibernateSnapshot;
041: import org.compass.gps.device.hibernate.scrollable.snapshot.HibernateSnapshotEventListener;
042: import org.compass.gps.device.hibernate.scrollable.snapshot.HibernateSnapshotPersister;
043: import org.compass.gps.device.hibernate.scrollable.snapshot.RAMHibernateSnapshotPersister;
044: import org.compass.gps.device.jdbc.AbstractJdbcGpsDevice.IndexExecution;
045: import org.compass.gps.device.jdbc.mapping.ColumnMapping;
046: import org.compass.gps.device.jdbc.mapping.ColumnToPropertyMapping;
047: import org.compass.gps.device.jdbc.mapping.ResultSetToResourceMapping;
048: import org.hibernate.CacheMode;
049: import org.hibernate.HibernateException;
050: import org.hibernate.Query;
051: import org.hibernate.ScrollMode;
052: import org.hibernate.ScrollableResults;
053: import org.hibernate.Session;
054: import org.hibernate.SessionFactory;
055: import org.hibernate.Transaction;
056:
057: public class Hibernate3ScrollableResultsGpsDevice extends
058: AbstractGpsDevice implements ActiveMirrorGpsDevice {
059:
060: protected List mappings = new ArrayList();
061:
062: private boolean autoDetectVersionColumnSqlType = true;
063:
064: private SessionFactory sessionFactory;
065:
066: private boolean mirrorDataChanges;
067:
068: private HibernateSnapshot snapshot;
069:
070: private HibernateSnapshotPersister snapshotPersister = new RAMHibernateSnapshotPersister();
071:
072: private HibernateSnapshotEventListener snapshotEventListener = new ScrollableResultsSnapshotEventListener();
073:
074: private boolean saveSnapshotAfterMirror = false;
075:
076: protected static interface HibernateSessionWrapper {
077: void open() throws HibernateGpsDeviceException;
078:
079: void close();
080:
081: void closeOnError();
082: }
083:
084: protected int fetchCount = 200;
085:
086: public Hibernate3ScrollableResultsGpsDevice() {
087: super ();
088: }
089:
090: public Hibernate3ScrollableResultsGpsDevice(String name,
091: SessionFactory sessionFactory) {
092: setName(name);
093: this .sessionFactory = sessionFactory;
094: }
095:
096: public void setSessionFactory(SessionFactory sessionFactory) {
097: this .sessionFactory = sessionFactory;
098: }
099:
100: public void setFetchCount(int fetchCount) {
101: this .fetchCount = fetchCount;
102: }
103:
104: public boolean isMirrorDataChanges() {
105: return mirrorDataChanges;
106: }
107:
108: public void setMirrorDataChanges(boolean mirrorDataChanges) {
109: this .mirrorDataChanges = mirrorDataChanges;
110: }
111:
112: protected void doStart() throws CompassGpsException {
113: super .doStart();
114: // support for meta data lookup
115: CommonMetaDataLookup commonMetaDataLookup = new CommonMetaDataLookup(
116: ((InternalCompass) compassGps.getIndexCompass())
117: .getMetaData());
118: for (Iterator it = mappings.iterator(); it.hasNext();) {
119: ResultSetToResourceMapping rsMapping = (ResultSetToResourceMapping) it
120: .next();
121: rsMapping.setAlias(commonMetaDataLookup
122: .lookupAliasName(rsMapping.getAlias()));
123: for (Iterator it1 = rsMapping.mappingsIt(); it1.hasNext();) {
124: List columns = (List) it1.next();
125: for (Iterator it2 = columns.iterator(); it2.hasNext();) {
126: ColumnMapping columnMapping = (ColumnMapping) it2
127: .next();
128: if (columnMapping instanceof ColumnToPropertyMapping) {
129: ColumnToPropertyMapping columnToPropertyMapping = (ColumnToPropertyMapping) columnMapping;
130:
131: columnToPropertyMapping
132: .setPropertyName(commonMetaDataLookup
133: .lookupMetaDataName(columnToPropertyMapping
134: .getPropertyName()));
135: }
136: }
137: }
138: }
139: // double check that all the result set mapping have Compass::Core
140: // resource mapping
141: for (Iterator it = mappings.iterator(); it.hasNext();) {
142: ResultSetToResourceMapping rsMapping = (ResultSetToResourceMapping) it
143: .next();
144: if (!compassGps.hasMappingForEntityForMirror(rsMapping
145: .getAlias(), CascadeMapping.Cascade.ALL)) {
146: throw new IllegalStateException(
147: buildMessage("No resource mapping defined in gps mirror compass for alias ["
148: + rsMapping.getAlias()
149: + "]. Did you defined a jdbc mapping builder?"));
150: }
151: if (!compassGps.hasMappingForEntityForIndex(rsMapping
152: .getAlias())) {
153: throw new IllegalStateException(
154: buildMessage("No resource mapping defined in gps index compass for alias ["
155: + rsMapping.getAlias()
156: + "]. Did you defined a jdbc mapping builder?"));
157: }
158: }
159:
160: if (isMirrorDataChanges()) {
161: if (log.isInfoEnabled()) {
162: log
163: .info(buildMessage("Using mirroring, loading snapshot data"));
164: }
165: // set up the snapshot
166: snapshot = getSnapshotPersister().load();
167: for (Iterator it = mappings.iterator(); it.hasNext();) {
168: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
169: .next();
170: if (mapping.supportsVersioning()
171: && snapshot
172: .getAliasSnapshot(mapping.getAlias()) == null) {
173: if (log.isDebugEnabled()) {
174: log
175: .debug(buildMessage("Alias ["
176: + mapping.getAlias()
177: + "] not found in snapshot data, creating..."));
178: }
179: HibernateAliasSnapshot aliasSnapshot = new HibernateAliasSnapshot(
180: mapping.getAlias());
181: snapshot.putAliasSnapshot(aliasSnapshot);
182: }
183: }
184: // configure the snapshot event listener
185: //TODO: hibernate session is not needed for ConfigureSnapshotEvent
186: getSnapshotEventListener().configure(
187: new ConfigureSnapshotEvent(null, mappings));
188:
189: }
190:
191: if (log.isDebugEnabled()) {
192: for (Iterator it = mappings.iterator(); it.hasNext();) {
193: log
194: .debug(buildMessage("Using DB Mapping "
195: + it.next()));
196: }
197: }
198: }
199:
200: protected void doStop() throws CompassGpsException {
201: getSnapshotPersister().save(snapshot);
202: super .doStop();
203: }
204:
205: /**
206: * Adds a mapping to be indexed and mirrored.
207: */
208: public void addMapping(ResultSetToResourceMapping mapping) {
209: this .mappings.add(mapping);
210: }
211:
212: /**
213: * Adds an array of mappings to be indexed and mirrored.
214: */
215: public void setMappings(ResultSetToResourceMapping[] mappingsArr) {
216: for (int i = 0; i < mappingsArr.length; i++) {
217: addMapping(mappingsArr[i]);
218: }
219: }
220:
221: /**
222: * Should the device auto detect the version columns jdbc type.
223: */
224: public boolean isAutoDetectVersionColumnSqlType() {
225: return autoDetectVersionColumnSqlType;
226: }
227:
228: /**
229: * Sets if the device auto detect the version columns jdbc type.
230: */
231: public void setAutoDetectVersionColumnSqlType(
232: boolean autoDetectVersionColumnSqlType) {
233: this .autoDetectVersionColumnSqlType = autoDetectVersionColumnSqlType;
234: }
235:
236: /**
237: * Returns the array of index execution with a size of the number of
238: * mappings.
239: */
240: protected IndexExecution[] doGetIndexExecutions() {
241: IndexExecution[] indexExecutions = new IndexExecution[mappings
242: .size()];
243: for (int i = 0; i < indexExecutions.length; i++) {
244: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) mappings
245: .get(i);
246: indexExecutions[i] = new IndexExecution(mapping, mapping
247: .getSelectQuery());
248: }
249: return indexExecutions;
250: }
251:
252: /**
253: * Indexes the data
254: */
255: protected void doIndex(CompassSession session)
256: throws CompassException {
257: // reset the snapshot data before we perform the index operation
258: snapshot = new HibernateSnapshot();
259: for (Iterator it = mappings.iterator(); it.hasNext();) {
260: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
261: .next();
262: if (mapping.supportsVersioning()) {
263: HibernateAliasSnapshot aliasSnapshot = new HibernateAliasSnapshot(
264: mapping.getAlias());
265: snapshot.putAliasSnapshot(aliasSnapshot);
266: }
267: }
268:
269: if (log.isInfoEnabled()) {
270: log
271: .info(buildMessage("Indexing the database with fetch count ["
272: + fetchCount + "]"));
273: }
274:
275: IndexExecution[] indexExecutions = doGetIndexExecutions();
276: for (int i = 0; i != indexExecutions.length; i++) {
277: IndexExecution indexExecution = indexExecutions[i];
278:
279: HibernateSessionWrapper sessionWrapper = doGetHibernateSessionWrapper();
280:
281: try {
282: sessionWrapper.open();
283:
284: Session hibernateSession = ((Hibernate3SessionWrapper) sessionWrapper)
285: .getSession();
286:
287: String queryString = indexExecution.getStatementQuery();
288:
289: if (log.isDebugEnabled()) {
290: log.debug("queryString: " + queryString);
291: }
292:
293: Query query = hibernateSession.createQuery(queryString)
294: .setCacheMode(CacheMode.IGNORE);
295: String[] returnAliases = query.getReturnAliases();
296:
297: ScrollableResults rs = query
298: .scroll(ScrollMode.FORWARD_ONLY);
299: int count = 0;
300: while (rs.next()) {
301: processRow(indexExecution.getDescription(), rs,
302: returnAliases, session);
303: if (++count % fetchCount == 0) {
304: // release memory
305: hibernateSession.flush();
306: hibernateSession.clear();
307: }
308: }
309: rs.close();
310:
311: } catch (Exception e) {
312: log.error(buildMessage("Failed to index the database"),
313: e);
314: sessionWrapper.closeOnError();
315: if (!(e instanceof HibernateGpsDeviceException)) {
316: throw new HibernateGpsDeviceException(
317: buildMessage("Failed to index the database"),
318: e);
319: }
320: throw (HibernateGpsDeviceException) e;
321: }
322:
323: }
324:
325: if (log.isInfoEnabled()) {
326: log.info(buildMessage("Finished indexing the database"));
327: }
328:
329: // save the sanpshot data
330: getSnapshotPersister().save(snapshot);
331: }
332:
333: protected void processRow(Object description, ScrollableResults rs,
334: String[] returnAliases, CompassSession session)
335: throws CompassException {
336: Object value = processRowValue(description, rs, returnAliases,
337: session);
338: if (value != null) {
339: if (value.getClass().isArray()) {
340: int length = Array.getLength(value);
341: for (int i = 0; i < length; i++) {
342: Object value1 = Array.get(value, i);
343: session.create(value1);
344: }
345: } else {
346: session.create(value);
347: }
348: }
349: }
350:
351: protected HibernateSessionWrapper doGetHibernateSessionWrapper() {
352: return new Hibernate3SessionWrapper(sessionFactory);
353: }
354:
355: /**
356: * A helper method that returns the actual session factory for event
357: * registration. Can be used by subclasses if the
358: * <code>SessionFactory</code> is proxied.
359: */
360: protected SessionFactory doGetActualSessionFactory() {
361: return this .sessionFactory;
362: }
363:
364: /**
365: * Index the given <code>ResultSet</code> row into a Compass
366: * <code>Resource</code>.
367: */
368: protected Object processRowValue(Object description,
369: ScrollableResults rs, String[] returnAliases,
370: CompassSession session) throws CompassException {
371:
372: if (log.isDebugEnabled()) {
373: StringBuffer sb = new StringBuffer();
374: sb.append(buildMessage("Indexing data row with values "));
375: for (int i = 0; i != returnAliases.length; i++) {
376: sb.append("[").append(returnAliases[i]).append(":");
377: Object value = rs.get(i);
378: sb.append(value);
379: sb.append("] ");
380: }
381: log.debug(sb.toString());
382: }
383:
384: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) description;
385:
386: HibernateAliasRowSnapshot rowSnapshot = null;
387: if (shouldMirrorDataChanges() && mapping.supportsVersioning()) {
388: rowSnapshot = new HibernateAliasRowSnapshot();
389: }
390: Resource resource = ((InternalCompassSession) session)
391: .getCompass().getResourceFactory().createResource(
392: mapping.getAlias());
393: Hibernate3ScrollableResultsRowMarshallHelper marshallHelper = new Hibernate3ScrollableResultsRowMarshallHelper(
394: mapping, session, resource, rowSnapshot);
395: marshallHelper.marshallResultSet(rs, returnAliases);
396:
397: if (shouldMirrorDataChanges() && mapping.supportsVersioning()) {
398: snapshot.getAliasSnapshot(mapping.getAlias()).putRow(
399: rowSnapshot);
400: }
401:
402: return resource;
403: }
404:
405: /**
406: * Performs the data change mirroring operation.
407: */
408: public synchronized void performMirroring()
409: throws HibernateGpsDeviceException {
410: if (!shouldMirrorDataChanges() || isPerformingIndexOperation()) {
411: return;
412: }
413: if (snapshot == null) {
414: throw new IllegalStateException(
415: buildMessage("Versioning data was not properly initialized, did you index the device or loaded the data?"));
416: }
417:
418: HibernateSessionWrapper sessionWrapper = doGetHibernateSessionWrapper();
419:
420: try {
421: sessionWrapper.open();
422: for (Iterator it = mappings.iterator(); it.hasNext();) {
423: ResultSetToResourceMapping mapping = (ResultSetToResourceMapping) it
424: .next();
425: if (!mapping.supportsVersioning()) {
426: continue;
427: }
428: HibernateAliasSnapshot oldAliasSnapshot = snapshot
429: .getAliasSnapshot(mapping.getAlias());
430: if (oldAliasSnapshot == null) {
431: log
432: .warn(buildMessage("No snapshot for alias ["
433: + mapping.getAlias()
434: + "] even though there should be support for versioning ignoring the alias"));
435: continue;
436: }
437: HibernateAliasSnapshot newAliasSnapshot = new HibernateAliasSnapshot(
438: mapping.getAlias());
439: ArrayList createdRows = new ArrayList();
440: ArrayList updatedRows = new ArrayList();
441: ArrayList deletedRows = new ArrayList();
442: if (log.isDebugEnabled()) {
443: log.debug(buildMessage("Executing version query ["
444: + mapping.getVersionQuery() + "]"));
445: }
446:
447: String[] returnAliases = null;
448:
449: Session hibernateSession = ((Hibernate3SessionWrapper) sessionWrapper)
450: .getSession();
451:
452: String queryString = mapping.getVersionQuery();
453:
454: if (log.isDebugEnabled()) {
455: log.debug("queryString: " + queryString);
456: }
457:
458: Query query = hibernateSession.createQuery(queryString)
459: .setCacheMode(CacheMode.IGNORE);
460: returnAliases = query.getReturnAliases();
461:
462: ScrollableResults rs = query
463: .scroll(ScrollMode.FORWARD_ONLY);
464: int count = 0;
465: while (rs.next()) {
466: if (log.isDebugEnabled()) {
467: StringBuffer sb = new StringBuffer();
468: sb
469: .append(buildMessage("Version row with values "));
470: for (int i = 0; i != returnAliases.length; i++) {
471: sb.append("[").append(returnAliases[i])
472: .append(":");
473: Object value = rs.get(i);
474: sb.append(value);
475: sb.append("] ");
476: }
477: log.debug(sb.toString());
478: }
479:
480: HibernateAliasRowSnapshot newRowSnapshot = new HibernateAliasRowSnapshot();
481: Hibernate3ScrollableResultsRowMarshallHelper marshallHelper = new Hibernate3ScrollableResultsRowMarshallHelper(
482: mapping, newRowSnapshot, compassGps
483: .getMirrorCompass());
484: marshallHelper.marshallResultSet(rs, returnAliases);
485:
486: // new and old have the same ids
487: HibernateAliasRowSnapshot oldRowSnapshot = oldAliasSnapshot
488: .getRow(newRowSnapshot);
489:
490: // new row or updated row
491: if (oldRowSnapshot == null) {
492: createdRows.add(newRowSnapshot);
493: } else if (oldRowSnapshot
494: .isOlderThan(newRowSnapshot)) {
495: updatedRows.add(newRowSnapshot);
496: }
497:
498: newAliasSnapshot.putRow(newRowSnapshot);
499:
500: if (++count % fetchCount == 0) {
501: // release memory
502: hibernateSession.flush();
503: hibernateSession.clear();
504: }
505: }
506: rs.close();
507:
508: for (Iterator oldRowIt = oldAliasSnapshot
509: .rowSnapshotIt(); oldRowIt.hasNext();) {
510: HibernateAliasRowSnapshot tmpRow = (HibernateAliasRowSnapshot) oldRowIt
511: .next();
512: // deleted row
513: if (newAliasSnapshot.getRow(tmpRow) == null) {
514: deletedRows.add(tmpRow);
515: }
516: }
517: if (!createdRows.isEmpty() || !updatedRows.isEmpty()) {
518: getSnapshotEventListener().onCreateAndUpdate(
519: new CreateAndUpdateSnapshotEvent(
520: hibernateSession, mapping,
521: createdRows, updatedRows,
522: compassGps));
523: }
524: if (!deletedRows.isEmpty()) {
525: getSnapshotEventListener().onDelete(
526: new DeleteSnapshotEvent(hibernateSession,
527: mapping, deletedRows, compassGps));
528: }
529: snapshot.putAliasSnapshot(newAliasSnapshot);
530: }
531: } catch (Exception e) {
532: throw new HibernateGpsDeviceException(
533: buildMessage("Failed while mirroring data changes"),
534: e);
535: } finally {
536: sessionWrapper.close();
537: }
538: if (isSaveSnapshotAfterMirror()) {
539: getSnapshotPersister().save(snapshot);
540: }
541: }
542:
543: public HibernateSnapshotEventListener getSnapshotEventListener() {
544: return snapshotEventListener;
545: }
546:
547: public void setSnapshotEventListener(
548: HibernateSnapshotEventListener snapshotEventListener) {
549: this .snapshotEventListener = snapshotEventListener;
550: }
551:
552: public HibernateSnapshotPersister getSnapshotPersister() {
553: return snapshotPersister;
554: }
555:
556: public void setSnapshotPersister(
557: HibernateSnapshotPersister snapshotPersister) {
558: this .snapshotPersister = snapshotPersister;
559: }
560:
561: public boolean isSaveSnapshotAfterMirror() {
562: return saveSnapshotAfterMirror;
563: }
564:
565: public void setSaveSnapshotAfterMirror(
566: boolean saveSnapshotAfterMirror) {
567: this .saveSnapshotAfterMirror = saveSnapshotAfterMirror;
568: }
569:
570: private class Hibernate3SessionWrapper implements
571: HibernateSessionWrapper {
572:
573: private SessionFactory sessionFactory;
574:
575: private Session session;
576:
577: private Transaction tr;
578:
579: public Hibernate3SessionWrapper(SessionFactory sessionFactory) {
580: this .sessionFactory = sessionFactory;
581: }
582:
583: public Session getSession() {
584: return session;
585: }
586:
587: public void open() throws HibernateGpsDeviceException {
588: try {
589: session = sessionFactory.openSession();
590: } catch (HibernateException e) {
591: throw new HibernateGpsDeviceException(
592: buildMessage("Failed to open session to fetch data"),
593: e);
594: }
595: try {
596: tr = session.beginTransaction();
597: } catch (HibernateException e) {
598: throw new HibernateGpsDeviceException(
599: buildMessage("Failed to begin transaction to fetch data"),
600: e);
601: }
602: }
603:
604: public void close() {
605: if (tr != null) {
606: try {
607: tr.commit();
608: } catch (HibernateException e) {
609: throw new HibernateGpsDeviceException(
610: "Failed to commit hibernate transaction");
611: }
612: }
613: try {
614: if (session.isOpen()) {
615: session.close();
616: }
617: } catch (HibernateException e) {
618: log.error("Failed to close Hibernate session", e);
619: }
620: }
621:
622: public void closeOnError() {
623: if (tr != null) {
624: try {
625: tr.rollback();
626: } catch (HibernateException e1) {
627: log
628: .error(
629: buildMessage("Failed to rollback hibernate transaction"),
630: e1);
631: }
632: }
633: try {
634: if (session.isOpen()) {
635: session.close();
636: }
637: } catch (HibernateException e) {
638: log
639: .error(
640: buildMessage("Failed to close Hibernate session"),
641: e);
642: }
643: }
644: }
645:
646: }
|