001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): ______________________.
023: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
024:
025: import java.io.Serializable;
026: import java.sql.SQLException;
027: import java.util.LinkedList;
028:
029: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
033: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
034: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
035: import org.continuent.sequoia.controller.requests.AbstractRequest;
036: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
037:
038: /**
039: * Execute a distributed commit.
040: *
041: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
042: * @version 1.0
043: */
044: public class DistributedCommit extends DistributedTransactionMarker {
045: private static final long serialVersionUID = 1222810057093662283L;
046:
047: // Login that commits the transaction. This is used in case the remote
048: // controller has to log the commit but didn't see the begin in which case it
049: // will not be able to retrieve the transaction marker metadata
050: private String login;
051:
052: /**
053: * Creates a new <code>Commit</code> message.
054: *
055: * @param login login that commit the transaction
056: * @param transactionId id of the transaction to commit
057: */
058: public DistributedCommit(String login, long transactionId) {
059: super (transactionId);
060: this .login = login;
061: }
062:
063: /**
064: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker#scheduleCommand(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
065: */
066: public Object scheduleCommand(DistributedRequestManager drm)
067: throws SQLException {
068: LinkedList totalOrderQueue = drm.getVirtualDatabase()
069: .getTotalOrderQueue();
070: if (totalOrderQueue != null) {
071: synchronized (totalOrderQueue) {
072: totalOrderQueue.addLast(this );
073: }
074: }
075: return this ;
076: }
077:
078: /**
079: * Execution of a distributed commit command on the specified
080: * <code>DistributedRequestManager</code>
081: *
082: * @param drm the DistributedRequestManager that will execute the commit
083: * @return Boolean.TRUE if everything went fine or a SQLException if an error
084: * occured
085: * @throws SQLException if an error occurs
086: */
087: public Serializable executeCommand(DistributedRequestManager drm)
088: throws SQLException {
089: boolean transactionStartedOnThisController = true;
090: Long tid = new Long(transactionId);
091: TransactionMetaData tm;
092: try {
093: tm = drm.getTransactionMetaData(tid);
094: } catch (SQLException ignore) {
095: // The transaction was started before the controller joined the
096: // cluster, build a fake tm so that we will be able to log it.
097: transactionStartedOnThisController = false;
098: tm = new TransactionMetaData(transactionId, 0, login,
099: false, 0);
100: }
101:
102: Trace logger = drm.getLogger();
103: boolean hasBeenScheduled = false;
104: try {
105: if (transactionStartedOnThisController) {
106: drm.getScheduler().commit(tm, false, this );
107: hasBeenScheduled = true;
108: }
109:
110: if (logger.isDebugEnabled())
111: logger.debug(Translate.get("transaction.commit", String
112: .valueOf(tid)));
113:
114: // Send to load balancer
115: drm.getLoadBalancer().commit(tm);
116:
117: // Notify the cache
118: if (drm.getResultCache() != null)
119: drm.getResultCache().commit(tm.getTransactionId());
120:
121: // Update recovery log
122: drm.getRecoveryLog().logRequestCompletion(tm.getLogId(),
123: true, 0);
124:
125: if (transactionStartedOnThisController) {
126: // Notify scheduler for completion
127: drm.getScheduler().commitCompleted(tm, true);
128: drm.completeTransaction(tid);
129: }
130: } catch (NoMoreBackendException e) {
131: addFailedCommitOnAllBackends(drm, tm, hasBeenScheduled);
132: throw e;
133: } catch (SQLException e) {
134:
135: if (tm.isReadOnly()) {
136: boolean beginLogged = false;
137: if (drm.getRecoveryLog() != null) {
138: beginLogged = drm.getRecoveryLog()
139: .hasLoggedBeginForTransaction(tid);
140: }
141: if (!beginLogged) {
142: if (logger.isWarnEnabled())
143:
144: {
145: logger
146: .warn("Ignoring failure of commit for read-only transaction, exception was: "
147: + e);
148: }
149:
150: if (hasBeenScheduled)
151: drm.getScheduler().commitCompleted(tm, true);
152:
153: if (transactionStartedOnThisController) {
154: drm.completeTransaction(tid);
155: }
156: return Boolean.TRUE;
157: }
158: }
159: addFailedCommitOnAllBackends(drm, tm, hasBeenScheduled);
160: logger
161: .warn(
162: Translate
163: .get("virtualdatabase.distributed.commit.sqlexception"),
164: e);
165: return e;
166: } catch (RuntimeException re) {
167: addFailedCommitOnAllBackends(drm, tm, hasBeenScheduled);
168: logger
169: .warn(
170: Translate
171: .get("virtualdatabase.distributed.commit.exception"),
172: re);
173: throw new SQLException(re.getMessage());
174: } catch (AllBackendsFailedException e) {
175: addFailedCommitOnAllBackends(drm, tm, hasBeenScheduled);
176: if (logger.isDebugEnabled())
177: logger
178: .debug(Translate
179: .get(
180: "virtualdatabase.distributed.commit.all.backends.locally.failed",
181: transactionId));
182: return e;
183: }
184:
185: return Boolean.TRUE;
186: }
187:
188: private void addFailedCommitOnAllBackends(
189: DistributedRequestManager drm, TransactionMetaData tm,
190: boolean hasBeenScheduled) {
191: AbstractRequest request = new UnknownWriteRequest("commit",
192: false, 0, "\n");
193: request.setTransactionId(transactionId);
194: request.setLogId(tm.getLogId());
195: drm.addFailedOnAllBackends(request, hasBeenScheduled);
196: }
197:
198: /**
199: * @see java.lang.Object#toString()
200: */
201: public String toString() {
202: return "Commit transaction " + transactionId;
203: }
204: }
|