001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.jpa.indexer;
018:
019: import java.util.Arrays;
020: import java.util.HashMap;
021: import java.util.Map;
022:
023: import org.apache.commons.logging.Log;
024: import org.apache.commons.logging.LogFactory;
025: import org.compass.core.CompassSession;
026: import org.compass.gps.device.jpa.EntityManagerWrapper;
027: import org.compass.gps.device.jpa.JpaGpsDevice;
028: import org.compass.gps.device.jpa.JpaGpsDeviceException;
029: import org.compass.gps.device.jpa.entities.EntityInformation;
030: import org.compass.gps.device.jpa.queryprovider.HibernateJpaQueryProvider;
031: import org.compass.gps.device.support.parallel.IndexEntity;
032: import org.hibernate.CacheMode;
033: import org.hibernate.Criteria;
034: import org.hibernate.ObjectNotFoundException;
035: import org.hibernate.ScrollMode;
036: import org.hibernate.ScrollableResults;
037: import org.hibernate.Session;
038: import org.hibernate.criterion.Order;
039: import org.hibernate.ejb.HibernateEntityManager;
040: import org.hibernate.ejb.HibernateQuery;
041: import org.hibernate.metadata.ClassMetadata;
042:
043: /**
044: * A Hibernate indexer uses Hibernate <code>ScrollableResults</code> to index the database
045: * instead of using <code>setFirstResult</code> and <code>setMaxResults</code>. Using scrollable
046: * results yields better performance especially for large result set.
047: *
048: * <p>Also takes into accont if using {@link HibernateJpaQueryProvider} by called its <code>createCriteria</code>
049: * instead of the default <code>createQuery</code>. The criteria better handles outer joins, allows to set the
050: * fetch size, and automatically supports ordering by the ids of the entities.
051: *
052: * <p>Note, if using {@link org.compass.gps.device.jpa.JpaGpsDevice#setIndexSelectQuery(Class, String)} will cause
053: * not to be able to use <code>Criteria</code>. Instead, make sure to use {@link org.compass.gps.device.jpa.JpaGpsDevice#setIndexQueryProvider(Class, org.compass.gps.device.jpa.queryprovider.JpaQueryProvider)}
054: * and provider your own extension on top of {@link org.compass.gps.device.jpa.queryprovider.HibernateJpaQueryProvider}
055: * that returns your own <code>Criteria</code>.
056: *
057: * @author kimchy
058: */
059: public class HibernateJpaIndexEntitiesIndexer implements
060: JpaIndexEntitiesIndexer {
061:
062: private static final Log log = LogFactory
063: .getLog(HibernateJpaIndexEntitiesIndexer.class);
064:
065: private JpaGpsDevice jpaGpsDevice;
066:
067: private boolean performOrderById = true;
068:
069: private Map<String, Boolean> performOrderByPerEntity = new HashMap<String, Boolean>();
070:
071: public void setJpaGpsDevice(JpaGpsDevice jpaGpsDevice) {
072: this .jpaGpsDevice = jpaGpsDevice;
073: }
074:
075: /**
076: * Should this indxer order by the ids when <code>Criteria</code> is available.
077: * Defaults to <code>true</code>.
078: */
079: public void setPerformOrderById(boolean performOrderById) {
080: this .performOrderById = performOrderById;
081: }
082:
083: /**
084: * Should this indxer order by the ids when <code>Criteria</code> is available for
085: * the given entity. Defaults to {@link #setPerformOrderById(boolean)}.
086: */
087: public void setPerformOrderById(String entity,
088: boolean performOrderById) {
089: performOrderByPerEntity.put(entity, performOrderById);
090: }
091:
092: public void performIndex(CompassSession session,
093: IndexEntity[] entities) {
094: for (IndexEntity indexEntity : entities) {
095: EntityInformation entityInformation = (EntityInformation) indexEntity;
096: if (jpaGpsDevice.isFilteredForIndex(entityInformation
097: .getName())) {
098: continue;
099: }
100: int fetchCount = jpaGpsDevice.getFetchCount();
101: if (!jpaGpsDevice.isRunning()) {
102: return;
103: }
104: EntityManagerWrapper wrapper = jpaGpsDevice
105: .getEntityManagerWrapper().newInstance();
106: ScrollableResults cursor = null;
107: try {
108: wrapper.open();
109: HibernateEntityManager entityManager = (HibernateEntityManager) wrapper
110: .getEntityManager();
111: entityManager.getSession().setCacheMode(
112: CacheMode.IGNORE);
113: if (log.isDebugEnabled()) {
114: log.debug(jpaGpsDevice
115: .buildMessage("Indexing entities ["
116: + entityInformation.getName()
117: + "] using query ["
118: + entityInformation
119: .getQueryProvider() + "]"));
120: }
121:
122: if (entityInformation.getQueryProvider() instanceof HibernateJpaQueryProvider) {
123: Criteria criteria = ((HibernateJpaQueryProvider) entityInformation
124: .getQueryProvider()).createCriteria(
125: entityManager, entityInformation);
126: if (criteria != null) {
127: if (performOrderById) {
128: Boolean performOrder = performOrderByPerEntity
129: .get(entityInformation.getName());
130: if (performOrder == null || performOrder) {
131: ClassMetadata metadata = entityManager
132: .getSession()
133: .getSessionFactory()
134: .getClassMetadata(
135: entityInformation
136: .getName());
137: String idPropName = metadata
138: .getIdentifierPropertyName();
139: if (idPropName != null) {
140: criteria.addOrder(Order
141: .asc(idPropName));
142: }
143: }
144: }
145: criteria.setFetchSize(fetchCount);
146: cursor = criteria
147: .scroll(ScrollMode.FORWARD_ONLY);
148: }
149: }
150: if (cursor == null) {
151: HibernateQuery query = (HibernateQuery) entityInformation
152: .getQueryProvider().createQuery(
153: entityManager, entityInformation);
154: cursor = query.getHibernateQuery().scroll(
155: ScrollMode.FORWARD_ONLY);
156: }
157:
158: // store things in row buffer to allow using batch fetching in Hibernate
159: RowBuffer buffer = new RowBuffer(session, entityManager
160: .getSession(), fetchCount);
161: Object prev = null;
162: while (true) {
163: try {
164: if (!cursor.next()) {
165: break;
166: }
167: } catch (ObjectNotFoundException e) {
168: continue;
169: }
170: Object item = cursor.get(0);
171: if (item != prev && prev != null) {
172: buffer.put(prev);
173: }
174: prev = item;
175: if (buffer.shouldFlush()) {
176: // put also the item/prev since we are clearing the session
177: // in the flush process
178: buffer.put(prev);
179: buffer.flush();
180: prev = null;
181: }
182: }
183: if (prev != null) {
184: buffer.put(prev);
185: }
186: buffer.close();
187: cursor.close();
188:
189: entityManager.clear();
190: wrapper.close();
191: } catch (Exception e) {
192: log.error(jpaGpsDevice
193: .buildMessage("Failed to index the database"),
194: e);
195: if (cursor != null) {
196: try {
197: cursor.close();
198: } catch (Exception e1) {
199: log
200: .warn(
201: jpaGpsDevice
202: .buildMessage("Failed to close cursor on error, ignoring"),
203: e1);
204: }
205: }
206: wrapper.closeOnError();
207: if (!(e instanceof JpaGpsDeviceException)) {
208: throw new JpaGpsDeviceException(
209: jpaGpsDevice
210: .buildMessage("Failed to index the database"),
211: e);
212: }
213: throw (JpaGpsDeviceException) e;
214: }
215: }
216: }
217:
218: private class RowBuffer {
219: private Object[] buffer;
220: private int fetchCount;
221: private int index = 0;
222: private CompassSession compassSession;
223: private Session hibernateSession;
224:
225: RowBuffer(CompassSession compassSession,
226: Session hibernateSession, int fetchCount) {
227: this .compassSession = compassSession;
228: this .hibernateSession = hibernateSession;
229: this .fetchCount = fetchCount;
230: this .buffer = new Object[fetchCount + 1];
231: }
232:
233: public void put(Object row) {
234: buffer[index] = row;
235: index++;
236: }
237:
238: public boolean shouldFlush() {
239: return index >= fetchCount;
240: }
241:
242: public void close() {
243: flush();
244: buffer = null;
245: }
246:
247: private void flush() {
248: for (int i = 0; i < index; i++) {
249: compassSession.create(buffer[i]);
250: }
251: // clear buffer and sessions to allow for GC
252: Arrays.fill(buffer, null);
253: compassSession.evictAll();
254: hibernateSession.clear();
255: index = 0;
256: }
257: }
258:
259: }
|