001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.hibernate.indexer;
018:
019: import java.util.Arrays;
020: import java.util.HashMap;
021: import java.util.Map;
022:
023: import org.apache.commons.logging.Log;
024: import org.apache.commons.logging.LogFactory;
025: import org.compass.core.CompassSession;
026: import org.compass.gps.device.hibernate.HibernateGpsDevice;
027: import org.compass.gps.device.hibernate.HibernateGpsDeviceException;
028: import org.compass.gps.device.hibernate.entities.EntityInformation;
029: import org.compass.gps.device.support.parallel.IndexEntity;
030: import org.hibernate.CacheMode;
031: import org.hibernate.Criteria;
032: import org.hibernate.ObjectNotFoundException;
033: import org.hibernate.Query;
034: import org.hibernate.ScrollMode;
035: import org.hibernate.ScrollableResults;
036: import org.hibernate.Session;
037: import org.hibernate.Transaction;
038: import org.hibernate.criterion.Order;
039: import org.hibernate.metadata.ClassMetadata;
040:
041: /**
042: * A Hibernate indexer uses Hibernate <code>ScrollableResults</code> to index the database
043: * instead of using <code>setFirstResult</code> and <code>setMaxResults</code>. Using scrollable
044: * results yields better performance especially for large result set.
045: *
046: * <p>First tries to call {@link org.compass.gps.device.hibernate.HibernateQueryProvider#createCriteria(org.hibernate.Session, org.compass.gps.device.hibernate.entities.EntityInformation)}
047: * in order to use Hibernate <code>Criteria</code> to construct the cursor. If no criteria is returned (<code>null</code>
048: * is returned), Hibernate <code>Query</code> is used by calling {@link org.compass.gps.device.hibernate.HibernateQueryProvider#createQuery(org.hibernate.Session, org.compass.gps.device.hibernate.entities.EntityInformation)}.
049: *
050: * <p>When using Criteria, by default, orders the results by entity id. This can be turned off either globablly using
051: * {@link #setPerformOrderById(boolean)}, or per entity using {@link #setPerformOrderById(String, boolean)}.
052: *
053: * @author kimchy
054: */
055: public class ScrollableHibernateIndexEntitiesIndexer implements
056: HibernateIndexEntitiesIndexer {
057:
058: private static final Log log = LogFactory
059: .getLog(ScrollableHibernateIndexEntitiesIndexer.class);
060:
061: private HibernateGpsDevice device;
062:
063: private boolean performOrderById = true;
064:
065: private Map<String, Boolean> performOrderByPerEntity = new HashMap<String, Boolean>();
066:
067: public void setHibernateGpsDevice(HibernateGpsDevice device) {
068: this .device = device;
069: }
070:
071: /**
072: * Should this indxer order by the ids when <code>Criteria</code> is available.
073: * Defaults to <code>true</code>.
074: */
075: public void setPerformOrderById(boolean performOrderById) {
076: this .performOrderById = performOrderById;
077: }
078:
079: /**
080: * Should this indxer order by the ids when <code>Criteria</code> is available for
081: * the given entity. Defaults to {@link #setPerformOrderById(boolean)}.
082: */
083: public void setPerformOrderById(String entity,
084: boolean performOrderById) {
085: performOrderByPerEntity.put(entity, performOrderById);
086: }
087:
088: public void performIndex(CompassSession session,
089: IndexEntity[] entities) {
090: for (IndexEntity entity : entities) {
091: EntityInformation entityInformation = (EntityInformation) entity;
092: if (device.isFilteredForIndex(entityInformation.getName())) {
093: continue;
094: }
095: if (!device.isRunning()) {
096: return;
097: }
098: ScrollableResults cursor = null;
099: Session hibernateSession = device.getSessionFactory()
100: .openSession();
101: hibernateSession.setCacheMode(CacheMode.IGNORE);
102: Transaction hibernateTransaction = null;
103: try {
104: hibernateTransaction = hibernateSession
105: .beginTransaction();
106: if (log.isDebugEnabled()) {
107: log.debug(device.buildMessage("Indexing entities ["
108: + entityInformation.getName()
109: + "] using query ["
110: + entityInformation.getQueryProvider()
111: + "]"));
112: }
113:
114: Criteria criteria = entityInformation
115: .getQueryProvider().createCriteria(
116: hibernateSession, entityInformation);
117: if (criteria != null) {
118: if (performOrderById) {
119: Boolean performOrder = performOrderByPerEntity
120: .get(entityInformation.getName());
121: if (performOrder == null || performOrder) {
122: ClassMetadata metadata = hibernateSession
123: .getSessionFactory()
124: .getClassMetadata(
125: entityInformation.getName());
126: String idPropName = metadata
127: .getIdentifierPropertyName();
128: if (idPropName != null) {
129: criteria
130: .addOrder(Order.asc(idPropName));
131: }
132: }
133: }
134: criteria.setFetchSize(device.getFetchCount());
135: cursor = criteria.scroll(ScrollMode.FORWARD_ONLY);
136: } else {
137: Query query = entityInformation.getQueryProvider()
138: .createQuery(hibernateSession,
139: entityInformation);
140: cursor = query.scroll(ScrollMode.FORWARD_ONLY);
141: }
142:
143: // store things in row buffer to allow using batch fetching in Hibernate
144: RowBuffer buffer = new RowBuffer(session,
145: hibernateSession, device.getFetchCount());
146: Object prev = null;
147: while (true) {
148: try {
149: if (!cursor.next()) {
150: break;
151: }
152: } catch (ObjectNotFoundException e) {
153: continue;
154: }
155: Object item = cursor.get(0);
156: if (prev != null && item != prev) {
157: buffer.put(prev);
158: }
159: prev = item;
160: if (buffer.shouldFlush()) {
161: // put also the item/prev since we are clearing the session
162: // in the flush process
163: buffer.put(prev);
164: buffer.flush();
165: prev = null;
166: }
167: }
168: if (prev != null) {
169: buffer.put(prev);
170: }
171: buffer.close();
172: cursor.close();
173:
174: hibernateTransaction.commit();
175: } catch (Exception e) {
176: log.error(device
177: .buildMessage("Failed to index the database"),
178: e);
179: if (cursor != null) {
180: try {
181: cursor.close();
182: } catch (Exception e1) {
183: log
184: .warn(
185: device
186: .buildMessage("Failed to close cursor on error, ignoring"),
187: e1);
188: }
189: }
190: if (hibernateTransaction != null) {
191: try {
192: hibernateTransaction.rollback();
193: } catch (Exception e1) {
194: log.warn("Failed to rollback Hibernate", e1);
195: }
196: }
197: if (!(e instanceof HibernateGpsDeviceException)) {
198: throw new HibernateGpsDeviceException(
199: device
200: .buildMessage("Failed to index the database"),
201: e);
202: }
203: throw (HibernateGpsDeviceException) e;
204: } finally {
205: hibernateSession.close();
206: session.close();
207: }
208: }
209: }
210:
211: private class RowBuffer {
212: private Object[] buffer;
213: private int fetchCount;
214: private int index = 0;
215: private CompassSession compassSession;
216: private Session hibernateSession;
217:
218: RowBuffer(CompassSession compassSession,
219: Session hibernateSession, int fetchCount) {
220: this .compassSession = compassSession;
221: this .hibernateSession = hibernateSession;
222: this .fetchCount = fetchCount;
223: this .buffer = new Object[fetchCount + 1];
224: }
225:
226: public void put(Object row) {
227: buffer[index] = row;
228: index++;
229: }
230:
231: public boolean shouldFlush() {
232: return index >= fetchCount;
233: }
234:
235: public void close() {
236: flush();
237: buffer = null;
238: }
239:
240: private void flush() {
241: for (int i = 0; i < index; i++) {
242: compassSession.create(buffer[i]);
243: }
244: // clear buffer and sessions to allow for GC
245: Arrays.fill(buffer, null);
246: compassSession.evictAll();
247: hibernateSession.clear();
248: index = 0;
249: }
250: }
251:
252: }
|