001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Emmanuel Cecchet.
020: * Contributor(s): Julie Marguerite.
021: */package org.continuent.sequoia.controller.loadbalancer.tasks;
022:
023: import java.sql.Connection;
024: import java.sql.SQLException;
025:
026: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
027: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
028: import org.continuent.sequoia.common.i18n.Translate;
029: import org.continuent.sequoia.common.log.Trace;
030: import org.continuent.sequoia.controller.backend.DatabaseBackend;
031: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
032: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
033: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
034: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
035: import org.continuent.sequoia.controller.requests.AbstractRequest;
036: import org.continuent.sequoia.controller.requests.UnknownReadRequest;
037:
038: /**
039: * Task to begin a transaction. Note that this task does not properly set the
040: * transaction isolation but this is not a real issue since it is meant to be
041: * used by the recovery log that does not execute reads and provide its own
042: * serial order.
043: * <p>
044: * <strong>This task is meant to be only used to replay the recovery log</strong>
045: *
046: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
047: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
048: * @version 1.0
049: */
050: public class BeginTask extends AbstractTask {
051: // Transaction metadata (login, transaction id, timeout)
052: private TransactionMetaData tm;
053: private AbstractRequest request;
054: static Trace endUserLogger = Trace
055: .getLogger("org.continuent.sequoia.enduser");
056:
057: /**
058: * Begins a new transaction given a login and a transaction id.
059: *
060: * @param nbToComplete number of threads that must succeed before returning
061: * @param totalNb total number of threads
062: * @param tm transaction metadata
063: * @throws NullPointerException if tm is null
064: */
065: public BeginTask(int nbToComplete, int totalNb,
066: TransactionMetaData tm) throws NullPointerException {
067: super (nbToComplete, totalNb, tm.isPersistentConnection(), tm
068: .getPersistentConnectionId());
069: if (tm == null)
070: throw new NullPointerException(
071: "Unexpected null metadata in BeginTask");
072: this .tm = tm;
073: this .request = new UnknownReadRequest("begin", false, 0, "");
074: request.setLogin(tm.getLogin());
075: request.setIsAutoCommit(false);
076: request.setTransactionId(tm.getTransactionId());
077: request.setPersistentConnection(tm.isPersistentConnection());
078: request.setPersistentConnectionId(tm
079: .getPersistentConnectionId());
080: request
081: .setTransactionIsolation(org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL);
082: }
083:
084: /**
085: * Begins a new transaction with the given backend thread.
086: *
087: * @param backendThread the backend thread that will execute the task
088: * @exception SQLException if an error occurs
089: */
090: public void executeTask(BackendWorkerThread backendThread)
091: throws SQLException {
092: DatabaseBackend backend = backendThread.getBackend();
093: if (!backend.isReplaying())
094: throw new SQLException(
095: "BeginTask should only be used for recovery purposes. Detected incorrect backend state:"
096: + backend.getState());
097:
098: try {
099: AbstractConnectionManager cm = backend
100: .getConnectionManager(tm.getLogin());
101: if (cm == null) {
102: SQLException se = new SQLException(
103: "No Connection Manager for Virtual Login:"
104: + tm.getLogin());
105: try {
106: notifyFailure(backendThread, 1, se);
107: } catch (SQLException ignore) {
108:
109: }
110: throw se;
111: }
112:
113: Connection c;
114: Long lTid = new Long(tm.getTransactionId());
115: Trace logger = backendThread.getLogger();
116:
117: try {
118: c = AbstractLoadBalancer
119: .getConnectionAndBeginTransaction(backend, cm,
120: request);
121: backend.startTransaction(lTid);
122: } catch (UnreachableBackendException ube) {
123: SQLException se = new SQLException("Backend "
124: + backend.getName() + " is no more reachable.");
125: try {
126: notifyFailure(backendThread, -1, se);
127: } catch (SQLException ignore) {
128: }
129: // Disable this backend (it is no more in sync) by killing the backend
130: // thread
131: backendThread.getLoadBalancer().disableBackend(backend,
132: true);
133: String msg = Translate.get(
134: "loadbalancer.backend.disabling.unreachable",
135: backend.getName());
136: logger.error(msg);
137: endUserLogger.error(msg);
138: throw se;
139: } catch (NoTransactionStartWhenDisablingException e) {
140: // Backend is disabling, we do not execute queries except the one in the
141: // transaction we already started. Just notify the completion for the
142: // others.
143: notifyCompletion(backendThread);
144: return;
145: } catch (SQLException e1) {
146: SQLException se = new SQLException(
147: "Unable to get connection for transaction "
148: + lTid);
149: try { // All backends failed, just ignore
150: if (!notifyFailure(backendThread, tm.getTimeout(),
151: se))
152: return;
153: } catch (SQLException ignore) {
154: }
155: // Disable this backend (it is no more in sync) by killing the
156: // backend thread
157: backendThread.getLoadBalancer().disableBackend(backend,
158: true);
159: String msg = "Begin of transaction "
160: + tm.getTransactionId() + " failed on backend "
161: + backend.getName() + " but " + getSuccess()
162: + " succeeded (" + se + ")";
163: logger.error(msg);
164: endUserLogger.error(Translate.get(
165: "loadbalancer.backend.disabling", backend
166: .getName()));
167: throw new SQLException(msg);
168: }
169:
170: // Sanity check
171: if (c == null) { // Bad connection
172: SQLException se = new SQLException(
173: "No more connection to start a new transaction.");
174: try { // All backends failed, just ignore
175: if (!notifyFailure(backendThread, tm.getTimeout(),
176: se))
177: return;
178: } catch (SQLException ignore) {
179: }
180: } else {
181: notifySuccess(backendThread);
182: }
183: } catch (Exception e) {
184: try {
185: if (!notifyFailure(backendThread, tm.getTimeout(),
186: new SQLException(e.getMessage())))
187: return;
188: } catch (SQLException ignore) {
189: }
190: String msg = "Failed to begin transaction "
191: + tm.getTransactionId() + " on backend "
192: + backend.getName() + " (" + e + ")";
193: backendThread.getLogger().error(msg);
194: throw new SQLException(msg);
195: }
196: }
197:
198: /**
199: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
200: */
201: public AbstractRequest getRequest() {
202: return request;
203: }
204:
205: /**
206: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
207: */
208: public long getTransactionId() {
209: return tm.getTransactionId();
210: }
211:
212: /**
213: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
214: */
215: public boolean isAutoCommit() {
216: return false;
217: }
218:
219: /**
220: * @see java.lang.Object#equals(java.lang.Object)
221: */
222: public boolean equals(Object other) {
223: if ((other == null) || !(other instanceof BeginTask))
224: return false;
225:
226: BeginTask begin = (BeginTask) other;
227: return this .getTransactionId() == begin.getTransactionId();
228: }
229:
230: /**
231: * @see java.lang.Object#hashCode()
232: */
233: public int hashCode() {
234: return (int) this .getTransactionId();
235: }
236:
237: /**
238: * @see java.lang.Object#toString()
239: */
240: public String toString() {
241: return "BeginTask (" + tm.getLogin() + ","
242: + tm.getTransactionId() + ")";
243: }
244:
245: }
|