001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): ______________________.
023: */package org.continuent.sequoia.controller.loadbalancer.tasks;
024:
025: import java.sql.Connection;
026: import java.sql.SQLException;
027:
028: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
029: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.controller.backend.DatabaseBackend;
033: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
034: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
035: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
036: import org.continuent.sequoia.controller.connection.PooledConnection;
037: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
038: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
039: import org.continuent.sequoia.controller.requests.AbstractRequest;
040: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
041:
042: /**
043: * Executes an <code>AbstractRequest</code> using Statement.execute() and
044: * returns multiple results stored in an ExecuteResult object.
045: *
046: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet
047: * </a>
048: * @version 1.0
049: */
050: public class StatementExecuteTask extends AbstractTask {
051: private AbstractWriteRequest request;
052: private ExecuteResult result = null;
053: private MetadataCache metadataCache;
054:
055: static Trace endUserLogger = Trace
056: .getLogger("org.continuent.sequoia.enduser");
057:
058: /**
059: * Creates a new <code>StatementExecuteQueryTask</code>.
060: *
061: * @param nbToComplete number of threads that must succeed before returning
062: * @param totalNb total number of threads
063: * @param request the request to execute
064: * @param metadataCache the metadataCache if any or null
065: */
066: public StatementExecuteTask(int nbToComplete, int totalNb,
067: AbstractWriteRequest request, MetadataCache metadataCache) {
068: super (nbToComplete, totalNb, request.isPersistentConnection(),
069: request.getPersistentConnectionId());
070: this .request = request;
071: this .metadataCache = metadataCache;
072: }
073:
074: /**
075: * Call a stored procedure that returns a ResultSet on the given backend
076: * thread.
077: *
078: * @param backendThread the backend thread that will execute the task
079: * @throws SQLException if an error occurs
080: */
081: public void executeTask(BackendWorkerThread backendThread)
082: throws SQLException {
083: DatabaseBackend backend = backendThread.getBackend();
084:
085: try {
086: AbstractConnectionManager cm = backend
087: .getConnectionManager(request.getLogin());
088: if (cm == null) {
089: SQLException se = new SQLException(
090: "No Connection Manager for Virtual Login:"
091: + request.getLogin());
092: try {
093: notifyFailure(backendThread, -1, se);
094: } catch (SQLException ignore) {
095:
096: }
097: throw se;
098: }
099:
100: Trace logger = backendThread.getLogger();
101: if (request.isAutoCommit())
102: executeInAutoCommit(backendThread, backend, cm, logger);
103: else
104: executeInTransaction(backendThread, backend, cm, logger);
105:
106: if (result != null)
107: notifySuccess(backendThread);
108: } finally {
109: backend.getTaskQueues().completeWriteRequestExecution(this );
110: }
111: }
112:
113: private void executeInAutoCommit(BackendWorkerThread backendThread,
114: DatabaseBackend backend, AbstractConnectionManager cm,
115: Trace logger) throws SQLException {
116: if (!backend.canAcceptTasks(request)) {
117: // Backend is disabling, we do not execute queries except the one in the
118: // transaction we already started. Just notify the completion for the
119: // others.
120: notifyCompletion(backendThread);
121: return;
122: }
123:
124: // Use a connection just for this request
125: PooledConnection c = null;
126: try {
127: c = cm.retrieveConnectionInAutoCommit(request);
128: } catch (UnreachableBackendException e1) {
129: SQLException se = new SQLException("Backend "
130: + backend.getName() + " is no more reachable.");
131: try {
132: notifyFailure(backendThread, -1, se);
133: } catch (SQLException ignore) {
134: }
135: // Disable this backend (it is no more in sync) by killing the backend
136: // thread
137: backendThread.getLoadBalancer().disableBackend(backend,
138: true);
139: String msg = Translate.get(
140: "loadbalancer.backend.disabling.unreachable",
141: backend.getName());
142: logger.error(msg);
143: endUserLogger.error(msg);
144: throw se;
145: }
146:
147: // Sanity check
148: if (c == null) {
149: SQLException se = new SQLException("No more connections");
150: try { // All backends failed, just ignore
151: if (!notifyFailure(backendThread,
152: request.getTimeout() * 1000L, se))
153: return;
154: } catch (SQLException ignore) {
155: }
156: // Disable this backend (it is no more in sync) by killing the backend
157: // thread
158: backendThread.getLoadBalancer().disableBackend(backend,
159: true);
160: String msg = "Stored procedure '"
161: + request.getSqlShortForm(backend
162: .getSqlShortFormLength())
163: + "' failed on backend " + backend.getName()
164: + " but " + getSuccess() + " succeeded (" + se
165: + ")";
166: logger.error(msg);
167: endUserLogger.error(Translate
168: .get("loadbalancer.backend.disabling", backend
169: .getName()));
170: throw new SQLException(msg);
171: }
172:
173: // Execute Query
174: try {
175: result = AbstractLoadBalancer
176: .executeStatementExecuteOnBackend(request, backend,
177: backendThread, c, metadataCache);
178:
179: backend.updateDatabaseBackendSchema(request);
180: } catch (Exception e) {
181: try { // All backends failed, just ignore
182: if (!notifyFailure(backendThread,
183: request.getTimeout() * 1000L, e)) {
184: result = null;
185: return;
186: }
187: } catch (SQLException ignore) {
188: }
189: // Disable this backend (it is no more in sync) by killing the backend
190: // thread
191: backendThread.getLoadBalancer().disableBackend(backend,
192: true);
193: String msg = "Stored procedure '"
194: + request.getSqlShortForm(backend
195: .getSqlShortFormLength())
196: + "' failed on backend " + backend.getName()
197: + " but " + getSuccess() + " succeeded (" + e + ")";
198: logger.error(msg);
199: endUserLogger.error(Translate
200: .get("loadbalancer.backend.disabling", backend
201: .getName()));
202: throw new SQLException(msg);
203: } finally {
204: cm.releaseConnectionInAutoCommit(request, c);
205: }
206: }
207:
208: private void executeInTransaction(
209: BackendWorkerThread backendThread, DatabaseBackend backend,
210: AbstractConnectionManager cm, Trace logger)
211: throws SQLException {
212: // Re-use the connection used by this transaction
213: Connection c;
214: long tid = request.getTransactionId();
215:
216: try {
217: c = backend
218: .getConnectionForTransactionAndLazyBeginIfNeeded(
219: request, cm);
220: } catch (UnreachableBackendException ube) {
221: SQLException se = new SQLException("Backend "
222: + backend.getName() + " is no more reachable.");
223: try {
224: notifyFailure(backendThread, -1, se);
225: } catch (SQLException ignore) {
226: }
227: // Disable this backend (it is no more in sync) by killing the backend
228: // thread
229: backendThread.getLoadBalancer().disableBackend(backend,
230: true);
231: String msg = Translate.get(
232: "loadbalancer.backend.disabling.unreachable",
233: backend.getName());
234: logger.error(msg);
235: endUserLogger.error(msg);
236: throw se;
237: } catch (NoTransactionStartWhenDisablingException e) {
238: // Backend is disabling, we do not execute queries except the one in the
239: // transaction we already started. Just notify the completion for the
240: // others.
241: notifyCompletion(backendThread);
242: return;
243: } catch (SQLException e1) {
244: SQLException se = new SQLException(
245: "Unable to get connection for transaction " + tid);
246: try { // All backends failed, just ignore
247: if (!notifyFailure(backendThread,
248: request.getTimeout() * 1000L, se))
249: return;
250: } catch (SQLException ignore) {
251: }
252: // Disable this backend (it is no more in sync) by killing the
253: // backend thread
254: backendThread.getLoadBalancer().disableBackend(backend,
255: true);
256: String msg = "Request '"
257: + request.getSqlShortForm(backend
258: .getSqlShortFormLength())
259: + "' failed on backend " + backend.getName()
260: + " but " + getSuccess() + " succeeded (" + se
261: + ")";
262: logger.error(msg);
263: endUserLogger.error(Translate
264: .get("loadbalancer.backend.disabling", backend
265: .getName()));
266: throw new SQLException(msg);
267: }
268:
269: // Sanity check
270: if (c == null) { // Bad connection
271: SQLException se = new SQLException(
272: "Unable to retrieve connection for transaction "
273: + tid);
274: try { // All backends failed, just ignore
275: if (!notifyFailure(backendThread,
276: request.getTimeout() * 1000L, se))
277: return;
278: } catch (SQLException ignore) {
279: }
280: // Disable this backend (it is no more in sync) by killing the
281: // backend thread
282: backendThread.getLoadBalancer().disableBackend(backend,
283: true);
284: String msg = "Request '"
285: + request.getSqlShortForm(backend
286: .getSqlShortFormLength())
287: + "' failed on backend " + backend.getName()
288: + " but " + getSuccess() + " succeeded (" + se
289: + ")";
290: logger.error(msg);
291: endUserLogger.error(Translate
292: .get("loadbalancer.backend.disabling", backend
293: .getName()));
294: throw new SQLException(msg);
295: }
296:
297: // Execute Query
298: try {
299: result = AbstractLoadBalancer
300: .executeStatementExecuteOnBackend(request, backend,
301: backendThread,
302: cm.retrieveConnectionForTransaction(tid),
303: metadataCache);
304:
305: backend.updateDatabaseBackendSchema(request);
306: } catch (Exception e) {
307: try { // All backends failed, just ignore
308: if (!notifyFailure(backendThread,
309: request.getTimeout() * 1000L, e)) {
310: result = null;
311: return;
312: }
313: } catch (SQLException ignore) {
314: }
315: // Disable this backend (it is no more in sync) by killing the backend
316: // thread
317: backendThread.getLoadBalancer().disableBackend(backend,
318: true);
319: String msg = "Stored procedure '"
320: + request.getSqlShortForm(backend
321: .getSqlShortFormLength())
322: + "' failed on backend " + backend.getName()
323: + " but " + getSuccess() + " succeeded (" + e + ")";
324: logger.error(msg);
325: endUserLogger.error(Translate
326: .get("loadbalancer.backend.disabling", backend
327: .getName()));
328: throw new SQLException(msg);
329: }
330: }
331:
332: /**
333: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
334: */
335: public AbstractRequest getRequest() {
336: return request;
337: }
338:
339: /**
340: * Returns the results.
341: *
342: * @return an <code>ExecuteResult</code> object
343: */
344: public ExecuteResult getResult() {
345: return result;
346: }
347:
348: /**
349: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
350: */
351: public long getTransactionId() {
352: return request.getTransactionId();
353: }
354:
355: /**
356: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
357: */
358: public boolean isAutoCommit() {
359: return request.isAutoCommit();
360: }
361:
362: /**
363: * @see java.lang.Object#equals(java.lang.Object)
364: */
365: public boolean equals(Object other) {
366: if ((other == null) || !(other instanceof StatementExecuteTask))
367: return false;
368:
369: StatementExecuteTask sexec = (StatementExecuteTask) other;
370: return this .request.equals(sexec.getRequest());
371: }
372:
373: /**
374: * @see java.lang.Object#hashCode()
375: */
376: public int hashCode() {
377: return (int) request.getId();
378: }
379:
380: /**
381: * @see java.lang.Object#toString()
382: */
383: public String toString() {
384: if (request.isAutoCommit())
385: return "Autocommit StatementExecuteTask "
386: + request.getTransactionId() + " ("
387: + request.getUniqueKey() + ")";
388: else
389: return "StatementExecuteTask for transaction "
390: + request.getTransactionId() + " ("
391: + request.getUniqueKey() + ")";
392: }
393: }
|