001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import com.tc.async.api.AbstractEventHandler;
008: import com.tc.async.api.ConfigurationContext;
009: import com.tc.async.api.EventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.object.gtx.GlobalTransactionID;
012: import com.tc.object.gtx.GlobalTransactionManager;
013: import com.tc.object.lockmanager.api.Notify;
014: import com.tc.object.tx.ServerTransactionID;
015: import com.tc.objectserver.api.ObjectInstanceMonitor;
016: import com.tc.objectserver.context.ApplyTransactionContext;
017: import com.tc.objectserver.context.BroadcastChangeContext;
018: import com.tc.objectserver.core.api.ServerConfigurationContext;
019: import com.tc.objectserver.lockmanager.api.LockManager;
020: import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
021: import com.tc.objectserver.managedobject.BackReferences;
022: import com.tc.objectserver.tx.ServerTransaction;
023: import com.tc.objectserver.tx.ServerTransactionManager;
024: import com.tc.objectserver.tx.TransactionalObjectManager;
025:
026: import java.util.Iterator;
027:
028: /**
029: * Applies all the changes in a transaction then releases the objects and passes the changes off to be broadcast to the
030: * interested client sessions
031: *
032: * @author steve
033: */
034: public class ApplyTransactionChangeHandler extends AbstractEventHandler {
035:
036: // Every 100 transactions, it updates the LWM
037: private static final int LOW_WATER_MARK_UPDATE_FREQUENCY = 100;
038:
039: private ServerTransactionManager transactionManager;
040: private LockManager lockManager;
041: private Sink broadcastChangesSink;
042: private final ObjectInstanceMonitor instanceMonitor;
043: private final GlobalTransactionManager gtxm;
044: private TransactionalObjectManager txnObjectMgr;
045:
046: private int count = 0;
047: private GlobalTransactionID lowWaterMark = GlobalTransactionID.NULL_ID;
048:
049: public ApplyTransactionChangeHandler(
050: ObjectInstanceMonitor instanceMonitor,
051: GlobalTransactionManager gtxm) {
052: this .instanceMonitor = instanceMonitor;
053: this .gtxm = gtxm;
054: }
055:
056: public void handleEvent(EventContext context) {
057: ApplyTransactionContext atc = (ApplyTransactionContext) context;
058: final ServerTransaction txn = atc.getTxn();
059:
060: NotifiedWaiters notifiedWaiters = new NotifiedWaiters();
061: final ServerTransactionID stxnID = txn.getServerTransactionID();
062: final BackReferences includeIDs = new BackReferences();
063:
064: if (atc.needsApply()) {
065: transactionManager.apply(txn, atc.getObjects(), includeIDs,
066: instanceMonitor);
067: txnObjectMgr.applyTransactionComplete(stxnID);
068: } else {
069: transactionManager.skipApplyAndCommit(txn);
070: getLogger().warn(
071: "Not applying previously applied transaction: "
072: + stxnID);
073: }
074:
075: for (Iterator i = txn.getNotifies().iterator(); i.hasNext();) {
076: Notify notify = (Notify) i.next();
077: lockManager.notify(notify.getLockID(), txn.getSourceID(),
078: notify.getThreadID(), notify.getIsAll(),
079: notifiedWaiters);
080: }
081:
082: if (txn.needsBroadcast()) {
083: if (count == 0) {
084: lowWaterMark = gtxm
085: .getLowGlobalTransactionIDWatermark();
086: }
087: count = count++ % LOW_WATER_MARK_UPDATE_FREQUENCY;
088: broadcastChangesSink.add(new BroadcastChangeContext(txn,
089: lowWaterMark, notifiedWaiters, includeIDs));
090: }
091: }
092:
093: public void initialize(ConfigurationContext context) {
094: super .initialize(context);
095: ServerConfigurationContext scc = (ServerConfigurationContext) context;
096: this.transactionManager = scc.getTransactionManager();
097: this.broadcastChangesSink = scc.getStage(
098: ServerConfigurationContext.BROADCAST_CHANGES_STAGE)
099: .getSink();
100: this.txnObjectMgr = scc.getTransactionalObjectManager();
101: this.lockManager = scc.getLockManager();
102: }
103: }
|