001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): ______________________.
022: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
023:
024: import java.io.Serializable;
025: import java.sql.SQLException;
026: import java.util.LinkedList;
027:
028: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
029: import org.continuent.sequoia.common.i18n.Translate;
030: import org.continuent.sequoia.common.log.Trace;
031: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
032: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
033: import org.continuent.sequoia.controller.requests.AbstractRequest;
034: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
035: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread;
036:
037: /**
038: * Execute a distributed abort.
039: *
040: * @author <a href="mailto:emmanuel.cecchet@emicnetworks.com">Emmanuel Cecchet
041: * </a>
042: * @version 1.0
043: */
044: public class DistributedAbort extends DistributedTransactionMarker {
045: private static final long serialVersionUID = -8954391235872189513L;
046:
047: private transient Long tid;
048: // Login that rollbacks the transaction. This is used in case the remote
049: // controller has to log the commit but didn't see the begin in which case it
050: // will not be able to retrieve the transaction marker metadata
051: private String login;
052:
053: private DistributedRollback totalOrderAbort;
054:
055: /**
056: * Creates a new <code>Abort</code> message.
057: *
058: * @param login login that rollback the transaction
059: * @param transactionId id of the transaction to commit
060: */
061: public DistributedAbort(String login, long transactionId) {
062: super (transactionId);
063: this .login = login;
064: }
065:
066: /**
067: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker#scheduleCommand(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
068: */
069: public Object scheduleCommand(DistributedRequestManager drm)
070: throws SQLException {
071: LinkedList totalOrderQueue = drm.getVirtualDatabase()
072: .getTotalOrderQueue();
073: if (totalOrderQueue != null) {
074: synchronized (totalOrderQueue) {
075: totalOrderAbort = new DistributedRollback(login,
076: transactionId);
077: totalOrderQueue.addLast(totalOrderAbort);
078: }
079: }
080: return totalOrderAbort;
081: }
082:
083: /**
084: * Execution of a distributed rollback command on the specified
085: * <code>DistributedRequestManager</code>
086: *
087: * @param drm the DistributedRequestManager that will execute the rollback
088: * @return Boolean.TRUE if everything went fine or a SQLException if an error
089: * occured
090: * @throws SQLException if an error occurs
091: */
092: public Serializable executeCommand(DistributedRequestManager drm)
093: throws SQLException {
094: Trace logger = drm.getLogger();
095:
096: boolean transactionStartedOnThisController = true;
097: tid = new Long(transactionId);
098: TransactionMetaData tm;
099: try {
100: tm = drm.getTransactionMetaData(tid);
101: } catch (SQLException ignore) {
102: // The transaction was started before the controller joined the
103: // cluster, build a fake tm so that we will be able to log it.
104: transactionStartedOnThisController = false;
105: tm = new TransactionMetaData(transactionId, 0, login,
106: false, 0);
107: }
108:
109: VirtualDatabaseWorkerThread vdbwt = drm.getVirtualDatabase()
110: .getVirtualDatabaseWorkerThreadForTransaction(
111: transactionId);
112: if (vdbwt != null)
113: vdbwt.notifyAbort(transactionId);
114:
115: boolean hasBeenScheduled = false;
116: try {
117: if (transactionStartedOnThisController) { // Rollback would fail on a fake tm so skip it
118: drm.getScheduler().rollback(tm, totalOrderAbort);
119: hasBeenScheduled = true;
120: }
121:
122: if (logger.isDebugEnabled())
123: logger
124: .debug(Translate.get("transaction.aborting",
125: tid));
126:
127: // Send to load balancer
128: drm.getLoadBalancer().abort(tm);
129:
130: // Update recovery log
131: drm.getRecoveryLog().logRequestCompletion(tm.getLogId(),
132: true, 0);
133:
134: // Invalidate the query result cache if this transaction has updated the
135: // cache or altered the schema
136: if ((drm.getResultCache() != null)
137: && (tm.altersQueryResultCache() || tm
138: .altersDatabaseSchema()))
139: drm.getResultCache().rollback(transactionId);
140:
141: // Check for schema modifications that need to be rollbacked
142: if (tm.altersDatabaseSchema()) {
143: if (drm.getMetadataCache() != null)
144: drm.getMetadataCache().flushCache();
145: drm.setSchemaIsDirty(true);
146: }
147:
148: if (hasBeenScheduled) // skipped if tm was fake
149: {
150: // Notify scheduler for completion
151: drm.getScheduler().rollbackCompleted(tm, true);
152: }
153: } catch (NoMoreBackendException e) {
154: if (logger.isDebugEnabled())
155: logger
156: .debug(Translate
157: .get(
158: "virtualdatabase.distributed.abort.logging.only",
159: transactionId));
160:
161: addAbortFailureOnAllBackends(drm, hasBeenScheduled, tm);
162: throw e;
163: } catch (SQLException e) {
164: logger
165: .warn(
166: Translate
167: .get("virtualdatabase.distributed.abort.sqlexception"),
168: e);
169: addAbortFailureOnAllBackends(drm, hasBeenScheduled, tm);
170: return e;
171: } catch (RuntimeException re) {
172: logger
173: .warn(
174: Translate
175: .get("virtualdatabase.distributed.rollback.exception"),
176: re);
177: addAbortFailureOnAllBackends(drm, hasBeenScheduled, tm);
178: throw new SQLException(re.getMessage());
179: } finally {
180: if (!hasBeenScheduled) {
181: // Query has not been scheduled, we have to remove the entry from the
182: // total order queue (wait to be sure that we do it in the proper order)
183: if (drm.getLoadBalancer().waitForTotalOrder(
184: totalOrderAbort, false))
185: drm.getLoadBalancer()
186: .removeObjectFromAndNotifyTotalOrderQueue(
187: totalOrderAbort);
188: }
189:
190: if (transactionStartedOnThisController) {
191: drm.completeTransaction(tid);
192: }
193:
194: if (logger.isDebugEnabled())
195: logger.debug(Translate.get("transaction.aborted", tid));
196: }
197: return Boolean.TRUE;
198: }
199:
200: private void addAbortFailureOnAllBackends(
201: DistributedRequestManager drm, boolean hasBeenScheduled,
202: TransactionMetaData tm) {
203: AbstractRequest abortRequest = new UnknownWriteRequest("abort",
204: false, 0, "\n");
205: abortRequest.setLogin(login);
206: abortRequest.setTransactionId(transactionId);
207: abortRequest.setIsAutoCommit(false);
208: abortRequest.setLogId(tm.getLogId());
209: drm.addFailedOnAllBackends(abortRequest, hasBeenScheduled);
210: }
211:
212: /**
213: * @see java.lang.Object#toString()
214: */
215: public String toString() {
216: return "Abort transaction " + transactionId;
217: }
218: }
|