001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2006 Continuent, Inc.
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Julie Marguerite.
022: */package org.continuent.sequoia.controller.loadbalancer.tasks;
023:
024: import java.sql.Connection;
025: import java.sql.SQLException;
026:
027: import org.continuent.sequoia.common.i18n.Translate;
028: import org.continuent.sequoia.common.log.Trace;
029: import org.continuent.sequoia.controller.backend.DatabaseBackend;
030: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
031: import org.continuent.sequoia.controller.connection.PooledConnection;
032: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
033: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
034: import org.continuent.sequoia.controller.requests.AbstractRequest;
035:
036: /**
037: * Task to commit a transaction.
038: *
039: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
040: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
041: * @version 1.0
042: */
043: public class CommitTask extends AbstractTask {
044: // Transaction metadata (login, transaction id, timeout)
045: private TransactionMetaData tm;
046:
047: static Trace endUserLogger = Trace
048: .getLogger("org.continuent.sequoia.enduser");
049:
050: /**
051: * Commits a transaction given a login and a transaction id.
052: *
053: * @param nbToComplete number of threads that must succeed before returning
054: * @param totalNb total number of threads
055: * @param tm transaction metadata
056: * @throws NullPointerException if tm is null
057: */
058: public CommitTask(int nbToComplete, int totalNb,
059: TransactionMetaData tm) throws NullPointerException {
060: super (nbToComplete, totalNb, tm.isPersistentConnection(), tm
061: .getPersistentConnectionId());
062: if (tm == null)
063: throw new NullPointerException(
064: "Unexpected null metadata in BeginTask");
065: this .tm = tm;
066: }
067:
068: /**
069: * Commits a transaction with the given backend thread.
070: *
071: * @param backendThread the backend thread that will execute the task
072: * @throws SQLException if an error occurs
073: */
074: public void executeTask(BackendWorkerThread backendThread)
075: throws SQLException {
076: DatabaseBackend backend = backendThread.getBackend();
077: Long lTid = new Long(tm.getTransactionId());
078:
079: AbstractConnectionManager cm = backend.getConnectionManager(tm
080: .getLogin());
081: if (cm == null) {
082: SQLException se = new SQLException(
083: "No Connection Manager for Virtual Login:"
084: + tm.getLogin());
085: try {
086: notifyFailure(backendThread, -1, se);
087: } catch (SQLException ignore) {
088:
089: }
090: throw se;
091: }
092: PooledConnection pc = cm.retrieveConnectionForTransaction(tm
093: .getTransactionId());
094:
095: // Sanity check
096: if (pc == null) { // Bad connection
097: backend.stopTransaction(lTid);
098: SQLException se = new SQLException(
099: "Unable to retrieve connection for transaction "
100: + tm.getTransactionId());
101:
102: try { // All backends failed, just ignore
103: if (!notifyFailure(backendThread, tm.getTimeout(), se))
104: return;
105: } catch (SQLException ignore) {
106: }
107: // Disable this backend (it is no more in sync) by killing the backend
108: // thread
109: backendThread.getLoadBalancer().disableBackend(backend,
110: true);
111: String msg = "Failed to commit transaction "
112: + tm.getTransactionId() + " on backend "
113: + backend.getName() + " but " + getSuccess()
114: + " succeeded (" + se + ")";
115: backendThread.getLogger().error(msg);
116: endUserLogger.error(Translate
117: .get("loadbalancer.backend.disabling", backend
118: .getName()));
119: throw new SQLException(msg);
120: }
121:
122: // Execute Query
123: try {
124: Connection c = pc.getConnection();
125: c.commit();
126: c.setAutoCommit(true);
127: } catch (Exception e) {
128: try {
129: if (!notifyFailure(backendThread, tm.getTimeout(),
130: new SQLException(e.getMessage())))
131: return;
132: } catch (SQLException ignore) {
133: }
134: // Disable this backend (it is no more in sync) by killing the backend
135: // thread
136: backendThread.getLoadBalancer().disableBackend(backend,
137: true);
138: String msg = "Failed to commit transaction "
139: + tm.getTransactionId() + " on backend "
140: + backend.getName() + " but " + getSuccess()
141: + " succeeded (" + e + ")";
142: backendThread.getLogger().error(msg);
143: endUserLogger.error(Translate
144: .get("loadbalancer.backend.disabling", backend
145: .getName()));
146: throw new SQLException(msg);
147: } finally {
148: cm.releaseConnectionForTransaction(tm.getTransactionId());
149: backend.stopTransaction(lTid);
150: backend.getTaskQueues()
151: .releaseLocksAndCheckForPriorityInversion(tm);
152: }
153: notifySuccess(backendThread);
154: }
155:
156: /**
157: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
158: */
159: public AbstractRequest getRequest() {
160: return null;
161: }
162:
163: /**
164: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
165: */
166: public long getTransactionId() {
167: return tm.getTransactionId();
168: }
169:
170: /**
171: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
172: */
173: public boolean isAutoCommit() {
174: return false;
175: }
176:
177: /**
178: * @see java.lang.Object#equals(java.lang.Object)
179: */
180: public boolean equals(Object other) {
181: if ((other == null) || !(other instanceof CommitTask))
182: return false;
183:
184: CommitTask commit = (CommitTask) other;
185: return this .getTransactionId() == commit.getTransactionId();
186: }
187:
188: /**
189: * @see java.lang.Object#hashCode()
190: */
191: public int hashCode() {
192: return (int) this .getTransactionId();
193: }
194:
195: /**
196: * @see java.lang.Object#toString()
197: */
198: public String toString() {
199: return "CommitTask (" + tm.getTransactionId() + ")";
200: }
201: }
|