001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.handshakemanager;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.async.api.Stage;
009: import com.tc.cluster.Cluster;
010: import com.tc.logging.TCLogger;
011: import com.tc.net.protocol.tcm.ChannelEvent;
012: import com.tc.net.protocol.tcm.ChannelEventListener;
013: import com.tc.net.protocol.tcm.ChannelEventType;
014: import com.tc.object.ClientIDProvider;
015: import com.tc.object.ClientObjectManager;
016: import com.tc.object.ObjectID;
017: import com.tc.object.PauseListener;
018: import com.tc.object.RemoteObjectManager;
019: import com.tc.object.context.PauseContext;
020: import com.tc.object.gtx.ClientGlobalTransactionManager;
021: import com.tc.object.lockmanager.api.ClientLockManager;
022: import com.tc.object.lockmanager.api.LockContext;
023: import com.tc.object.lockmanager.api.LockRequest;
024: import com.tc.object.lockmanager.api.TryLockContext;
025: import com.tc.object.lockmanager.api.TryLockRequest;
026: import com.tc.object.lockmanager.api.WaitContext;
027: import com.tc.object.lockmanager.api.WaitLockRequest;
028: import com.tc.object.msg.ClientHandshakeAckMessage;
029: import com.tc.object.msg.ClientHandshakeMessage;
030: import com.tc.object.msg.ClientHandshakeMessageFactory;
031: import com.tc.object.session.SessionManager;
032: import com.tc.object.tx.RemoteTransactionManager;
033: import com.tc.properties.TCPropertiesImpl;
034: import com.tc.util.State;
035: import com.tc.util.Util;
036: import com.tc.util.sequence.BatchSequenceReceiver;
037:
038: import java.util.Collection;
039: import java.util.HashSet;
040: import java.util.Iterator;
041:
042: public class ClientHandshakeManager implements ChannelEventListener {
043: private static final State PAUSED = new State("PAUSED");
044: private static final State STARTING = new State("STARTING");
045: private static final State RUNNING = new State("RUNNING");
046:
047: private final ClientObjectManager objectManager;
048: private final ClientLockManager lockManager;
049: private final ClientIDProvider cidp;
050: private final ClientHandshakeMessageFactory chmf;
051: private final RemoteObjectManager remoteObjectManager;
052: private final ClientGlobalTransactionManager gtxManager;
053: private final TCLogger logger;
054: private final Collection stagesToPauseOnDisconnect;
055: private final Sink pauseSink;
056: private final SessionManager sessionManager;
057: private final PauseListener pauseListener;
058: private final BatchSequenceReceiver sequenceReceiver;
059: private final Cluster cluster;
060: private final String clientVersion;
061:
062: private State state = PAUSED;
063: private boolean stagesPaused = false;
064: private boolean serverIsPersistent = false;
065:
066: public ClientHandshakeManager(TCLogger logger,
067: ClientIDProvider clientIDProvider,
068: ClientHandshakeMessageFactory chmf,
069: ClientObjectManager objectManager,
070: RemoteObjectManager remoteObjectManager,
071: ClientLockManager lockManager,
072: RemoteTransactionManager remoteTransactionManager,
073: ClientGlobalTransactionManager gtxManager,
074: Collection stagesToPauseOnDisconnect, Sink pauseSink,
075: SessionManager sessionManager, PauseListener pauseListener,
076: BatchSequenceReceiver sequenceReceiver, Cluster cluster,
077: String clientVersion) {
078: this .logger = logger;
079: this .cidp = clientIDProvider;
080: this .chmf = chmf;
081: this .objectManager = objectManager;
082: this .remoteObjectManager = remoteObjectManager;
083: this .lockManager = lockManager;
084: this .gtxManager = gtxManager;
085: this .stagesToPauseOnDisconnect = stagesToPauseOnDisconnect;
086: this .pauseSink = pauseSink;
087: this .sessionManager = sessionManager;
088: this .pauseListener = pauseListener;
089: this .sequenceReceiver = sequenceReceiver;
090: this .cluster = cluster;
091: this .clientVersion = clientVersion;
092: pauseManagers();
093: }
094:
095: public void initiateHandshake() {
096: logger.debug("Initiating handshake...");
097: changeState(STARTING);
098: notifyManagersStarting();
099:
100: ClientHandshakeMessage handshakeMessage = chmf
101: .newClientHandshakeMessage();
102:
103: handshakeMessage.setClientVersion(clientVersion);
104:
105: handshakeMessage.setTransactionSequenceIDs(gtxManager
106: .getTransactionSequenceIDs());
107: handshakeMessage.setResentTransactionIDs(gtxManager
108: .getResentTransactionIDs());
109:
110: logger.debug("Getting object ids...");
111: for (Iterator i = objectManager.getAllObjectIDsAndClear(
112: new HashSet()).iterator(); i.hasNext();) {
113: handshakeMessage.addObjectID((ObjectID) i.next());
114: }
115:
116: logger.debug("Getting lock holders...");
117: for (Iterator i = lockManager.addAllHeldLocksTo(new HashSet())
118: .iterator(); i.hasNext();) {
119: LockRequest request = (LockRequest) i.next();
120: LockContext ctxt = new LockContext(request.lockID(), cidp
121: .getClientID(), request.threadID(), request
122: .lockLevel());
123: handshakeMessage.addLockContext(ctxt);
124: }
125:
126: logger.debug("Getting lock waiters...");
127: for (Iterator i = lockManager.addAllWaitersTo(new HashSet())
128: .iterator(); i.hasNext();) {
129: WaitLockRequest request = (WaitLockRequest) i.next();
130: WaitContext ctxt = new WaitContext(request.lockID(), cidp
131: .getClientID(), request.threadID(), request
132: .lockLevel(), request.getWaitInvocation());
133: handshakeMessage.addWaitContext(ctxt);
134: }
135:
136: logger.debug("Getting pending lock requests...");
137: for (Iterator i = lockManager.addAllPendingLockRequestsTo(
138: new HashSet()).iterator(); i.hasNext();) {
139: LockRequest request = (LockRequest) i.next();
140: LockContext ctxt = new LockContext(request.lockID(), cidp
141: .getClientID(), request.threadID(), request
142: .lockLevel());
143: handshakeMessage.addPendingLockContext(ctxt);
144: }
145:
146: logger.debug("Getting pending tryLock requests...");
147: for (Iterator i = lockManager.addAllPendingTryLockRequestsTo(
148: new HashSet()).iterator(); i.hasNext();) {
149: TryLockRequest request = (TryLockRequest) i.next();
150: LockContext ctxt = new TryLockContext(request.lockID(),
151: cidp.getClientID(), request.threadID(), request
152: .lockLevel(), request.getWaitInvocation());
153: handshakeMessage.addPendingTryLockContext(ctxt);
154: }
155:
156: logger
157: .debug("Checking to see if is object ids sequence is needed ...");
158: handshakeMessage.setIsObjectIDsRequested(!sequenceReceiver
159: .hasNext());
160:
161: logger.debug("Sending handshake message...");
162: handshakeMessage.send();
163: }
164:
165: public void notifyChannelEvent(ChannelEvent event) {
166: if (event.getType() == ChannelEventType.TRANSPORT_DISCONNECTED_EVENT) {
167: cluster.this NodeDisconnected();
168: pauseSink.add(PauseContext.PAUSE);
169: } else if (event.getType() == ChannelEventType.TRANSPORT_CONNECTED_EVENT) {
170: pauseSink.add(PauseContext.UNPAUSE);
171: } else if (event.getType() == ChannelEventType.CHANNEL_CLOSED_EVENT) {
172: cluster.this NodeDisconnected();
173: }
174: }
175:
176: public void pause() {
177: logger.info("Pause " + getState());
178: if (getState() == PAUSED) {
179: logger.warn("pause called while already PAUSED");
180: // ClientMessageChannel moves to next SessionID, need to move to newSession here too.
181: } else {
182: pauseStages();
183: pauseManagers();
184: changeState(PAUSED);
185: }
186: // all the activities paused then can switch to new session
187: sessionManager.newSession();
188: logger
189: .info("ClientHandshakeManager moves to "
190: + sessionManager);
191: }
192:
193: public void unpause() {
194: logger.info("Unpause " + getState());
195: if (getState() != PAUSED) {
196: logger.warn("unpause called while not PAUSED: "
197: + getState());
198: return;
199: }
200: unpauseStages();
201: initiateHandshake();
202: }
203:
204: public void acknowledgeHandshake(
205: ClientHandshakeAckMessage handshakeAck) {
206: acknowledgeHandshake(handshakeAck.getObjectIDSequenceStart(),
207: handshakeAck.getObjectIDSequenceEnd(), handshakeAck
208: .getPersistentServer(), handshakeAck
209: .getThisNodeId(), handshakeAck.getAllNodes(),
210: handshakeAck.getServerVersion());
211: }
212:
213: protected void acknowledgeHandshake(long objectIDStart,
214: long objectIDEnd, boolean persistentServer,
215: String this NodeId, String[] clusterMembers,
216: String serverVersion) {
217: if (getState() != STARTING) {
218: logger.warn("Handshake acknowledged while not STARTING: "
219: + getState());
220: return;
221: }
222:
223: final boolean checkVersionMatches = TCPropertiesImpl
224: .getProperties().getBoolean(
225: "l1.connect.versionMatchCheck.enabled");
226: if (checkVersionMatches) {
227: checkClientServerVersionMatch(logger, clientVersion,
228: serverVersion);
229: }
230:
231: this .serverIsPersistent = persistentServer;
232:
233: cluster.this NodeConnected(this NodeId, clusterMembers);
234:
235: if (objectIDStart < objectIDEnd) {
236: logger.debug("Setting the ObjectID sequence to: "
237: + objectIDStart + " , " + objectIDEnd);
238: sequenceReceiver.setNextBatch(objectIDStart, objectIDEnd);
239: }
240:
241: logger.debug("Re-requesting outstanding object requests...");
242: remoteObjectManager.requestOutstanding();
243:
244: logger
245: .debug("Handshake acknowledged. Resending incomplete transactions...");
246: gtxManager.resendOutstandingAndUnpause();
247: unpauseManagers();
248:
249: changeState(RUNNING);
250: }
251:
252: protected static void checkClientServerVersionMatch(
253: TCLogger logger, String clientVersion, String serverVersion) {
254: if (!clientVersion.equals(serverVersion)) {
255: final String msg = "Client/Server Version Mismatch Error: Client Version: "
256: + clientVersion
257: + ", Server Version: "
258: + serverVersion + ". Terminating client now.";
259: throw new RuntimeException(msg);
260: }
261: }
262:
263: private void pauseManagers() {
264: lockManager.pause();
265: objectManager.pause();
266: remoteObjectManager.pause();
267: gtxManager.pause();
268: pauseListener.notifyPause();
269: }
270:
271: private void notifyManagersStarting() {
272: lockManager.starting();
273: objectManager.starting();
274: remoteObjectManager.starting();
275: gtxManager.starting();
276: }
277:
278: // XXX:: Note that gtxmanager is actually unpaused outside this method as it
279: // has to resend transactions and unpause in a single step.
280: private void unpauseManagers() {
281: lockManager.unpause();
282: objectManager.unpause();
283: remoteObjectManager.unpause();
284: pauseListener.notifyUnpause();
285: }
286:
287: private void pauseStages() {
288: if (!stagesPaused) {
289: logger.debug("Pausing stages...");
290: for (Iterator i = stagesToPauseOnDisconnect.iterator(); i
291: .hasNext();) {
292: ((Stage) i.next()).pause();
293: }
294: stagesPaused = true;
295: } else {
296: logger
297: .debug("pauseStages(): Stages are paused; not pausing stages.");
298: }
299: }
300:
301: private void unpauseStages() {
302: if (stagesPaused) {
303: logger.debug("Unpausing stages...");
304: for (Iterator i = stagesToPauseOnDisconnect.iterator(); i
305: .hasNext();) {
306: ((Stage) i.next()).unpause();
307: }
308: stagesPaused = false;
309: } else {
310: logger
311: .debug("unpauseStages(): Stages not paused; not unpausing stages.");
312: }
313: }
314:
315: /**
316: *
317: */
318: public boolean serverIsPersistent() {
319: return this .serverIsPersistent;
320: }
321:
322: public synchronized void waitForHandshake() {
323: boolean isInterrupted = false;
324: while (state != RUNNING) {
325: try {
326: wait();
327: } catch (InterruptedException e) {
328: logger.error("Interrupted while waiting for handshake");
329: isInterrupted = true;
330: }
331: }
332: Util.selfInterruptIfNeeded(isInterrupted);
333: }
334:
335: private synchronized void changeState(State newState) {
336: state = newState;
337: notifyAll();
338: }
339:
340: private synchronized State getState() {
341: return state;
342: }
343:
344: }
|