001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005-2006 Continuent, Inc.
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): ______________________.
022: */package org.continuent.sequoia.controller.loadbalancer.tasks;
023:
024: import java.sql.Connection;
025: import java.sql.SQLException;
026: import java.util.HashMap;
027: import java.util.Map;
028:
029: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
033: import org.continuent.sequoia.controller.backend.DatabaseBackend;
034: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
035: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
036: import org.continuent.sequoia.controller.connection.PooledConnection;
037: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
038: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
039: import org.continuent.sequoia.controller.requests.AbstractRequest;
040: import org.continuent.sequoia.controller.requests.StoredProcedure;
041:
042: /**
043: * Executes a write <code>StoredProcedure</code> call.
044: *
045: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
046: * @version 1.0
047: */
048: public class CallableStatementExecuteUpdateTask extends AbstractTask {
049: private StoredProcedure proc;
050:
051: /**
052: * results : store results from all the backends
053: */
054: private Map results = null;
055: /**
056: * result : this is the result for the first backend to succeed
057: */
058: private ExecuteUpdateResult result;
059:
060: static Trace endUserLogger = Trace
061: .getLogger("org.continuent.sequoia.enduser");
062:
063: /**
064: * Creates a new <code>CallableStatementExecuteUpdateTask</code>.
065: *
066: * @param nbToComplete number of threads that must succeed before returning
067: * @param totalNb total number of threads
068: * @param proc the <code>StoredProcedure</code> to call
069: */
070: public CallableStatementExecuteUpdateTask(int nbToComplete,
071: int totalNb, StoredProcedure proc) {
072: super (nbToComplete, totalNb, proc.isPersistentConnection(),
073: proc.getPersistentConnectionId());
074: this .proc = proc;
075: this .results = new HashMap();
076: }
077:
078: /**
079: * Executes a write request with the given backend thread.
080: *
081: * @param backendThread the backend thread that will execute the task
082: * @throws SQLException if an error occurs
083: */
084: public void executeTask(BackendWorkerThread backendThread)
085: throws SQLException {
086: DatabaseBackend backend = backendThread.getBackend();
087:
088: try {
089: AbstractConnectionManager cm = backend
090: .getConnectionManager(proc.getLogin());
091: if (cm == null) {
092: SQLException se = new SQLException(
093: "No Connection Manager for Virtual Login:"
094: + proc.getLogin());
095: try {
096: notifyFailure(backendThread, -1, se);
097: } catch (SQLException ignore) {
098:
099: }
100: throw se;
101: }
102:
103: Trace logger = backendThread.getLogger();
104: if (proc.isAutoCommit())
105: executeInAutoCommit(backendThread, backend, cm, logger);
106: else
107: executeInTransaction(backendThread, backend, cm, logger);
108:
109: if (result == null)
110: return; // failure already handled, just return
111:
112: int resultOnFirstBackendToSucceed = notifySuccess(
113: backendThread, result.getUpdateCount());
114: if (results.get(backendThread) != null
115: && results.get(backendThread) instanceof ExecuteUpdateResult
116: && resultOnFirstBackendToSucceed != ((ExecuteUpdateResult) results
117: .get(backendThread)).getUpdateCount()) {
118: String msg = "Disabling backend "
119: + backend.getName()
120: + " that reports a different number of updated rows ("
121: + ((ExecuteUpdateResult) results
122: .get(backendThread)).getUpdateCount()
123: + ") than first backend to succeed ("
124: + resultOnFirstBackendToSucceed
125: + ") for stored procedure " + proc;
126: logger.error(msg);
127: // Disable this backend (it is no more in sync)
128: backendThread.getLoadBalancer().disableBackend(backend,
129: true);
130: endUserLogger.error(Translate.get(
131: "loadbalancer.backend.disabling", backend
132: .getName()));
133: throw new SQLException(msg);
134: }
135: } finally {
136: backend.getTaskQueues().completeStoredProcedureExecution(
137: this );
138: }
139: }
140:
141: private void executeInAutoCommit(BackendWorkerThread backendThread,
142: DatabaseBackend backend, AbstractConnectionManager cm,
143: Trace logger) throws SQLException {
144: if (!backend.canAcceptTasks(proc)) {
145: // Backend is disabling, we do not execute queries except the one in the
146: // transaction we already started. Just notify the completion for the
147: // others.
148: notifyCompletion(backendThread);
149: return;
150: }
151:
152: // Use a connection just for this request
153: PooledConnection c = null;
154: try {
155: c = cm.retrieveConnectionInAutoCommit(proc);
156: } catch (UnreachableBackendException e1) {
157: SQLException se = new SQLException("Backend "
158: + backend.getName() + " is no more reachable.");
159: try {
160: notifyFailure(backendThread, -1, se);
161: } catch (SQLException ignore) {
162: }
163: // Disable this backend (it is no more in sync) by killing the backend
164: // thread
165: backendThread.getLoadBalancer().disableBackend(backend,
166: true);
167: String msg = Translate.get(
168: "loadbalancer.backend.disabling.unreachable",
169: backend.getName());
170: logger.error(msg);
171: endUserLogger.error(msg);
172: throw se;
173: }
174:
175: // Sanity check
176: if (c == null) {
177: SQLException se = new SQLException("No more connections");
178: try { // All backends failed, just ignore
179: if (!notifyFailure(backendThread,
180: proc.getTimeout() * 1000L, se))
181: return;
182: } catch (SQLException ignore) {
183: }
184: // Disable this backend (it is no more in sync) by killing the backend
185: // thread
186: backendThread.getLoadBalancer().disableBackend(backend,
187: true);
188: String msg = "Stored procedure '"
189: + proc.getSqlShortForm(backend
190: .getSqlShortFormLength())
191: + "' failed on backend " + backend.getName()
192: + " but " + getSuccess() + " succeeded (" + se
193: + ")";
194: logger.error(msg);
195: endUserLogger.error(Translate
196: .get("loadbalancer.backend.disabling", backend
197: .getName()));
198: throw new SQLException(msg);
199: }
200:
201: // Execute Query
202: try {
203: ExecuteUpdateResult tmpResult = AbstractLoadBalancer
204: .executeCallableStatementExecuteUpdateOnBackend(
205: proc, backend, backendThread, c);
206:
207: synchronized (this ) {
208: if (result == null)
209: result = tmpResult;
210: results.put(backendThread, tmpResult);
211: }
212:
213: DatabaseProcedureSemantic semantic = proc.getSemantic();
214: if ((semantic == null) || semantic.hasDDLWrite())
215: backend.setSchemaIsDirty(true, proc);
216: } catch (Exception e) {
217: try { // All backends failed, just ignore
218: if (!notifyFailure(backendThread,
219: proc.getTimeout() * 1000L, e)) {
220: result = null;
221: return;
222: }
223: } catch (SQLException ignore) {
224: }
225: // Disable this backend (it is no more in sync) by killing the backend
226: // thread
227: backendThread.getLoadBalancer().disableBackend(backend,
228: true);
229: String msg = "Stored procedure '"
230: + proc.getSqlShortForm(backend
231: .getSqlShortFormLength())
232: + "' failed on backend " + backend.getName()
233: + " but " + getSuccess() + " succeeded (" + e + ")";
234: logger.error(msg);
235: endUserLogger.error(Translate
236: .get("loadbalancer.backend.disabling", backend
237: .getName()));
238: throw new SQLException(msg);
239: } finally {
240: cm.releaseConnectionInAutoCommit(proc, c);
241: }
242: }
243:
244: private void executeInTransaction(
245: BackendWorkerThread backendThread, DatabaseBackend backend,
246: AbstractConnectionManager cm, Trace logger)
247: throws SQLException {
248: // Re-use the connection used by this transaction
249: Connection c;
250: long tid = proc.getTransactionId();
251:
252: try {
253: c = backend
254: .getConnectionForTransactionAndLazyBeginIfNeeded(
255: proc, cm);
256: } catch (UnreachableBackendException ube) {
257: SQLException se = new SQLException("Backend "
258: + backend.getName() + " is no more reachable.");
259: try {
260: notifyFailure(backendThread, -1, se);
261: } catch (SQLException ignore) {
262: }
263: // Disable this backend (it is no more in sync) by killing the backend
264: // thread
265: backendThread.getLoadBalancer().disableBackend(backend,
266: true);
267: String msg = Translate.get(
268: "loadbalancer.backend.disabling.unreachable",
269: backend.getName());
270: logger.error(msg);
271: endUserLogger.error(msg);
272: throw se;
273: } catch (SQLException e1) {
274: SQLException se = new SQLException(
275: "Unable to get connection for transaction " + tid);
276: try { // All backends failed, just ignore
277: if (!notifyFailure(backendThread,
278: proc.getTimeout() * 1000L, se))
279: return;
280: } catch (SQLException ignore) {
281: }
282: // Disable this backend (it is no more in sync) by killing the
283: // backend thread
284: backendThread.getLoadBalancer().disableBackend(backend,
285: true);
286: String msg = "Request '"
287: + proc.getSqlShortForm(backend
288: .getSqlShortFormLength())
289: + "' failed on backend " + backend.getName()
290: + " but " + getSuccess() + " succeeded (" + se
291: + ")";
292: logger.error(msg);
293: endUserLogger.error(Translate
294: .get("loadbalancer.backend.disabling", backend
295: .getName()));
296: throw new SQLException(msg);
297: }
298:
299: // Sanity check
300: if (c == null) { // Bad connection
301: SQLException se = new SQLException(
302: "Unable to retrieve connection for transaction "
303: + tid);
304: try { // All backends failed, just ignore
305: if (!notifyFailure(backendThread,
306: proc.getTimeout() * 1000L, se))
307: return;
308: } catch (SQLException ignore) {
309: }
310: // Disable this backend (it is no more in sync) by killing the
311: // backend thread
312: backendThread.getLoadBalancer().disableBackend(backend,
313: true);
314: String msg = "Request '"
315: + proc.getSqlShortForm(backend
316: .getSqlShortFormLength())
317: + "' failed on backend " + backend.getName()
318: + " but " + getSuccess() + " succeeded (" + se
319: + ")";
320: logger.error(msg);
321: endUserLogger.error(Translate
322: .get("loadbalancer.backend.disabling", backend
323: .getName()));
324: throw new SQLException(msg);
325: }
326:
327: // Execute Query
328: try {
329: ExecuteUpdateResult tmpResult = AbstractLoadBalancer
330: .executeCallableStatementExecuteUpdateOnBackend(
331: proc, backend, backendThread,
332: cm.retrieveConnectionForTransaction(tid));
333: synchronized (this ) {
334: if (result == null)
335: result = tmpResult;
336: results.put(backendThread, tmpResult);
337: }
338:
339: DatabaseProcedureSemantic semantic = proc.getSemantic();
340: if ((semantic == null) || semantic.hasDDLWrite())
341: backend.setSchemaIsDirty(true, proc);
342: } catch (Exception e) {
343: try { // All backends failed, just ignore
344: if (!notifyFailure(backendThread,
345: proc.getTimeout() * 1000L, e)) {
346: result = null;
347: return;
348: }
349: } catch (SQLException ignore) {
350: }
351: // Disable this backend (it is no more in sync) by killing the backend
352: // thread
353: backendThread.getLoadBalancer().disableBackend(backend,
354: true);
355: String msg = "Stored procedure '"
356: + proc.getSqlShortForm(backend
357: .getSqlShortFormLength())
358: + "' failed on backend " + backend.getName()
359: + " but " + getSuccess() + " succeeded (" + e + ")";
360: logger.error(msg);
361: endUserLogger.error(Translate
362: .get("loadbalancer.backend.disabling", backend
363: .getName()));
364: throw new SQLException(msg);
365: }
366: }
367:
368: /**
369: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
370: */
371: public AbstractRequest getRequest() {
372: return proc;
373: }
374:
375: /**
376: * Returns the result.
377: *
378: * @return updateCount wrapped into a <code>ExecuteUpdateResult</code>
379: */
380: public ExecuteUpdateResult getResult() {
381: return result;
382: }
383:
384: /**
385: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
386: */
387: public long getTransactionId() {
388: return proc.getTransactionId();
389: }
390:
391: /**
392: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
393: */
394: public boolean isAutoCommit() {
395: return proc.isAutoCommit();
396: }
397:
398: /**
399: * @see java.lang.Object#equals(java.lang.Object)
400: */
401: public boolean equals(Object other) {
402: if ((other == null)
403: || !(other instanceof CallableStatementExecuteUpdateTask))
404: return false;
405:
406: CallableStatementExecuteUpdateTask cseut = (CallableStatementExecuteUpdateTask) other;
407: if (proc == null)
408: return cseut.getRequest() == null;
409: return proc.equals(cseut.getRequest());
410: }
411:
412: /**
413: * @see java.lang.Object#hashCode()
414: */
415: public int hashCode() {
416: return (int) proc.getId();
417: }
418:
419: /**
420: * @see java.lang.Object#toString()
421: */
422: public String toString() {
423: if (proc.isAutoCommit())
424: return "Autocommit CallableStatementExecuteUpdateTask "
425: + proc.getTransactionId() + " ("
426: + proc.getUniqueKey() + ")";
427: else
428: return "CallableStatementExecuteUpdateTask for transaction "
429: + proc.getTransactionId()
430: + " ("
431: + proc.getUniqueKey() + ")";
432: }
433:
434: }
|