001: /**
002: * Copyright (C) 2001-2005 France Telecom R&D
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package org.objectweb.speedo.workingset.lib;
018:
019: import org.objectweb.fractal.api.Component;
020: import org.objectweb.fractal.api.control.LifeCycleController;
021: import org.objectweb.jorm.api.PMapper;
022: import org.objectweb.medor.eval.prefetch.api.PrefetchCache;
023: import org.objectweb.perseus.persistence.api.ConnectionHolder;
024: import org.objectweb.perseus.persistence.api.PersistenceException;
025: import org.objectweb.perseus.persistence.api.State;
026: import org.objectweb.perseus.persistence.api.TransactionalPersistenceManager;
027: import org.objectweb.perseus.persistence.api.TransactionalWorkingSet;
028: import org.objectweb.perseus.persistence.api.VirtualState;
029: import org.objectweb.perseus.persistence.api.WorkingSet;
030: import org.objectweb.perseus.persistence.api.WorkingSetLifeCycle;
031: import org.objectweb.perseus.persistence.lib.BasicWorkingSet;
032: import org.objectweb.speedo.api.ExceptionHelper;
033: import org.objectweb.speedo.api.SpeedoProperties;
034: import org.objectweb.speedo.api.SpeedoRuntimeException;
035: import org.objectweb.speedo.api.TransactionListener;
036: import org.objectweb.speedo.mim.api.StateItf;
037: import org.objectweb.speedo.pm.api.POManagerItf;
038: import org.objectweb.speedo.workingset.api.TransactionItf;
039: import org.objectweb.util.monolog.api.BasicLevel;
040:
041: import java.util.ArrayList;
042: import java.util.Iterator;
043:
044: import javax.transaction.Status;
045: import javax.transaction.Synchronization;
046:
047: /**
048: *
049: *
050: * @author S.Chassande-Barrioz
051: */
052: public abstract class AbstractTransaction extends BasicWorkingSet
053: implements TransactionItf, LifeCycleController {
054:
055: public final static String PO_MANAGER_BINDING = "po-manager";
056: public final static String MAPPER_BINDING = "mapper";
057: public final static String TRANSACTIONAL_PERSISTENCE_MANAGER_BINDING = "transactional-persistence-manager";
058: public final static String COMPONENT_BINDING = "component";
059:
060: public static TransactionListener txListener = null;
061:
062: /**
063: * is the mapper permitting to reach the prefetch cache and to invalidate
064: * prefetched buffer at working set closing time.
065: */
066: protected PMapper mapper = null;
067:
068: /**
069: * Is used to delegates working set/transaction demercation
070: */
071: protected TransactionalPersistenceManager tpm = null;
072:
073: protected boolean nontransactionalRead;
074: protected boolean nontransactionalWrite;
075:
076: /**
077: * Indicates if the transaction is optimistic.
078: */
079: protected boolean optimistic;
080:
081: /**
082: * The JDO user synchronization registered (can be null if none has been
083: * registered).
084: */
085: protected Synchronization synchronization = null;
086:
087: /**
088: * indicates if the transaction is managed by a J2EE environnement.
089: */
090: protected boolean managedEnv = false;
091:
092: /**
093: * Indicates if the jdo transaction must be rolledback.
094: */
095: protected boolean rollbackOnly = false;
096:
097: /**
098: * the reference to this component in Speedo
099: */
100: protected TransactionItf this T;
101:
102: /**
103: * Is the linked po manager.
104: */
105: protected POManagerItf pm = null;
106:
107: /**
108: *
109: */
110: public AbstractTransaction() {
111: super ();
112: }
113:
114: /**
115: * Attaches an entry to the transaction.
116: * Plus version update.
117: * @param state the state which must be attached to the transaction
118: * @param mode the action that stared the binding: either read or write intention
119: */
120: public State bind(State state, Object oid, byte mode) {
121: State old = super .bind(state, oid, mode);
122: if (!(state instanceof VirtualState)) {
123: if (mode == BasicWorkingSet.WRITE_INTENTION) {
124: StateItf sa = (StateItf) state;
125: sa.speedoChangeVersion();
126: }
127: }
128: return old;
129: }
130:
131: /**
132: * Invalidates the prefetch buffer associated to this working set.
133: *
134: * @throws PersistenceException
135: */
136: public void beforeWSPrepare() throws PersistenceException {
137: logger.log(BasicLevel.DEBUG, "Starting beforeWSPrepare");
138: Iterator it = oid2state.values().iterator();
139: ArrayList exceptions = null;
140: while (it.hasNext()) {
141: org.objectweb.perseus.persistence.api.State state = (org.objectweb.perseus.persistence.api.State) it
142: .next();
143: if (state == VirtualState.instance) {
144: continue;
145: }
146: try {
147: ((StateItf) state).prepareWrite();
148: } catch (Exception e) {
149: if (exceptions == null) {
150: exceptions = new ArrayList();
151: }
152: exceptions.add(e);
153: int level = e instanceof RuntimeException ? BasicLevel.DEBUG
154: : BasicLevel.ERROR;
155: if (logger.isLoggable(level)) {
156: logger.log(level,
157: "Error on StateItf preparation for flushing: "
158: + "\n\tstate.ce.identifier="
159: + state.getCacheEntry()
160: .getCeIdentifier()
161: + "\n\tstate=" + state
162: + "\n\texception: ", e);
163: }
164: }
165: }
166: // close the prefetch buffers associated to the context
167: PrefetchCache pc = mapper.getPrefetchCache();
168: if (pc != null) {
169: pc.invalidatePrefetchBuffer(this T);
170: }
171: logger.log(BasicLevel.DEBUG, "Ending beforeWSPrepare");
172: if (exceptions != null) {
173: throw new PersistenceException(new SpeedoRuntimeException(
174: "Impossible to prepare instances before flushing",
175: (Exception[]) exceptions
176: .toArray(new Exception[exceptions.size()])));
177: }
178: }
179:
180: /**
181: * Signal to the persistent instances reached in the working set that the
182: * current working set is closed. Some actions on persistent instances at
183: * this time can be done, such as reference unswizlling
184: */
185: public void onWSEnd() {
186: logger.log(BasicLevel.DEBUG, "Starting onWSEnd");
187: ArrayList exceptions = null;
188: if (!oid2state.isEmpty()) {
189: Iterator it = oid2state.values().iterator();
190: while (it.hasNext()) {
191: org.objectweb.perseus.persistence.api.State state = (org.objectweb.perseus.persistence.api.State) it
192: .next();
193: if (state == VirtualState.instance) {
194: continue;
195: }
196: try {
197: ((StateItf) state).workingSetClosed();
198: } catch (Exception e) {
199: if (exceptions == null) {
200: exceptions = new ArrayList();
201: }
202: exceptions.add(e);
203: if (!(e instanceof RuntimeException)) {
204: logger.log(BasicLevel.ERROR,
205: "Error on workingSetClosed for the StateItf: "
206: + "\n\tstate.ce.identifier="
207: + state.getCacheEntry()
208: .getCeIdentifier()
209: + "\n\tstate=" + state
210: + "\n\texception: ", e);
211: }
212: }
213: }
214: }
215: if (txListener != null) {
216: txListener.transactionPreValidate(this , oid2state.size());
217: }
218: logger.log(BasicLevel.DEBUG, "Ending onWSEnd");
219: if (exceptions != null) {
220: throw new SpeedoRuntimeException(
221: "Error when signal the close of working set on states:",
222: (Exception[]) exceptions
223: .toArray(new Exception[exceptions.size()]));
224: }
225: }
226:
227: // IMPLEMENTATION OF THE LifeCycleController INTERFACE //
228: //-----------------------------------------------------//
229:
230: public String getFcState() {
231: return null;
232: }
233:
234: public void startFc() {
235: managedEnv = pm.getPOManagerFactory().getProperties()
236: .getProperty(SpeedoProperties.MANAGED, "").equals(
237: "true");
238: }
239:
240: public void stopFc() {
241: }
242:
243: // IMPLEMENTATION OF THE UserBindingController INTERFACE //
244: //-------------------------------------------------------//
245:
246: public String[] listFc() {
247: String[] names = super .listFc();
248: String[] itfs = new String[names.length + 3];
249: itfs[0] = PO_MANAGER_BINDING;
250: itfs[1] = TRANSACTIONAL_PERSISTENCE_MANAGER_BINDING;
251: itfs[2] = MAPPER_BINDING;
252: System.arraycopy(names, 0, itfs, 3, names.length);
253: return itfs;
254: }
255:
256: public Object lookupFc(String c) {
257: if (PO_MANAGER_BINDING.equals(c))
258: return pm;
259: else if (TRANSACTIONAL_PERSISTENCE_MANAGER_BINDING.equals(c))
260: return tpm;
261: else if (MAPPER_BINDING.equals(c))
262: return mapper;
263: else
264: return super .lookupFc(c);
265: }
266:
267: public void bindFc(String c, Object s) {
268: if (PO_MANAGER_BINDING.equals(c))
269: pm = (POManagerItf) s;
270: else if (TRANSACTIONAL_PERSISTENCE_MANAGER_BINDING.equals(c))
271: tpm = (TransactionalPersistenceManager) s;
272: else if (MAPPER_BINDING.equals(c))
273: mapper = (PMapper) s;
274: else if (COMPONENT_BINDING.equals(c)) {
275: try {
276: this T = (TransactionItf) ((Component) s)
277: .getFcInterface("transaction");
278: } catch (Exception e) {
279: throw new SpeedoRuntimeException(
280: "Impossible to get self transaction",
281: ExceptionHelper.getNested(e));
282: }
283: } else
284: super .bindFc(c, s);
285: }
286:
287: public void unbindFc(String c) {
288: if (PO_MANAGER_BINDING.equals(c))
289: pm = null;
290: else if (TRANSACTIONAL_PERSISTENCE_MANAGER_BINDING.equals(c))
291: tpm = null;
292: else if (MAPPER_BINDING.equals(c))
293: mapper = null;
294: else
295: super .unbindFc(c);
296: }
297:
298: public void setStatus(byte status) throws PersistenceException {
299: try {
300: switch (status) {
301: case TransactionalWorkingSet.CTX_PREPARED:
302: beforeWSPrepare();
303: break;
304: case TransactionalWorkingSet.CTX_COMMITTED:
305: case TransactionalWorkingSet.CTX_ABORTED:
306: onWSEnd();
307: break;
308: case TransactionalWorkingSet.CTX_CLOSED:
309: if (oid2state.isEmpty()) {
310: // close the prefetch buffers associated to the context
311: PrefetchCache pc = mapper.getPrefetchCache();
312: if (pc != null) {
313: pc.invalidatePrefetchBuffer(this T);
314: }
315: }
316: onWSEnd();
317: break;
318: }
319: } finally {
320: super .setStatus(status);
321: }
322: }
323:
324: public boolean isActive() {
325: switch (status) {
326: case TransactionalWorkingSet.CTX_ACTIVE_TRANSACTIONAL:
327: case TransactionalWorkingSet.CTX_PREPARED:
328: case TransactionalWorkingSet.CTX_PREPARED_OK:
329: case TransactionalWorkingSet.CTX_PREPARED_FAIL:
330: return true;
331: case TransactionalWorkingSet.CTX_ACTIVE:
332: case TransactionalWorkingSet.CTX_COMMITTED:
333: case TransactionalWorkingSet.CTX_ABORTED:
334: case TransactionalWorkingSet.CTX_CLOSED:
335: default:
336: return false;
337: }
338: }
339:
340: /**
341: * @see org.objectweb.speedo.workingset.api.TransactionItf#begin()
342: */
343: public void begin() {
344: logger.log(BasicLevel.INFO, "Begin the transaction");
345: rollbackOnly = false; //initialize the flag
346: try {
347: tpm.begin(this T);
348: } catch (PersistenceException e) {
349: Exception ie = ExceptionHelper.getNested(e);
350: logger.log(BasicLevel.ERROR,
351: "Error during the begin of the transaction:", ie);
352: throw new SpeedoRuntimeException("", ie);
353: }
354: if (txListener != null) {
355: txListener.transactionBegun(this );
356: }
357: }
358:
359: /**
360: * @see org.objectweb.speedo.workingset.api.TransactionItf#commit()
361: */
362: public void commit() {
363: int size = oid2state.size();
364: logger.log(BasicLevel.INFO,
365: "Commit the transaction, working set size: " + size);
366: if (synchronization != null) {
367: synchronization.beforeCompletion();
368: }
369: //register working set size for statistics
370: boolean validated = rollbackOnly;
371: try {
372: if (rollbackOnly) {
373: tpm.rollback(this T);
374: } else {
375: tpm.commit(this T);
376: validated = true;
377: }
378: } catch (PersistenceException e) {
379: Exception ie = ExceptionHelper.getNested(e);
380: if (ie instanceof RuntimeException) {
381: throw (RuntimeException) ie;
382: } else {
383: ie = new SpeedoRuntimeException(
384: "JDOTransactionItf rolledback due to an exception at commit time: ",
385: ie);
386: logger.log(BasicLevel.INFO, ie.getMessage(), ie);
387: throw (SpeedoRuntimeException) ie;
388: }
389: } finally {
390: if (synchronization != null)
391: synchronization
392: .afterCompletion((validated ? Status.STATUS_COMMITTED
393: : Status.STATUS_ROLLEDBACK));
394: if (txListener != null) {
395: if (validated) {
396: txListener.transactionCommitted(this , size);
397: } else {
398: txListener.transactionAborted(this , size);
399: }
400: }
401: }
402: }
403:
404: /**
405: * @see org.objectweb.speedo.workingset.api.TransactionItf#rollback()
406: */
407: public void rollback() {
408: logger.log(BasicLevel.INFO, "Roll back a transaction: ");
409: int size = oid2state.size();
410: try {
411: tpm.rollback(this T);
412: } catch (PersistenceException e) {
413: Exception ie = ExceptionHelper.getNested(e);
414: logger
415: .log(
416: BasicLevel.ERROR,
417: "Error during the rollback of the transaction:",
418: ie);
419: throw new SpeedoRuntimeException("", ie);
420: } finally {
421: if (txListener != null) {
422: txListener.transactionAborted(this , size);
423: }
424: if (synchronization != null) {
425: synchronization
426: .afterCompletion(Status.STATUS_ROLLEDBACK);
427: }
428: }
429: }
430:
431: /**
432: * It activates the working set. This is used to delimit the begining of
433: * the working set.
434: */
435: public void activate() throws PersistenceException {
436: tpm.createWS(this T);
437: try {
438: status = WorkingSetLifeCycle.getNextStatus(status,
439: WorkingSetLifeCycle.ACTIVE_ACTION);
440: } catch (PersistenceException e) {
441: logger.log(BasicLevel.WARN,
442: "Bad initial state of the working set:", e);
443: status = WorkingSet.CTX_ACTIVE;
444: }
445: }
446:
447: public boolean isManagedEnv() {
448: return managedEnv;
449: }
450:
451: public void setConnectionHolder(ConnectionHolder ch) {
452: connectionHolder = ch;
453: connectionHolder.bindWorkingSet(this T);
454: }
455:
456: public boolean getRollbackOnly() {
457: return rollbackOnly;
458: }
459:
460: public void setRollbackOnly() {
461: rollbackOnly = true;
462: }
463:
464: public POManagerItf getPOManager() {
465: return pm;
466: }
467: }
|