001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.impl;
006:
007: import com.tc.logging.TCLogger;
008: import com.tc.logging.TCLogging;
009: import com.tc.net.groups.NodeID;
010: import com.tc.object.tx.ServerTransactionID;
011: import com.tc.objectserver.api.ObjectManager;
012: import com.tc.objectserver.api.ObjectRequestManager;
013: import com.tc.objectserver.context.ManagedObjectRequestContext;
014: import com.tc.objectserver.tx.ServerTransactionListener;
015: import com.tc.objectserver.tx.ServerTransactionManager;
016: import com.tc.util.State;
017:
018: import java.util.Collection;
019: import java.util.HashSet;
020: import java.util.Iterator;
021: import java.util.LinkedList;
022: import java.util.List;
023: import java.util.Set;
024:
025: public class ObjectRequestManagerImpl implements ObjectRequestManager,
026: ServerTransactionListener {
027:
028: private final static TCLogger logger = TCLogging
029: .getLogger(ObjectRequestManagerImpl.class);
030:
031: private final static State INIT = new State("INITIAL");
032: private final static State STARTING = new State("STARTING");
033: private final static State STARTED = new State("STARTED");
034:
035: private final ObjectManager objectManager;
036: private final ServerTransactionManager transactionManager;
037:
038: private final List pendingRequests = new LinkedList();
039: private final Set resentTransactionIDs = new HashSet();
040: private volatile State state = INIT;
041:
042: public ObjectRequestManagerImpl(ObjectManager objectManager,
043: ServerTransactionManager transactionManager) {
044: this .objectManager = objectManager;
045: this .transactionManager = transactionManager;
046: transactionManager.addTransactionListener(this );
047:
048: }
049:
050: public synchronized void transactionManagerStarted(Set cids) {
051: state = STARTING;
052: objectManager.start();
053: moveToStartedIfPossible();
054: }
055:
056: private void moveToStartedIfPossible() {
057: if (state == STARTING && resentTransactionIDs.isEmpty()) {
058: state = STARTED;
059: transactionManager.removeTransactionListener(this );
060: processPending();
061: }
062: }
063:
064: public void requestObjects(
065: ManagedObjectRequestContext responseContext,
066: int maxReachableObjects) {
067: synchronized (this ) {
068: if (state != STARTED) {
069: pendingRequests.add(new PendingRequest(responseContext,
070: maxReachableObjects));
071: return;
072: }
073: }
074: objectManager.lookupObjectsAndSubObjectsFor(responseContext
075: .getRequestedNodeID(), responseContext,
076: maxReachableObjects);
077: }
078:
079: public synchronized void addResentServerTransactionIDs(
080: Collection sTxIDs) {
081: if (state != INIT) {
082: throw new AssertionError(
083: "Cant add Resent transactions after start up ! "
084: + sTxIDs.size() + "Txns : " + state);
085: }
086: resentTransactionIDs.addAll(sTxIDs);
087: logger.info("resentTransactions = "
088: + resentTransactionIDs.size());
089: }
090:
091: public void transactionCompleted(ServerTransactionID stxID) {
092: return;
093: }
094:
095: public void incomingTransactions(NodeID source, Set serverTxnIDs) {
096: return;
097: }
098:
099: public synchronized void clearAllTransactionsFor(NodeID client) {
100: if (state == STARTED)
101: return;
102: for (Iterator iter = resentTransactionIDs.iterator(); iter
103: .hasNext();) {
104: ServerTransactionID stxID = (ServerTransactionID) iter
105: .next();
106: if (stxID.getSourceID().equals(client)) {
107: iter.remove();
108: }
109: }
110: moveToStartedIfPossible();
111: }
112:
113: private void processPending() {
114: logger.info("Processing Pending Lookups = "
115: + pendingRequests.size());
116: for (Iterator iter = pendingRequests.iterator(); iter.hasNext();) {
117: PendingRequest request = (PendingRequest) iter.next();
118: logger.info("Processing pending Looking up : "
119: + request.getResponseContext());
120: objectManager.lookupObjectsAndSubObjectsFor(request
121: .getResponseContext().getRequestedNodeID(), request
122: .getResponseContext(), request
123: .getMaxReachableObjects());
124: }
125: }
126:
127: public synchronized void transactionApplied(
128: ServerTransactionID stxID) {
129: resentTransactionIDs.remove(stxID);
130: moveToStartedIfPossible();
131: }
132:
133: private final static class PendingRequest {
134:
135: private final ManagedObjectRequestContext responseContext;
136: private final int maxReachableObjects;
137:
138: public PendingRequest(
139: ManagedObjectRequestContext responseContext,
140: int maxReachableObjects) {
141: this .responseContext = responseContext;
142: this .maxReachableObjects = maxReachableObjects;
143: }
144:
145: public int getMaxReachableObjects() {
146: return maxReachableObjects;
147: }
148:
149: public ManagedObjectRequestContext getResponseContext() {
150: return responseContext;
151: }
152: }
153:
154: }
|