001: // $Id: ActionQueue.java 11403 2007-04-11 14:25:13Z steve.ebersole@jboss.com $
002: package org.hibernate.engine;
003:
004: import org.hibernate.action.EntityInsertAction;
005: import org.hibernate.action.EntityDeleteAction;
006: import org.hibernate.action.Executable;
007: import org.hibernate.action.EntityUpdateAction;
008: import org.hibernate.action.CollectionRecreateAction;
009: import org.hibernate.action.CollectionRemoveAction;
010: import org.hibernate.action.CollectionUpdateAction;
011: import org.hibernate.action.EntityIdentityInsertAction;
012: import org.hibernate.action.BulkOperationCleanupAction;
013: import org.hibernate.HibernateException;
014: import org.hibernate.AssertionFailure;
015: import org.hibernate.cache.CacheException;
016: import org.apache.commons.logging.Log;
017: import org.apache.commons.logging.LogFactory;
018:
019: import java.util.ArrayList;
020: import java.util.List;
021: import java.util.Set;
022: import java.util.HashMap;
023: import java.util.Iterator;
024: import java.io.ObjectInputStream;
025: import java.io.IOException;
026: import java.io.Serializable;
027: import java.io.ObjectOutputStream;
028:
029: /**
030: * Responsible for maintaining the queue of actions related to events.
031: * </p>
032: * The ActionQueue holds the DML operations queued as part of a session's
033: * transactional-write-behind semantics. DML operations are queued here
034: * until a flush forces them to be executed against the database.
035: *
036: * @author Steve Ebersole
037: */
038: public class ActionQueue {
039:
040: private static final Log log = LogFactory.getLog(ActionQueue.class);
041: private static final int INIT_QUEUE_LIST_SIZE = 5;
042:
043: private SessionImplementor session;
044:
045: // Object insertions, updates, and deletions have list semantics because
046: // they must happen in the right order so as to respect referential
047: // integrity
048: private ArrayList insertions;
049: private ArrayList deletions;
050: private ArrayList updates;
051: // Actually the semantics of the next three are really "Bag"
052: // Note that, unlike objects, collection insertions, updates,
053: // deletions are not really remembered between flushes. We
054: // just re-use the same Lists for convenience.
055: private ArrayList collectionCreations;
056: private ArrayList collectionUpdates;
057: private ArrayList collectionRemovals;
058:
059: private ArrayList executions;
060:
061: /**
062: * Constructs an action queue bound to the given session.
063: *
064: * @param session The session "owning" this queue.
065: */
066: public ActionQueue(SessionImplementor session) {
067: this .session = session;
068: init();
069: }
070:
071: private void init() {
072: insertions = new ArrayList(INIT_QUEUE_LIST_SIZE);
073: deletions = new ArrayList(INIT_QUEUE_LIST_SIZE);
074: updates = new ArrayList(INIT_QUEUE_LIST_SIZE);
075:
076: collectionCreations = new ArrayList(INIT_QUEUE_LIST_SIZE);
077: collectionRemovals = new ArrayList(INIT_QUEUE_LIST_SIZE);
078: collectionUpdates = new ArrayList(INIT_QUEUE_LIST_SIZE);
079:
080: executions = new ArrayList(INIT_QUEUE_LIST_SIZE * 3);
081: }
082:
083: public void clear() {
084: updates.clear();
085: insertions.clear();
086: deletions.clear();
087:
088: collectionCreations.clear();
089: collectionRemovals.clear();
090: collectionUpdates.clear();
091: }
092:
093: public void addAction(EntityInsertAction action) {
094: insertions.add(action);
095: }
096:
097: public void addAction(EntityDeleteAction action) {
098: deletions.add(action);
099: }
100:
101: public void addAction(EntityUpdateAction action) {
102: updates.add(action);
103: }
104:
105: public void addAction(CollectionRecreateAction action) {
106: collectionCreations.add(action);
107: }
108:
109: public void addAction(CollectionRemoveAction action) {
110: collectionRemovals.add(action);
111: }
112:
113: public void addAction(CollectionUpdateAction action) {
114: collectionUpdates.add(action);
115: }
116:
117: public void addAction(EntityIdentityInsertAction insert) {
118: insertions.add(insert);
119: }
120:
121: public void addAction(BulkOperationCleanupAction cleanupAction) {
122: // Add these directly to the executions queue
123: executions.add(cleanupAction);
124: }
125:
126: /**
127: * Perform all currently queued entity-insertion actions.
128: *
129: * @throws HibernateException error executing queued insertion actions.
130: */
131: public void executeInserts() throws HibernateException {
132: executeActions(insertions);
133: }
134:
135: /**
136: * Perform all currently queued actions.
137: *
138: * @throws HibernateException error executing queued actions.
139: */
140: public void executeActions() throws HibernateException {
141: executeActions(insertions);
142: executeActions(updates);
143: executeActions(collectionRemovals);
144: executeActions(collectionUpdates);
145: executeActions(collectionCreations);
146: executeActions(deletions);
147: }
148:
149: /**
150: * Prepares the internal action queues for execution.
151: *
152: * @throws HibernateException error preparing actions.
153: */
154: public void prepareActions() throws HibernateException {
155: prepareActions(collectionRemovals);
156: prepareActions(collectionUpdates);
157: prepareActions(collectionCreations);
158: }
159:
160: /**
161: * Performs cleanup of any held cache softlocks.
162: *
163: * @param success Was the transaction successful.
164: */
165: public void afterTransactionCompletion(boolean success) {
166: int size = executions.size();
167: final boolean invalidateQueryCache = session.getFactory()
168: .getSettings().isQueryCacheEnabled();
169: for (int i = 0; i < size; i++) {
170: try {
171: Executable exec = (Executable) executions.get(i);
172: try {
173: exec.afterTransactionCompletion(success);
174: } finally {
175: if (invalidateQueryCache) {
176: session.getFactory().getUpdateTimestampsCache()
177: .invalidate(exec.getPropertySpaces());
178: }
179: }
180: } catch (CacheException ce) {
181: log.error("could not release a cache lock", ce);
182: // continue loop
183: } catch (Exception e) {
184: throw new AssertionFailure(
185: "Exception releasing cache locks", e);
186: }
187: }
188: executions.clear();
189: }
190:
191: /**
192: * Check whether the given tables/query-spaces are to be executed against
193: * given the currently queued actions.
194: *
195: * @param tables The table/query-spaces to check.
196: * @return True if we contain pending actions against any of the given
197: * tables; false otherwise.
198: */
199: public boolean areTablesToBeUpdated(Set tables) {
200: return areTablesToUpdated(updates, tables)
201: || areTablesToUpdated(insertions, tables)
202: || areTablesToUpdated(deletions, tables)
203: || areTablesToUpdated(collectionUpdates, tables)
204: || areTablesToUpdated(collectionCreations, tables)
205: || areTablesToUpdated(collectionRemovals, tables);
206: }
207:
208: /**
209: * Check whether any insertion or deletion actions are currently queued.
210: *
211: * @return True if insertions or deletions are currently queued; false otherwise.
212: */
213: public boolean areInsertionsOrDeletionsQueued() {
214: return (insertions.size() > 0 || deletions.size() > 0);
215: }
216:
217: private static boolean areTablesToUpdated(List executables,
218: Set tablespaces) {
219: int size = executables.size();
220: for (int j = 0; j < size; j++) {
221: Serializable[] spaces = ((Executable) executables.get(j))
222: .getPropertySpaces();
223: for (int i = 0; i < spaces.length; i++) {
224: if (tablespaces.contains(spaces[i])) {
225: if (log.isDebugEnabled())
226: log.debug("changes must be flushed to space: "
227: + spaces[i]);
228: return true;
229: }
230: }
231: }
232: return false;
233: }
234:
235: private void executeActions(List list) throws HibernateException {
236: int size = list.size();
237: for (int i = 0; i < size; i++) {
238: execute((Executable) list.get(i));
239: }
240: list.clear();
241: session.getBatcher().executeBatch();
242: }
243:
244: public void execute(Executable executable) {
245: final boolean lockQueryCache = session.getFactory()
246: .getSettings().isQueryCacheEnabled();
247: if (executable.hasAfterTransactionCompletion()
248: || lockQueryCache) {
249: executions.add(executable);
250: }
251: if (lockQueryCache) {
252: session.getFactory().getUpdateTimestampsCache()
253: .preinvalidate(executable.getPropertySpaces());
254: }
255: executable.execute();
256: }
257:
258: private void prepareActions(List queue) throws HibernateException {
259: int size = queue.size();
260: for (int i = 0; i < size; i++) {
261: Executable executable = (Executable) queue.get(i);
262: executable.beforeExecutions();
263: }
264: }
265:
266: /**
267: * Returns a string representation of the object.
268: *
269: * @return a string representation of the object.
270: */
271: public String toString() {
272: return new StringBuffer().append("ActionQueue[insertions=")
273: .append(insertions).append(" updates=").append(updates)
274: .append(" deletions=").append(deletions).append(
275: " collectionCreations=").append(
276: collectionCreations).append(
277: " collectionRemovals=").append(
278: collectionRemovals).append(
279: " collectionUpdates=")
280: .append(collectionUpdates).append("]").toString();
281: }
282:
283: public int numberOfCollectionRemovals() {
284: return collectionRemovals.size();
285: }
286:
287: public int numberOfCollectionUpdates() {
288: return collectionUpdates.size();
289: }
290:
291: public int numberOfCollectionCreations() {
292: return collectionCreations.size();
293: }
294:
295: public int numberOfDeletions() {
296: return deletions.size();
297: }
298:
299: public int numberOfUpdates() {
300: return updates.size();
301: }
302:
303: public int numberOfInsertions() {
304: return insertions.size();
305: }
306:
307: public void sortCollectionActions() {
308: if (session.getFactory().getSettings().isOrderUpdatesEnabled()) {
309: //sort the updates by fk
310: java.util.Collections.sort(collectionCreations);
311: java.util.Collections.sort(collectionUpdates);
312: java.util.Collections.sort(collectionRemovals);
313: }
314: }
315:
316: public void sortActions() {
317: if (session.getFactory().getSettings().isOrderUpdatesEnabled()) {
318: //sort the updates by pk
319: java.util.Collections.sort(updates);
320: }
321: if (session.getFactory().getSettings().isOrderInsertsEnabled()) {
322: sortInsertActions();
323: }
324: }
325:
326: /**
327: * Order the {@link #insertions} queue such that we group inserts
328: * against the same entity together (without violating constraints). The
329: * original order is generated by cascade order, which in turn is based on
330: * the directionality of foreign-keys. So even though we will be changing
331: * the ordering here, we need to make absolutely certain that we do not
332: * circumvent this FK ordering to the extent of causing constraint
333: * violations
334: */
335: private void sortInsertActions() {
336: // IMPLEMENTATION NOTES:
337: //
338: // The main data structure in this ordering algorithm is the 'positionToAction'
339: // map. Essentially this can be thought of as an put-ordered map (the problem with
340: // actually implementing it that way and doing away with the 'nameList' is that
341: // we'd end up having potential duplicate key values). 'positionToAction' maitains
342: // a mapping from a position within the 'nameList' structure to a "partial queue"
343: // of actions.
344:
345: HashMap positionToAction = new HashMap();
346: List nameList = new ArrayList();
347:
348: loopInsertion: while (!insertions.isEmpty()) {
349: EntityInsertAction action = (EntityInsertAction) insertions
350: .remove(0);
351: String this EntityName = action.getEntityName();
352:
353: // see if we have already encountered this entity-name...
354: if (!nameList.contains(this EntityName)) {
355: // we have not, so create the proper entries in nameList and positionToAction
356: ArrayList segmentedActionQueue = new ArrayList();
357: segmentedActionQueue.add(action);
358: nameList.add(this EntityName);
359: positionToAction
360: .put(new Integer(nameList
361: .indexOf(this EntityName)),
362: segmentedActionQueue);
363: } else {
364: // we have seen it before, so we need to determine if this insert action is
365: // is depenedent upon a previously processed action in terms of FK
366: // relationships (this FK checking is done against the entity's property-state
367: // associated with the action...)
368: int lastPos = nameList.lastIndexOf(this EntityName);
369: Object[] states = action.getState();
370: for (int i = 0; i < states.length; i++) {
371: for (int j = 0; j < nameList.size(); j++) {
372: ArrayList tmpList = (ArrayList) positionToAction
373: .get(new Integer(j));
374: for (int k = 0; k < tmpList.size(); k++) {
375: final EntityInsertAction checkAction = (EntityInsertAction) tmpList
376: .get(k);
377: if (checkAction.getInstance() == states[i]
378: && j > lastPos) {
379: // 'checkAction' is inserting an entity upon which 'action'
380: // depends...
381: // note: this is an assumption and may not be correct in the case of one-to-one
382: ArrayList segmentedActionQueue = new ArrayList();
383: segmentedActionQueue.add(action);
384: nameList.add(this EntityName);
385: positionToAction
386: .put(
387: new Integer(
388: nameList
389: .lastIndexOf(this EntityName)),
390: segmentedActionQueue);
391: continue loopInsertion;
392: }
393: }
394: }
395: }
396:
397: ArrayList actionQueue = (ArrayList) positionToAction
398: .get(new Integer(lastPos));
399: actionQueue.add(action);
400: }
401: }
402:
403: // now iterate back through positionToAction map and move entityInsertAction back to insertion list
404: for (int p = 0; p < nameList.size(); p++) {
405: ArrayList actionQueue = (ArrayList) positionToAction
406: .get(new Integer(p));
407: Iterator itr = actionQueue.iterator();
408: while (itr.hasNext()) {
409: insertions.add(itr.next());
410: }
411: }
412: }
413:
414: public ArrayList cloneDeletions() {
415: return (ArrayList) deletions.clone();
416: }
417:
418: public void clearFromFlushNeededCheck(
419: int previousCollectionRemovalSize) {
420: collectionCreations.clear();
421: collectionUpdates.clear();
422: updates.clear();
423: // collection deletions are a special case since update() can add
424: // deletions of collections not loaded by the session.
425: for (int i = collectionRemovals.size() - 1; i >= previousCollectionRemovalSize; i--) {
426: collectionRemovals.remove(i);
427: }
428: }
429:
430: public boolean hasAnyQueuedActions() {
431: return updates.size() > 0 || insertions.size() > 0
432: || deletions.size() > 0 || collectionUpdates.size() > 0
433: || collectionRemovals.size() > 0
434: || collectionCreations.size() > 0;
435: }
436:
437: /**
438: * Used by the owning session to explicitly control serialization of the
439: * action queue
440: *
441: * @param oos The stream to which the action queue should get written
442: * @throws IOException
443: */
444: public void serialize(ObjectOutputStream oos) throws IOException {
445: log.trace("serializing action-queue");
446:
447: int queueSize = insertions.size();
448: log.trace("starting serialization of [" + queueSize
449: + "] insertions entries");
450: oos.writeInt(queueSize);
451: for (int i = 0; i < queueSize; i++) {
452: oos.writeObject(insertions.get(i));
453: }
454:
455: queueSize = deletions.size();
456: log.trace("starting serialization of [" + queueSize
457: + "] deletions entries");
458: oos.writeInt(queueSize);
459: for (int i = 0; i < queueSize; i++) {
460: oos.writeObject(deletions.get(i));
461: }
462:
463: queueSize = updates.size();
464: log.trace("starting serialization of [" + queueSize
465: + "] updates entries");
466: oos.writeInt(queueSize);
467: for (int i = 0; i < queueSize; i++) {
468: oos.writeObject(updates.get(i));
469: }
470:
471: queueSize = collectionUpdates.size();
472: log.trace("starting serialization of [" + queueSize
473: + "] collectionUpdates entries");
474: oos.writeInt(queueSize);
475: for (int i = 0; i < queueSize; i++) {
476: oos.writeObject(collectionUpdates.get(i));
477: }
478:
479: queueSize = collectionRemovals.size();
480: log.trace("starting serialization of [" + queueSize
481: + "] collectionRemovals entries");
482: oos.writeInt(queueSize);
483: for (int i = 0; i < queueSize; i++) {
484: oos.writeObject(collectionRemovals.get(i));
485: }
486:
487: queueSize = collectionCreations.size();
488: log.trace("starting serialization of [" + queueSize
489: + "] collectionCreations entries");
490: oos.writeInt(queueSize);
491: for (int i = 0; i < queueSize; i++) {
492: oos.writeObject(collectionCreations.get(i));
493: }
494: }
495:
496: /**
497: * Used by the owning session to explicitly control deserialization of the
498: * action queue
499: *
500: * @param ois The stream from which to read the action queue
501: * @throws IOException
502: */
503: public static ActionQueue deserialize(ObjectInputStream ois,
504: SessionImplementor session) throws IOException,
505: ClassNotFoundException {
506: log.trace("deserializing action-queue");
507: ActionQueue rtn = new ActionQueue(session);
508:
509: int queueSize = ois.readInt();
510: log.trace("starting deserialization of [" + queueSize
511: + "] insertions entries");
512: rtn.insertions = new ArrayList(queueSize);
513: for (int i = 0; i < queueSize; i++) {
514: rtn.insertions.add(ois.readObject());
515: }
516:
517: queueSize = ois.readInt();
518: log.trace("starting deserialization of [" + queueSize
519: + "] deletions entries");
520: rtn.deletions = new ArrayList(queueSize);
521: for (int i = 0; i < queueSize; i++) {
522: rtn.deletions.add(ois.readObject());
523: }
524:
525: queueSize = ois.readInt();
526: log.trace("starting deserialization of [" + queueSize
527: + "] updates entries");
528: rtn.updates = new ArrayList(queueSize);
529: for (int i = 0; i < queueSize; i++) {
530: rtn.updates.add(ois.readObject());
531: }
532:
533: queueSize = ois.readInt();
534: log.trace("starting deserialization of [" + queueSize
535: + "] collectionUpdates entries");
536: rtn.collectionUpdates = new ArrayList(queueSize);
537: for (int i = 0; i < queueSize; i++) {
538: rtn.collectionUpdates.add(ois.readObject());
539: }
540:
541: queueSize = ois.readInt();
542: log.trace("starting deserialization of [" + queueSize
543: + "] collectionRemovals entries");
544: rtn.collectionRemovals = new ArrayList(queueSize);
545: for (int i = 0; i < queueSize; i++) {
546: rtn.collectionRemovals.add(ois.readObject());
547: }
548:
549: queueSize = ois.readInt();
550: log.trace("starting deserialization of [" + queueSize
551: + "] collectionCreations entries");
552: rtn.collectionCreations = new ArrayList(queueSize);
553: for (int i = 0; i < queueSize; i++) {
554: rtn.collectionCreations.add(ois.readObject());
555: }
556: return rtn;
557: }
558:
559: }
|