001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.objectserver;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
008:
009: import com.tc.async.api.Sink;
010: import com.tc.l2.context.ManagedObjectSyncContext;
011: import com.tc.logging.TCLogger;
012: import com.tc.logging.TCLogging;
013: import com.tc.net.groups.NodeID;
014: import com.tc.objectserver.api.ObjectManager;
015: import com.tc.objectserver.tx.ServerTransactionManager;
016: import com.tc.objectserver.tx.TxnsInSystemCompletionLister;
017: import com.tc.util.Assert;
018: import com.tc.util.State;
019: import com.tc.util.concurrent.CopyOnWriteArrayMap;
020:
021: import java.util.Collection;
022: import java.util.HashMap;
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.Map;
026: import java.util.Set;
027:
028: public class L2ObjectStateManagerImpl implements L2ObjectStateManager {
029:
030: private static final TCLogger logger = TCLogging
031: .getLogger(L2ObjectStateManagerImpl.class);
032:
033: private final ObjectManager objectManager;
034: private final CopyOnWriteArrayMap nodes = new CopyOnWriteArrayMap();
035: private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
036: private final ServerTransactionManager transactionManager;
037:
038: public L2ObjectStateManagerImpl(ObjectManager objectManager,
039: ServerTransactionManager transactionManager) {
040: this .objectManager = objectManager;
041: this .transactionManager = transactionManager;
042: }
043:
044: public void registerForL2ObjectStateChangeEvents(
045: L2ObjectStateListener listener) {
046: listeners.add(listener);
047: }
048:
049: private void fireMissingObjectsStateEvent(NodeID nodeID,
050: int missingObjects) {
051: for (Iterator i = listeners.iterator(); i.hasNext();) {
052: L2ObjectStateListener l = (L2ObjectStateListener) i.next();
053: l.missingObjectsFor(nodeID, missingObjects);
054: }
055: }
056:
057: private void fireObjectSyncCompleteEvent(NodeID nodeID) {
058: for (Iterator i = listeners.iterator(); i.hasNext();) {
059: L2ObjectStateListener l = (L2ObjectStateListener) i.next();
060: l.objectSyncCompleteFor(nodeID);
061: }
062: }
063:
064: public int getL2Count() {
065: return nodes.size();
066: }
067:
068: public void removeL2(NodeID nodeID) {
069: Object l2State = nodes.remove(nodeID);
070: if (l2State == null) {
071: logger.warn("L2State Not found for " + nodeID);
072: }
073: }
074:
075: public boolean addL2(NodeID nodeID, Set oids) {
076: L2ObjectStateImpl l2State;
077: synchronized (nodes) {
078: l2State = (L2ObjectStateImpl) nodes.get(nodeID);
079: if (l2State != null) {
080: logger
081: .warn("L2State already present for "
082: + nodeID
083: + ". "
084: + l2State
085: + " IGNORING setExistingObjectsList : oids count = "
086: + oids.size());
087: return false;
088: }
089: l2State = new L2ObjectStateImpl(nodeID, oids);
090: nodes.put(nodeID, l2State);
091: }
092: final L2ObjectStateImpl _l2State = l2State;
093: transactionManager
094: .callBackOnTxnsInSystemCompletion(new TxnsInSystemCompletionLister() {
095: public void onCompletion() {
096: _l2State.moveToReadyToSyncState();
097: }
098: });
099: return true;
100: }
101:
102: public ManagedObjectSyncContext getSomeObjectsToSyncContext(
103: NodeID nodeID, int count, Sink sink) {
104: L2ObjectStateImpl l2State = (L2ObjectStateImpl) nodes
105: .get(nodeID);
106: if (l2State != null) {
107: return l2State.getSomeObjectsToSyncContext(count, sink);
108: } else {
109: logger.warn("L2 State Object Not found for " + nodeID);
110: return null;
111: }
112: }
113:
114: public void close(ManagedObjectSyncContext mosc) {
115: L2ObjectStateImpl l2State = (L2ObjectStateImpl) nodes.get(mosc
116: .getNodeID());
117: if (l2State != null) {
118: l2State.close(mosc);
119: } else {
120: logger.warn("close() : L2 State Object Not found for "
121: + mosc.getNodeID());
122: }
123: }
124:
125: public Collection getL2ObjectStates() {
126: return nodes.values();
127: }
128:
129: private static final State START = new State("START");
130: private static final State READY_TO_SYNC = new State(
131: "READY_TO_SYNC");
132: private static final State SYNC_STARTED = new State("SYNC_STARTED");
133: private static final State IN_SYNC_PENDING_NOTIFY = new State(
134: "IN_SYNC_PENDING_NOTIFY");
135: private static final State IN_SYNC = new State("IN_SYNC");
136:
137: private final class L2ObjectStateImpl implements L2ObjectState {
138:
139: private final NodeID nodeID;
140:
141: private Set missingOids;
142: private Map missingRoots;
143: private Set existingOids;
144:
145: private volatile State state = START;
146:
147: private ManagedObjectSyncContext syncingContext = null;
148:
149: public L2ObjectStateImpl(NodeID nodeID, Set oids) {
150: this .nodeID = nodeID;
151: this .existingOids = oids;
152: }
153:
154: private void close(ManagedObjectSyncContext mosc) {
155: Assert.assertTrue(mosc == syncingContext);
156: syncingContext = null;
157: if (missingOids.isEmpty()) {
158: state = IN_SYNC_PENDING_NOTIFY;
159: transactionManager
160: .callBackOnTxnsInSystemCompletion(new TxnsInSystemCompletionLister() {
161: public void onCompletion() {
162: moveToInSyncState();
163: }
164: });
165: }
166: }
167:
168: private ManagedObjectSyncContext getSomeObjectsToSyncContext(
169: int count, Sink sink) {
170: Assert.assertTrue(state == SYNC_STARTED);
171: Assert.assertNull(syncingContext);
172: if (isRootsMissing()) {
173: return getMissingRootsSynccontext(sink);
174: }
175: Set oids = new HashSet(count);
176: for (Iterator i = missingOids.iterator(); i.hasNext()
177: && --count >= 0;) {
178: oids.add(i.next());
179: // XXX::FIXME This has to be commented because ObjectIDSet2 doesnt support remove().
180: // i.remove();
181: }
182: missingOids.removeAll(oids); // @see above comment
183: syncingContext = new ManagedObjectSyncContext(nodeID, oids,
184: !missingOids.isEmpty(), sink);
185: return syncingContext;
186: }
187:
188: private ManagedObjectSyncContext getMissingRootsSynccontext(
189: Sink sink) {
190: missingOids.removeAll(this .missingRoots.values());
191: syncingContext = new ManagedObjectSyncContext(nodeID,
192: new HashMap(this .missingRoots), !missingOids
193: .isEmpty(), sink);
194: this .missingRoots.clear();
195: return syncingContext;
196: }
197:
198: private boolean isRootsMissing() {
199: return !this .missingRoots.isEmpty();
200: }
201:
202: private int computeDiff() {
203: this .missingOids = objectManager.getAllObjectIDs();
204: this .missingRoots = objectManager.getRootNamesToIDsMap();
205: int objectCount = missingOids.size();
206: Set missingHere = new HashSet();
207: for (Iterator i = existingOids.iterator(); i.hasNext();) {
208: Object o = i.next();
209: if (!missingOids.remove(o)) {
210: missingHere.add(o);
211: }
212: }
213: existingOids = null; // Let GC work for us
214: missingRoots.values().retainAll(this .missingOids);
215: logger.info(nodeID + " : is missing " + missingOids.size()
216: + " out of " + objectCount
217: + " objects of which missing roots = "
218: + missingRoots.size());
219: if (!missingHere.isEmpty()) {
220: // XXX:: This is possible because some message (Transaction message with new object creation or object delete
221: // message from GC) from previous active reached the other node and not this node and the active crashed
222: logger.warn("Object IDs MISSING HERE : "
223: + missingHere.size() + " : " + missingHere);
224: }
225: int missingCount = missingOids.size();
226: if (missingCount == 0) {
227: state = IN_SYNC;
228: } else {
229: state = SYNC_STARTED;
230: }
231: return missingCount;
232: }
233:
234: public NodeID getNodeID() {
235: return nodeID;
236: }
237:
238: public String toString() {
239: return "L2StateObjectImpl [ "
240: + nodeID
241: + " ] : "
242: + (missingOids != null ? "missing = "
243: + missingOids.size() : "") + " state = "
244: + state;
245: }
246:
247: private void moveToReadyToSyncState() {
248: state = READY_TO_SYNC;
249: int missingObjects = computeDiff();
250: fireMissingObjectsStateEvent(this .nodeID, missingObjects);
251: }
252:
253: private void moveToInSyncState() {
254: state = IN_SYNC;
255: fireObjectSyncCompleteEvent(this.nodeID);
256: }
257: }
258: }
|