/*
 * Copyright 2004-2006 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016:
017: package org.compass.spring.transaction;
018:
019: import org.apache.commons.logging.Log;
020: import org.apache.commons.logging.LogFactory;
021: import org.compass.core.CompassException;
022: import org.compass.core.CompassSession;
023: import org.compass.core.spi.InternalCompassSession;
024: import org.compass.core.transaction.AbstractTransaction;
025: import org.compass.core.transaction.TransactionException;
026: import org.compass.core.transaction.TransactionFactory;
027: import org.springframework.transaction.PlatformTransactionManager;
028: import org.springframework.transaction.TransactionStatus;
029: import org.springframework.transaction.support.DefaultTransactionDefinition;
030: import org.springframework.transaction.support.TransactionSynchronization;
031: import org.springframework.transaction.support.TransactionSynchronizationManager;
032:
033: public class SpringSyncTransaction extends AbstractTransaction {
034:
035: private static final Log log = LogFactory
036: .getLog(SpringSyncTransaction.class);
037:
038: private TransactionStatus status;
039:
040: /**
041: * This flag means if this transaction controls the COMPASS transaction (i.e. it is the top level compass
042: * transaction)
043: */
044: private boolean controllingNewTransaction = false;
045:
046: private boolean commitFailed;
047:
048: private PlatformTransactionManager transactionManager;
049:
050: private InternalCompassSession session;
051:
052: public SpringSyncTransaction(TransactionFactory transactionFactory) {
053: super (transactionFactory);
054: }
055:
056: public void begin(PlatformTransactionManager transactionManager,
057: InternalCompassSession session,
058: TransactionIsolation transactionIsolation,
059: boolean commitBeforeCompletion) {
060:
061: this .session = session;
062: this .transactionManager = transactionManager;
063:
064: // the factory called begin, so we are in charge, if we were not, than
065: // it would not call begin (we are in charge of the COMPASS transaction,
066: // the spring one is handled later)
067: controllingNewTransaction = true;
068:
069: if (transactionManager != null) {
070: DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
071: transactionDefinition
072: .setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
073: boolean readOnly = false;
074: if (transactionIsolation == TransactionIsolation.READ_ONLY_READ_COMMITTED) {
075: readOnly = true;
076: }
077: transactionDefinition.setReadOnly(readOnly);
078: status = transactionManager
079: .getTransaction(transactionDefinition);
080: }
081:
082: session.getSearchEngine().begin(transactionIsolation);
083:
084: SpringTransactionSynchronization sync;
085: if (transactionManager != null) {
086: if (log.isDebugEnabled()) {
087: if (status.isNewTransaction()) {
088: log
089: .debug("Beginning new Spring transaction, and a new compass transaction on thread ["
090: + Thread.currentThread().getName()
091: + "] with isolation ["
092: + transactionIsolation + "]");
093: } else {
094: log
095: .debug("Joining Spring transaction, and starting a new compass transaction on thread ["
096: + Thread.currentThread().getName()
097: + "] with isolation ["
098: + transactionIsolation + "]");
099: }
100: }
101: sync = new SpringTransactionSynchronization(session, status
102: .isNewTransaction(), commitBeforeCompletion,
103: transactionFactory);
104: } else {
105: if (log.isDebugEnabled()) {
106: log
107: .debug("Joining Spring transaction, and starting a new compass transaction on thread ["
108: + Thread.currentThread().getName()
109: + "]");
110: }
111: sync = new SpringTransactionSynchronization(session, false,
112: commitBeforeCompletion, transactionFactory);
113: }
114: TransactionSynchronizationManager.registerSynchronization(sync);
115:
116: setBegun(true);
117: }
118:
119: /**
120: * Called by factory when already in a running compass transaction
121: */
122: public void join(InternalCompassSession session)
123: throws CompassException {
124: this .session = session;
125: controllingNewTransaction = false;
126: if (log.isDebugEnabled()) {
127: log
128: .debug("Joining an existing compass transcation on thread ["
129: + Thread.currentThread().getName() + "]");
130: }
131: }
132:
133: protected void doCommit() throws CompassException {
134: if (!controllingNewTransaction) {
135: if (log.isDebugEnabled()) {
136: log
137: .debug("Not committing transaction since compass does not control it on thread ["
138: + Thread.currentThread().getName()
139: + "]");
140: }
141: return;
142: }
143:
144: if (transactionManager == null) {
145: // do nothing, it could only get here if the spring transaction was
146: // started and we synch on it
147: return;
148: }
149:
150: if (status.isNewTransaction()) {
151: if (log.isDebugEnabled()) {
152: log
153: .debug("Committing Spring transaction controlled by compass on thread ["
154: + Thread.currentThread().getName()
155: + "]");
156: }
157: try {
158: transactionManager.commit(status);
159: } catch (Exception e) {
160: commitFailed = true;
161: // so the transaction is already rolled back, by JTA spec
162: throw new TransactionException("Commit failed", e);
163: }
164: } else {
165: if (log.isDebugEnabled()) {
166: log
167: .debug("Commit called, let Spring synchronization commit the transaciton on thread ["
168: + Thread.currentThread().getName()
169: + "]");
170: }
171: }
172: }
173:
174: protected void doRollback() throws CompassException {
175:
176: if (transactionManager == null) {
177: // do nothing, it could only get here if the spring transaction was
178: // started and we synch on it
179: return;
180: }
181:
182: try {
183: if (status.isNewTransaction()) {
184: if (log.isDebugEnabled()) {
185: log
186: .debug("Rolling back Spring transaction controlled by compass on thread ["
187: + Thread.currentThread().getName()
188: + "]");
189: }
190: if (!commitFailed)
191: transactionManager.rollback(status);
192: } else {
193: if (log.isDebugEnabled()) {
194: log
195: .debug("Marking Spring transaction as rolled back since compass controlls it on thread ["
196: + Thread.currentThread().getName()
197: + "]");
198: }
199: status.setRollbackOnly();
200: }
201: } catch (Exception e) {
202: throw new TransactionException(
203: "Rollback failed with exception", e);
204: }
205: }
206:
207: public boolean wasRolledBack() throws CompassException {
208: throw new TransactionException("Not supported");
209: }
210:
211: public boolean wasCommitted() throws CompassException {
212: throw new TransactionException("Not supported");
213: }
214:
215: public CompassSession getSession() {
216: return this .session;
217: }
218:
219: public static class SpringTransactionSynchronization implements
220: TransactionSynchronization {
221:
222: private InternalCompassSession session;
223:
224: private boolean compassControledTransaction;
225:
226: private boolean commitBeforeCompletion;
227:
228: private TransactionFactory transactionFactory;
229:
230: public SpringTransactionSynchronization(
231: InternalCompassSession session,
232: boolean compassControledTransaction,
233: boolean commitBeforeCompletion,
234: TransactionFactory transactionFactory) {
235: this .session = session;
236: this .compassControledTransaction = compassControledTransaction;
237: this .commitBeforeCompletion = commitBeforeCompletion;
238: this .transactionFactory = transactionFactory;
239: }
240:
241: public InternalCompassSession getSession() {
242: return this .session;
243: }
244:
245: public void suspend() {
246: }
247:
248: public void resume() {
249: }
250:
251: public void beforeCommit(boolean readOnly) {
252: }
253:
254: public void beforeCompletion() {
255: if (!commitBeforeCompletion) {
256: return;
257: }
258: if (log.isDebugEnabled()) {
259: log
260: .debug("Committing compass transaction using Spring synchronization beforeCompletion on thread ["
261: + Thread.currentThread().getName()
262: + "]");
263: }
264: session.getSearchEngine().commit(true);
265: }
266:
267: public void afterCommit() {
268:
269: }
270:
271: public void afterCompletion(int status) {
272: try {
273: if (status == STATUS_COMMITTED) {
274: if (!commitBeforeCompletion) {
275: if (log.isDebugEnabled()) {
276: log
277: .debug("Committing compass transaction using Spring synchronization afterCompletion on thread ["
278: + Thread.currentThread()
279: .getName() + "]");
280: }
281: session.getSearchEngine().commit(true);
282: }
283: } else {
284: // in case of STATUS_ROLLBACK or STATUS_UNKNOWN
285: if (log.isDebugEnabled()) {
286: log
287: .debug("Rolling back compass transaction using Spring synchronization afterCompletion on thread ["
288: + Thread.currentThread()
289: .getName() + "]");
290: }
291: session.getSearchEngine().rollback();
292: }
293: } catch (Exception e) {
294: log.error(
295: "Exception occured when sync with transaction",
296: e);
297: // TODO swallow??????
298: } finally {
299: ((SpringSyncTransactionFactory) transactionFactory)
300: .unbindSessionFromTransaction(this );
301: session.evictAll();
302: // close the session AFTER we cleared it from the transaction,
303: // so it will be actually closed. Also close it only if we do
304: // not contoll the transaction
305: // (otherwise it will be closed by the calling template)
306: if (!compassControledTransaction) {
307: session.close();
308: }
309: }
310: }
311:
312: }
313:
314: }
|