001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): ______________________.
 */
package org.continuent.sequoia.controller.requestmanager.distributed;
021:
022: import java.sql.SQLException;
023: import java.util.ArrayList;
024: import java.util.Collection;
025: import java.util.ConcurrentModificationException;
026: import java.util.HashMap;
027: import java.util.Hashtable;
028: import java.util.Iterator;
029: import java.util.LinkedList;
030: import java.util.List;
031: import java.util.Map;
032:
033: import org.continuent.hedera.adapters.MulticastRequestAdapter;
034: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
035: import org.continuent.sequoia.common.log.Trace;
036: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
037: import org.continuent.sequoia.controller.requests.AbstractRequest;
038: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
039: import org.continuent.sequoia.controller.virtualdatabase.protocol.FlushGroupCommunicationMessages;
040:
041: /**
042: * This class defines a ControllerFailureCleanupThread
043: *
044: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet</a>
045: * @version 1.0
046: */
047: public class ControllerFailureCleanupThread extends Thread {
048: private Hashtable cleanupThreadsList;
049: private DistributedVirtualDatabase dvdb;
050: private DistributedRequestManager drm;
051: private long failedControllerId;
052: private long failoverTimeoutInMs;
053: private List persistentConnectionsToRecover;
054: private List transactionsToRecover;
055: private Trace logger = Trace
056: .getLogger("org.continuent.sequoia.controller.requestmanager.cleanup");
057:
058: /**
059: * Creates a new <code>ControllerFailureCleanupThread</code> object
060: *
061: * @param failoverTimeoutInMs time to wait before proceeding to the cleanup
062: * @param failedControllerId id of the controller that failed
063: * @param distributedVirtualDatabase the distributed virtual database on which
064: * we should proceed with the cleanup
065: * @param cleanupThreads list of cleanup threads from which we should remove
066: * ourselves when we terminate
067: */
068: public ControllerFailureCleanupThread(
069: DistributedVirtualDatabase distributedVirtualDatabase,
070: long failedControllerId, long failoverTimeoutInMs,
071: Hashtable cleanupThreads, HashMap writesFlushed) {
072: super ("ControllerFailureCleanupThread for controller "
073: + failedControllerId);
074: this .dvdb = distributedVirtualDatabase;
075: drm = (DistributedRequestManager) dvdb.getRequestManager();
076: this .failedControllerId = failedControllerId;
077: this .failoverTimeoutInMs = failoverTimeoutInMs;
078: this .cleanupThreadsList = cleanupThreads;
079: }
080:
081: /**
082: * {@inheritDoc}
083: *
084: * @see java.lang.Runnable#run()
085: */
086: public void run() {
087: try {
088: doRun();
089: } catch (Throwable t) {
090: logger.fatal("Cleanup failed", t);
091: }
092: }
093:
094: /**
095: * {@inheritDoc}
096: *
097: * @see java.lang.Thread#run()
098: */
099: public void doRun() {
100: Long controllerIdKey = new Long(failedControllerId);
101: try {
102: synchronized (this ) {
103: /*
104: * Ensure that all group communication messages get flushed locally
105: * (especially commit/rollback before building the open transaction
106: * list)
107: */
108: dvdb
109: .flushGroupCommunicationMessagesLocally(failedControllerId);
110:
111: notifyAll();
112: }
113:
114: // Wait for clients to failover
115: try {
116: synchronized (this ) {
117: if (logger.isInfoEnabled())
118: logger.info("Waiting " + failoverTimeoutInMs
119: + "ms for client of controller "
120: + controllerIdKey + " to failover");
121: wait(failoverTimeoutInMs);
122: }
123: } catch (InterruptedException ignore) {
124: }
125:
126: /*
127: * Notify queries that were expecting an answer from the failed controller
128: * that this will never happen.
129: */
130: drm.cleanupAllFailedQueriesFromController(controllerIdKey
131: .longValue());
132:
133: /*
134: * Build list of transactions and persistent connections for which
135: * failover did not happen. Start with pending requests in the scheduler
136: * then look for active resources in the distributed request manager.
137: */
138: transactionsToRecover = parseTransactionMetadataListForControllerId(drm
139: .getScheduler().getActiveTransactions());
140: rollbackInactiveTransactions(controllerIdKey);
141: persistentConnectionsToRecover = parsePersistentConnections(drm
142: .getScheduler().getOpenPersistentConnections());
143: // getTransactionAndPersistentConnectionsFromRequests(drm.getScheduler()
144: // .getActiveReadRequests());
145: getTransactionAndPersistentConnectionsFromRequests(drm
146: .getScheduler().getActiveWriteRequests());
147:
148: // If both lists are empty there is nothing to cleanup
149: if ((transactionsToRecover.isEmpty())
150: && (persistentConnectionsToRecover.isEmpty()))
151: return;
152:
153: /*
154: * Ok, now everything should have been recovered. Cleanup all remaining
155: * objects. The following methods will take care of not cleaning up
156: * resources for which failover was notified through
157: * notifyTransactionFailover or notifyPersistentConnectionFailover.
158: */
159: // abortRemainingTransactions(controllerIdKey);
160: closeRemainingPersistentConnections(controllerIdKey);
161: } finally {
162: logger.info("Cleanup for controller " + failedControllerId
163: + " failure is completed.");
164:
165: // Remove ourselves from the thread list
166: cleanupThreadsList.remove(this );
167: }
168: }
169:
170: /**
171: * Rollback all the transactions for clients that did not reconnect. The
172: * transaction can exist at this point for 3 reasons.
173: * <ul>
174: * <li>The client did not have an active request when the controller failed
175: * <li>The client had an active request that completed after the failure
176: * <li>The active request is blocked by a transaction in the first state.
177: * <ul>
178: * In the first two cases the, it is safe to immediately rollback the
179: * transaction, because there is no current activity. In the third case, we
180: * need to wait until the request complete before doing the rollback. So we go
181: * through the list of transactions needing recovery repeatly rollbacking the
182: * inactive transactions. Rollingback transactions should let the other
183: * pending transactions complete.
184: *
185: * @param controllerIdKey the controller id
186: */
187: private void rollbackInactiveTransactions(Long controllerIdKey) {
188: List transactionsRecovered = dvdb
189: .getTransactionsRecovered(controllerIdKey);
190: Map readRequests = drm.getScheduler().getActiveReadRequests();
191: Map writeRequests = drm.getScheduler().getActiveWriteRequests();
192: while (!transactionsToRecover.isEmpty()) {
193: int waitingForCompletion = 0;
194: // Iterate on the list of active transactions (based on scheduler
195: // knowledge) that were started by the failed controller.
196: for (Iterator iter = transactionsToRecover.iterator(); iter
197: .hasNext();) {
198: Long lTid = (Long) iter.next();
199:
200: if ((transactionsRecovered == null)
201: || !transactionsRecovered.contains(lTid)) {
202: if (!hasRequestForTransaction(lTid.longValue(),
203: readRequests)
204: && !hasRequestForTransaction(lTid
205: .longValue(), writeRequests)) {
206: if (logger.isInfoEnabled())
207: logger
208: .info("Rollingback transaction "
209: + lTid
210: + " started by dead controller "
211: + failedControllerId
212: + " since client did not ask for failover");
213:
214: try {
215: boolean logRollback = dvdb.getRecoveryLog()
216: .hasLoggedBeginForTransaction(lTid);
217: dvdb
218: .rollback(lTid.longValue(),
219: logRollback);
220: } catch (SQLException e) {
221: logger
222: .error(
223: "Failed to rollback transaction "
224: + lTid
225: + " started by dead controller "
226: + failedControllerId,
227: e);
228: } catch (VirtualDatabaseException e) {
229: logger
230: .error(
231: "Failed to rollback transaction "
232: + lTid
233: + " started by dead controller "
234: + failedControllerId,
235: e);
236: }
237:
238: iter.remove();
239: } else {
240: waitingForCompletion++;
241: if (logger.isDebugEnabled())
242: logger
243: .debug("Waiting for activity to complete for "
244: + lTid
245: + " started by dead controller "
246: + failedControllerId
247: + " since client did not ask for failover");
248: }
249: }
250: }
251:
252: if (waitingForCompletion == 0)
253: break;
254: try {
255: synchronized (writeRequests) {
256: if (!writeRequests.isEmpty()) {
257: writeRequests.wait(500);
258: continue;
259: }
260: }
261: synchronized (readRequests) {
262: if (!readRequests.isEmpty()) {
263: readRequests.wait(500);
264: continue;
265: }
266: }
267: } catch (InterruptedException e) {
268: }
269: } // while (!transactionsToRecover.isEmpty())
270: }
271:
272: /**
273: * Returns true if the given map contains an AbstractRequest value that
274: * belongs to the given transaction.
275: *
276: * @param transactionId transaction id to look for
277: * @param map map of Long(transaction id) -> AbstractRequest
278: * @return true if a request in the map matches the transaction id
279: */
280: private boolean hasRequestForTransaction(long transactionId, Map map) {
281: synchronized (map) {
282: for (Iterator iter = map.values().iterator(); iter
283: .hasNext();) {
284: AbstractRequest request = (AbstractRequest) iter.next();
285: if (transactionId == request.getTransactionId())
286: return true;
287: }
288: }
289:
290: return false;
291: }
292:
293: /**
294: * Shutdown this thread and wait for its completion (the thread will try to
295: * end asap and skip work if possible).
296: */
297: public synchronized void shutdown() {
298: notifyAll();
299: try {
300: this .join();
301: } catch (InterruptedException e) {
302: logger
303: .warn("Controller cleanup thread may not have completed before it was terminated");
304: }
305: }
306:
307: private void closeRemainingPersistentConnections(Long controllerId) {
308: List persistentConnectionsRecovered = dvdb
309: .getControllerPersistentConnectionsRecovered(controllerId);
310: for (Iterator iter = persistentConnectionsToRecover.iterator(); iter
311: .hasNext();) {
312: Long lConnectionId = (Long) iter.next();
313:
314: if ((persistentConnectionsRecovered == null)
315: || !persistentConnectionsRecovered
316: .contains(lConnectionId)) {
317: if (logger.isInfoEnabled())
318: logger.info("Closing persistent connection "
319: + lConnectionId
320: + " started by dead controller "
321: + failedControllerId
322: + " since client did not ask for failover");
323:
324: drm.closePersistentConnection(lConnectionId);
325: }
326: }
327: }
328:
329: /**
330: * Update the transactionsToRecover and persistentConnectionsToRecover lists
331: * with the requests found in the HashMap that are matching our failed
332: * controller id.
333: *
334: * @param map the map to parse
335: */
336: private void getTransactionAndPersistentConnectionsFromRequests(
337: Map map) {
338: synchronized (map) {
339: for (Iterator iter = map.keySet().iterator(); iter
340: .hasNext();) {
341: Long lTid = (Long) iter.next();
342: if ((lTid.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) { // Request id matches the failed controller
343: AbstractRequest request = (AbstractRequest) map
344: .get(lTid);
345: if (!request.isAutoCommit()) {
346: /*
347: * Re-check transaction id in case this was already a failover from
348: * another controller
349: */
350: if ((request.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) {
351: Long tidLong = new Long(request
352: .getTransactionId());
353: if (!transactionsToRecover
354: .contains(tidLong)) {
355: transactionsToRecover.add(tidLong);
356: }
357: }
358: }
359: if (request.isPersistentConnection()) {
360: /*
361: * Re-check persistent connection id in case this was a failover
362: * from another controller
363: */
364: Long connIdLong = new Long(request
365: .getPersistentConnectionId());
366: if ((request.getPersistentConnectionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
367: if (!persistentConnectionsToRecover
368: .contains(connIdLong)) {
369: persistentConnectionsToRecover
370: .add(connIdLong);
371: }
372: }
373: }
374: }
375: }
376: }
377:
378: private List parsePersistentConnections(Map map) {
379: LinkedList result = new LinkedList();
380: synchronized (map) {
381: for (Iterator iter = map.keySet().iterator(); iter
382: .hasNext();) {
383: Long persistentConnectionId = (Long) iter.next();
384: if ((persistentConnectionId.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) { // Request id matches the failed controller
385: result.add(persistentConnectionId);
386: }
387: }
388: return result;
389: }
390: }
391:
392: /**
393: * Parse the list containing transaction metadata and return a list containing
394: * all transaction ids matching the controllerId this thread is taking care
395: * of.
396: *
397: * @param list transaction metadata list to parse
398: * @return sublist containing ids matching failedControllerId
399: */
400: private List parseTransactionMetadataListForControllerId(
401: Collection list) {
402: LinkedList result = new LinkedList();
403: synchronized (list) {
404: boolean retry = true;
405: while (retry) {
406: try {
407: for (Iterator iter = list.iterator(); iter
408: .hasNext();) {
409: TransactionMetaData tm = (TransactionMetaData) iter
410: .next();
411: if ((tm.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
412: result.addLast(new Long(tm
413: .getTransactionId()));
414: }
415:
416: retry = false;
417: } catch (ConcurrentModificationException e) {
418: // TODO: handle exception
419: }
420: }
421: }
422: return result;
423: }
424:
425: }
|