001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handshakemanager;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.async.impl.NullSink;
009: import com.tc.logging.TCLogger;
010: import com.tc.net.groups.ClientID;
011: import com.tc.net.protocol.tcm.ChannelID;
012: import com.tc.net.protocol.transport.ConnectionID;
013: import com.tc.object.lockmanager.api.LockContext;
014: import com.tc.object.lockmanager.api.TryLockContext;
015: import com.tc.object.lockmanager.api.WaitContext;
016: import com.tc.object.msg.ClientHandshakeMessage;
017: import com.tc.object.net.DSOChannelManager;
018: import com.tc.objectserver.l1.api.ClientStateManager;
019: import com.tc.objectserver.lockmanager.api.LockManager;
020: import com.tc.objectserver.tx.ServerTransactionManager;
021: import com.tc.util.SequenceValidator;
022: import com.tc.util.TCTimer;
023: import com.tc.util.sequence.ObjectIDSequence;
024:
025: import java.util.Collections;
026: import java.util.HashSet;
027: import java.util.Iterator;
028: import java.util.Set;
029: import java.util.TimerTask;
030:
031: public class ServerClientHandshakeManager {
032:
033: private static final State INIT = new State("INIT");
034: private static final State STARTING = new State("STARTING");
035: private static final State STARTED = new State("STARTED");
036: private static final int BATCH_SEQUENCE_SIZE = 10000;
037:
038: public static final Sink NULL_SINK = new NullSink();
039:
040: private State state = INIT;
041:
042: private final TCTimer timer;
043: private final ReconnectTimerTask reconnectTimerTask;
044: private final ClientStateManager clientStateManager;
045: private final LockManager lockManager;
046: private final Sink lockResponseSink;
047: private final long reconnectTimeout;
048: private final DSOChannelManager channelManager;
049: private final TCLogger logger;
050: private final SequenceValidator sequenceValidator;
051: private final Set existingUnconnectedClients = new HashSet();
052: private final ObjectIDSequence oidSequence;
053: private final Set clientsRequestingObjectIDSequence = new HashSet();
054: private final boolean persistent;
055: private final ServerTransactionManager transactionManager;
056: private final TCLogger consoleLogger;
057:
058: public ServerClientHandshakeManager(TCLogger logger,
059: DSOChannelManager channelManager,
060: ServerTransactionManager transactionManager,
061: SequenceValidator sequenceValidator,
062: ClientStateManager clientStateManager,
063: LockManager lockManager, Sink lockResponseSink,
064: ObjectIDSequence oidSequence, TCTimer timer,
065: long reconnectTimeout, boolean persistent,
066: TCLogger consoleLogger) {
067: this .logger = logger;
068: this .channelManager = channelManager;
069: this .transactionManager = transactionManager;
070: this .sequenceValidator = sequenceValidator;
071: this .clientStateManager = clientStateManager;
072: this .lockManager = lockManager;
073: this .lockResponseSink = lockResponseSink;
074: this .oidSequence = oidSequence;
075: this .reconnectTimeout = reconnectTimeout;
076: this .timer = timer;
077: this .persistent = persistent;
078: this .consoleLogger = consoleLogger;
079: this .reconnectTimerTask = new ReconnectTimerTask(this , timer);
080: }
081:
082: public synchronized boolean isStarting() {
083: return state == STARTING;
084: }
085:
086: public synchronized boolean isStarted() {
087: return state == STARTED;
088: }
089:
090: public void notifyClientConnect(ClientHandshakeMessage handshake)
091: throws ClientHandshakeException {
092: ClientID clientID = handshake.getClientID();
093: logger.info("Client connected " + clientID);
094: synchronized (this ) {
095: logger.debug("Handling client handshake...");
096: clientStateManager.startupNode(clientID);
097: if (state == STARTED) {
098: if (handshake.getObjectIDs().size() > 0) {
099: //
100: throw new ClientHandshakeException(
101: "Clients connected after startup should have no existing object references.");
102: }
103: if (handshake.getWaitContexts().size() > 0) {
104: //
105: throw new ClientHandshakeException(
106: "Clients connected after startup should have no existing wait contexts.");
107: }
108: if (!handshake.getResentTransactionIDs().isEmpty()) {
109: //
110: throw new ClientHandshakeException(
111: "Clients connected after startup should not resend transactions.");
112: }
113: if (handshake.isObjectIDsRequested()) {
114: logger.debug("Client " + clientID
115: + " requested Object ID Sequences ");
116: clientsRequestingObjectIDSequence.add(clientID);
117: }
118: // XXX: It would be better to not have two different code paths that both call sendAckMessageFor(..)
119: sendAckMessageFor(clientID);
120: return;
121: }
122:
123: if (state == STARTING) {
124: channelManager.makeChannelActiveNoAck(handshake
125: .getChannel());
126: transactionManager.setResentTransactionIDs(clientID,
127: handshake.getResentTransactionIDs());
128: }
129:
130: this .sequenceValidator.initSequence(clientID, handshake
131: .getTransactionSequenceIDs());
132:
133: clientStateManager.addReferences(clientID, handshake
134: .getObjectIDs());
135:
136: for (Iterator i = handshake.getLockContexts().iterator(); i
137: .hasNext();) {
138: LockContext ctxt = (LockContext) i.next();
139: lockManager.reestablishLock(ctxt.getLockID(), ctxt
140: .getNodeID(), ctxt.getThreadID(), ctxt
141: .getLockLevel(), lockResponseSink);
142: }
143:
144: for (Iterator i = handshake.getWaitContexts().iterator(); i
145: .hasNext();) {
146: WaitContext ctxt = (WaitContext) i.next();
147: lockManager.reestablishWait(ctxt.getLockID(), ctxt
148: .getNodeID(), ctxt.getThreadID(), ctxt
149: .getLockLevel(), ctxt.getWaitInvocation(),
150: lockResponseSink);
151: }
152:
153: for (Iterator i = handshake.getPendingLockContexts()
154: .iterator(); i.hasNext();) {
155: LockContext ctxt = (LockContext) i.next();
156: lockManager.requestLock(ctxt.getLockID(), ctxt
157: .getNodeID(), ctxt.getThreadID(), ctxt
158: .getLockLevel(), lockResponseSink);
159: }
160:
161: for (Iterator i = handshake.getPendingTryLockContexts()
162: .iterator(); i.hasNext();) {
163: TryLockContext ctxt = (TryLockContext) i.next();
164: lockManager.tryRequestLock(ctxt.getLockID(), ctxt
165: .getNodeID(), ctxt.getThreadID(), ctxt
166: .getLockLevel(), ctxt.getWaitInvocation(),
167: lockResponseSink);
168: }
169:
170: if (handshake.isObjectIDsRequested()) {
171: logger.debug("Client " + clientID
172: + " requested Object ID Sequences ");
173: clientsRequestingObjectIDSequence.add(clientID);
174: }
175:
176: if (state == STARTING) {
177: logger.debug("Removing client " + clientID
178: + " from set of existing unconnected clients.");
179: existingUnconnectedClients.remove(clientID);
180: if (existingUnconnectedClients.isEmpty()) {
181: logger.debug("Last existing unconnected client ("
182: + clientID
183: + ") now connected. Cancelling timer");
184: timer.cancel();
185: start();
186: }
187: } else {
188: sendAckMessageFor(clientID);
189: }
190: }
191: }
192:
193: private void sendAckMessageFor(ClientID clientID) {
194: logger
195: .debug("Sending handshake acknowledgement to "
196: + clientID);
197:
198: final long startIDs;
199: final long endIDs;
200: if (clientsRequestingObjectIDSequence.remove(clientID)) {
201: final long ids = oidSequence
202: .nextObjectIDBatch(BATCH_SEQUENCE_SIZE);
203: logger.debug("Giving out Object ID Sequences to "
204: + clientID + " from " + ids + " to "
205: + (ids + BATCH_SEQUENCE_SIZE));
206:
207: startIDs = ids;
208: endIDs = ids + BATCH_SEQUENCE_SIZE;
209: } else {
210: startIDs = endIDs = 0;
211: }
212:
213: // NOTE: handshake ack message initialize()/send() must be done atomically with making the channel active
214: // and is thus done inside this channel manager call
215: channelManager.makeChannelActive(clientID, startIDs, endIDs,
216: persistent);
217: }
218:
219: public synchronized void notifyTimeout() {
220: assertNotStarted();
221: logger
222: .info("Reconnect window closing. Killing any previously connected clients that failed to connect in time: "
223: + existingUnconnectedClients);
224: this .channelManager.closeAll(existingUnconnectedClients);
225: for (Iterator i = existingUnconnectedClients.iterator(); i
226: .hasNext();) {
227: ClientID deadClient = (ClientID) i.next();
228: this .clientStateManager.shutdownNode(deadClient);
229: i.remove();
230: }
231: logger
232: .info("Reconnect window closed. All dead clients removed.");
233: start();
234: }
235:
236: // Should be called from within the sync block
237: private void start() {
238: logger.info("Starting DSO services...");
239: lockManager.start();
240: Set cids = Collections.unmodifiableSet(channelManager
241: .getAllClientIDs());
242: transactionManager.start(cids);
243: for (Iterator i = cids.iterator(); i.hasNext();) {
244: ClientID clientID = (ClientID) i.next();
245: sendAckMessageFor(clientID);
246: }
247: state = STARTED;
248: }
249:
250: public synchronized void setStarting(Set existingConnections) {
251: assertInit();
252: state = STARTING;
253: if (existingConnections.isEmpty()) {
254: start();
255: } else {
256: for (Iterator i = existingConnections.iterator(); i
257: .hasNext();) {
258: existingUnconnectedClients.add(channelManager
259: .getClientIDFor(new ChannelID(((ConnectionID) i
260: .next()).getChannelID())));
261: }
262:
263: consoleLogger.info("Starting reconnect window: "
264: + this .reconnectTimeout + " ms.");
265: timer.schedule(reconnectTimerTask, this .reconnectTimeout);
266: }
267: }
268:
269: private void assertInit() {
270: if (state != INIT)
271: throw new AssertionError("Should be in STARTING state: "
272: + state);
273: }
274:
275: private void assertNotStarted() {
276: if (state == STARTED)
277: throw new AssertionError(
278: "In STARTING state, but shouldn't be.");
279: }
280:
281: /**
282: * Notifies handshake manager that the reconnect time has passed.
283: *
284: * @author orion
285: */
286: private static class ReconnectTimerTask extends TimerTask {
287:
288: private final TCTimer timer;
289: private final ServerClientHandshakeManager handshakeManager;
290:
291: private ReconnectTimerTask(
292: ServerClientHandshakeManager handshakeManager,
293: TCTimer timer) {
294: this .handshakeManager = handshakeManager;
295: this .timer = timer;
296: }
297:
298: public void run() {
299: timer.cancel();
300: handshakeManager.notifyTimeout();
301: }
302:
303: }
304:
305: private static class State {
306: private final String name;
307:
308: private State(String name) {
309: this .name = name;
310: }
311:
312: public String toString() {
313: return getClass().getName() + "[" + name + "]";
314: }
315: }
316:
317: }
|