001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.transport;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.net.MaxConnectionsExceededException;
011: import com.tc.net.TCSocketAddress;
012: import com.tc.net.core.ConnectionAddressIterator;
013: import com.tc.net.core.ConnectionAddressProvider;
014: import com.tc.net.core.ConnectionInfo;
015: import com.tc.net.core.TCConnection;
016: import com.tc.net.core.TCConnectionManager;
017: import com.tc.util.Assert;
018: import com.tc.util.TCTimeoutException;
019: import com.tc.util.concurrent.NoExceptionLinkedQueue;
020:
021: import java.io.IOException;
022:
023: /**
024: * This guy establishes a connection to the server for the Client.
025: */
026: public class ClientConnectionEstablisher {
027:
028: private static final long CONNECT_RETRY_INTERVAL = 1000;
029:
030: private final String desc;
031: private final int maxReconnectTries;
032: private final int timeout;
033: private final ConnectionAddressProvider connAddressProvider;
034: private final TCConnectionManager connManager;
035:
036: private final SynchronizedBoolean asyncReconnecting = new SynchronizedBoolean(
037: false);
038:
039: private Thread connectionEstablisher;
040:
041: private NoExceptionLinkedQueue reconnectRequest = new NoExceptionLinkedQueue(); // <ConnectionRequest>
042:
043: public ClientConnectionEstablisher(TCConnectionManager connManager,
044: ConnectionAddressProvider connAddressProvider,
045: int maxReconnectTries, int timeout) {
046: this .connManager = connManager;
047: this .connAddressProvider = connAddressProvider;
048: this .maxReconnectTries = maxReconnectTries;
049: this .timeout = timeout;
050:
051: if (maxReconnectTries == 0)
052: desc = "none";
053: else if (maxReconnectTries < 0)
054: desc = "unlimited";
055: else
056: desc = "" + maxReconnectTries;
057:
058: }
059:
060: /**
061: * Blocking open. Causes a connection to be made. Will throw exceptions if the connect fails.
062: *
063: * @throws TCTimeoutException
064: * @throws IOException
065: * @throws TCTimeoutException
066: * @throws MaxConnectionsExceededException
067: */
068: public TCConnection open(ClientMessageTransport cmt)
069: throws TCTimeoutException, IOException {
070: synchronized (asyncReconnecting) {
071: Assert
072: .eval(
073: "Can't call open() while asynch reconnect occurring",
074: !asyncReconnecting.get());
075: return connectTryAllOnce(cmt);
076: }
077: }
078:
079: private TCConnection connectTryAllOnce(ClientMessageTransport cmt)
080: throws TCTimeoutException, IOException {
081: final ConnectionAddressIterator addresses = connAddressProvider
082: .getIterator();
083: TCConnection rv = null;
084: while (addresses.hasNext()) {
085: final ConnectionInfo connInfo = addresses.next();
086: try {
087: final TCSocketAddress csa = new TCSocketAddress(
088: connInfo);
089: rv = connect(csa, cmt);
090: break;
091: } catch (TCTimeoutException e) {
092: if (!addresses.hasNext()) {
093: throw e;
094: }
095: } catch (IOException e) {
096: if (!addresses.hasNext()) {
097: throw e;
098: }
099: }
100: }
101: return rv;
102: }
103:
104: /**
105: * Tries to make a connection. This is a blocking call.
106: *
107: * @return
108: * @throws TCTimeoutException
109: * @throws IOException
110: * @throws MaxConnectionsExceededException
111: */
112: TCConnection connect(TCSocketAddress sa, ClientMessageTransport cmt)
113: throws TCTimeoutException, IOException {
114:
115: TCConnection connection = this .connManager.createConnection(cmt
116: .getProtocolAdapter());
117: cmt.fireTransportConnectAttemptEvent();
118: try {
119: connection.connect(sa, timeout);
120: } catch (IOException e) {
121: connection.close(100);
122: throw e;
123: } catch (TCTimeoutException e) {
124: connection.close(100);
125: throw e;
126: }
127: return connection;
128: }
129:
130: public String toString() {
131: return "ClientConnectionEstablisher[" + connAddressProvider
132: + ", timeout=" + timeout + "]";
133: }
134:
135: private void reconnect(ClientMessageTransport cmt)
136: throws MaxConnectionsExceededException {
137: try {
138:
139: boolean connected = cmt.isConnected();
140: if (connected) {
141: cmt.logger
142: .warn("Got reconnect request for ClientMessageTransport that is connected. skipping");
143: }
144:
145: asyncReconnecting.set(true);
146: for (int i = 0; ((maxReconnectTries < 0) || (i < maxReconnectTries))
147: && !connected; i++) {
148: ConnectionAddressIterator addresses = connAddressProvider
149: .getIterator();
150: while (addresses.hasNext() && !connected) {
151: TCConnection connection = null;
152: final ConnectionInfo connInfo = addresses.next();
153: try {
154: if (i % 20 == 0) {
155: cmt.logger.warn("Reconnect attempt " + i
156: + " of " + desc
157: + " reconnect tries to " + connInfo
158: + ", timeout=" + timeout);
159: }
160: connection = connect(new TCSocketAddress(
161: connInfo), cmt);
162: cmt.reconnect(connection);
163: connected = true;
164: } catch (MaxConnectionsExceededException e) {
165: throw e;
166: } catch (TCTimeoutException e) {
167: handleConnectException(e, false, cmt.logger,
168: connection);
169: } catch (IOException e) {
170: handleConnectException(e, false, cmt.logger,
171: connection);
172: } catch (Exception e) {
173: handleConnectException(e, true, cmt.logger,
174: connection);
175: }
176:
177: }
178: }
179: cmt.endIfDisconnected();
180: } finally {
181: asyncReconnecting.set(false);
182: }
183: }
184:
185: private void restoreConnection(ClientMessageTransport cmt,
186: TCSocketAddress sa, long timeoutMillis,
187: RestoreConnectionCallback callback) {
188: final long deadline = System.currentTimeMillis()
189: + timeoutMillis;
190: boolean connected = cmt.isConnected();
191: if (connected) {
192: cmt.logger
193: .warn("Got restoreConnection request for ClientMessageTransport that is connected. skipping");
194: }
195:
196: asyncReconnecting.set(true);
197: for (int i = 0; !connected; i++) {
198: TCConnection connection = null;
199: try {
200: connection = connect(sa, cmt);
201: cmt.reconnect(connection);
202: connected = true;
203: } catch (MaxConnectionsExceededException e) {
204: // nothing
205: } catch (TCTimeoutException e) {
206: handleConnectException(e, false, cmt.logger, connection);
207: } catch (IOException e) {
208: handleConnectException(e, false, cmt.logger, connection);
209: } catch (Exception e) {
210: handleConnectException(e, true, cmt.logger, connection);
211: }
212: if (connected || System.currentTimeMillis() > deadline) {
213: break;
214: }
215: }
216: asyncReconnecting.set(false);
217: if (!connected) {
218: callback.restoreConnectionFailed(cmt);
219: }
220: }
221:
222: private void handleConnectException(Exception e,
223: boolean logFullException, TCLogger logger,
224: TCConnection connection) {
225: if (connection != null)
226: connection.close(100);
227:
228: if (logger.isDebugEnabled() || logFullException) {
229: logger.error("Connect Exception", e);
230: } else {
231: logger.warn(e.getMessage());
232: }
233: try {
234: Thread.sleep(CONNECT_RETRY_INTERVAL);
235: } catch (InterruptedException e1) {
236: //
237: }
238: }
239:
240: public void asyncReconnect(ClientMessageTransport cmt) {
241: synchronized (asyncReconnecting) {
242: if (asyncReconnecting.get())
243: return;
244: putReconnectRequest(new ConnectionRequest(
245: ConnectionRequest.RECONNECT, cmt));
246: }
247: }
248:
249: public void asyncRestoreConnection(ClientMessageTransport cmt,
250: TCSocketAddress sa, RestoreConnectionCallback callback,
251: long timeoutMillis) {
252: synchronized (asyncReconnecting) {
253: if (asyncReconnecting.get())
254: return;
255: putReconnectRequest(new RestoreConnectionRequest(cmt, sa,
256: callback, timeoutMillis));
257: }
258: }
259:
260: private void putReconnectRequest(ConnectionRequest request) {
261: if (connectionEstablisher == null) {
262: // First time
263: // Allow the async thread reconnects/restores only when cmt was connected atleast once
264: if ((request.getClientMessageTransport() == null)
265: || (!request.getClientMessageTransport()
266: .wasOpened()))
267: return;
268:
269: connectionEstablisher = new Thread(
270: new AsyncReconnect(this ), "ConnectionEstablisher");
271: connectionEstablisher.setDaemon(true);
272: connectionEstablisher.start();
273:
274: }
275:
276: // DEV-1140 : avoiding the race condition
277: // asyncReconnecting.set(true);
278: reconnectRequest.put(request);
279: }
280:
281: public void quitReconnectAttempts() {
282: putReconnectRequest(new ConnectionRequest(
283: ConnectionRequest.QUIT, null));
284: }
285:
286: static class AsyncReconnect implements Runnable {
287: private final ClientConnectionEstablisher cce;
288:
289: public AsyncReconnect(ClientConnectionEstablisher cce) {
290: this .cce = cce;
291: }
292:
293: public void run() {
294: ConnectionRequest request = null;
295: while ((request = (ConnectionRequest) cce.reconnectRequest
296: .take()) != null) {
297: if (request.isReconnect()) {
298: ClientMessageTransport cmt = request
299: .getClientMessageTransport();
300: try {
301: cce.reconnect(cmt);
302: } catch (MaxConnectionsExceededException e) {
303: cmt.logger.warn(e);
304: cmt.logger
305: .warn("No longer trying to reconnect.");
306: return;
307: } catch (Throwable t) {
308: cmt.logger.warn("Reconnect failed !", t);
309: }
310: } else if (request.isRestoreConnection()) {
311: RestoreConnectionRequest req = (RestoreConnectionRequest) request;
312: cce.restoreConnection(req
313: .getClientMessageTransport(), req
314: .getSocketAddress(),
315: req.getTimeoutMillis(), req.getCallback());
316: } else if (request.isQuit()) {
317: break;
318: }
319: }
320: }
321: }
322:
323: static class ConnectionRequest {
324:
325: public static final int RECONNECT = 1;
326: public static final int QUIT = 2;
327: public static final int RESTORE_CONNECTION = 3;
328:
329: private final int type;
330: private final TCSocketAddress sa;
331: private final ClientMessageTransport cmt;
332:
333: public ConnectionRequest(int type, ClientMessageTransport cmt) {
334: this (type, cmt, null);
335: }
336:
337: public ConnectionRequest(final int type,
338: final ClientMessageTransport cmt,
339: final TCSocketAddress sa) {
340: this .type = type;
341: this .cmt = cmt;
342: this .sa = sa;
343: }
344:
345: public boolean isReconnect() {
346: return type == RECONNECT;
347: }
348:
349: public boolean isQuit() {
350: return type == QUIT;
351: }
352:
353: public boolean isRestoreConnection() {
354: return type == RESTORE_CONNECTION;
355: }
356:
357: public TCSocketAddress getSocketAddress() {
358: return sa;
359: }
360:
361: public ClientMessageTransport getClientMessageTransport() {
362: return cmt;
363: }
364: }
365:
366: static class RestoreConnectionRequest extends ConnectionRequest {
367:
368: private final RestoreConnectionCallback callback;
369: private final long timeoutMillis;
370:
371: public RestoreConnectionRequest(ClientMessageTransport cmt,
372: final TCSocketAddress sa,
373: RestoreConnectionCallback callback, long timeoutMillis) {
374: super (RESTORE_CONNECTION, cmt, sa);
375: this .callback = callback;
376: this .timeoutMillis = timeoutMillis;
377: }
378:
379: public RestoreConnectionCallback getCallback() {
380: return callback;
381: }
382:
383: public long getTimeoutMillis() {
384: return timeoutMillis;
385: }
386: }
387: }
|