001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): ______________________.
023: */package org.continuent.sequoia.controller.loadbalancer.tasks;
024:
025: import java.sql.Connection;
026: import java.sql.SQLException;
027:
028: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
029: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
033: import org.continuent.sequoia.controller.backend.DatabaseBackend;
034: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
035: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
036: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
037: import org.continuent.sequoia.controller.connection.PooledConnection;
038: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
039: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
040: import org.continuent.sequoia.controller.requests.AbstractRequest;
041: import org.continuent.sequoia.controller.requests.StoredProcedure;
042:
043: /**
044: * Executes a <code>StoredProcedure</code> call using
045: * CallableStatement.execute() and returns multiple results stored in an
046: * ExecuteResult object.
047: *
048: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
049: * @version 1.0
050: */
051: public class CallableStatementExecuteTask extends AbstractTask {
052: private StoredProcedure proc;
053: private ExecuteResult result;
054: private MetadataCache metadataCache;
055:
056: static Trace endUserLogger = Trace
057: .getLogger("org.continuent.sequoia.enduser");
058:
059: /**
060: * Creates a new <code>CallableStatementExecuteQueryTask</code>.
061: *
062: * @param nbToComplete number of threads that must succeed before returning
063: * @param totalNb total number of threads
064: * @param proc the <code>StoredProcedure</code> to call
065: * @param metadataCache the metadataCache if any or null
066: */
067: public CallableStatementExecuteTask(int nbToComplete, int totalNb,
068: StoredProcedure proc, MetadataCache metadataCache) {
069: super (nbToComplete, totalNb, proc.isPersistentConnection(),
070: proc.getPersistentConnectionId());
071: this .proc = proc;
072: this .metadataCache = metadataCache;
073: }
074:
075: /**
076: * Call a stored procedure that returns a ResultSet on the given backend
077: * thread.
078: *
079: * @param backendThread the backend thread that will execute the task
080: * @throws SQLException if an error occurs
081: */
082: public void executeTask(BackendWorkerThread backendThread)
083: throws SQLException {
084: DatabaseBackend backend = backendThread.getBackend();
085:
086: try {
087: AbstractConnectionManager cm = backend
088: .getConnectionManager(proc.getLogin());
089: if (cm == null) {
090: SQLException se = new SQLException(
091: "No Connection Manager for Virtual Login:"
092: + proc.getLogin());
093: try {
094: notifyFailure(backendThread, -1, se);
095: } catch (SQLException ignore) {
096:
097: }
098: throw se;
099: }
100:
101: Trace logger = backendThread.getLogger();
102: if (proc.isAutoCommit())
103: executeInAutoCommit(backendThread, backend, cm, logger);
104: else
105: executeInTransaction(backendThread, backend, cm, logger);
106:
107: if (result != null)
108: notifySuccess(backendThread);
109: } finally {
110: backend.getTaskQueues().completeStoredProcedureExecution(
111: this );
112: }
113: }
114:
115: private void executeInAutoCommit(BackendWorkerThread backendThread,
116: DatabaseBackend backend, AbstractConnectionManager cm,
117: Trace logger) throws SQLException {
118: if (!backend.canAcceptTasks(proc)) {
119: // Backend is disabling, we do not execute queries except the one in the
120: // transaction we already started. Just notify the completion for the
121: // others.
122: notifyCompletion(backendThread);
123: return;
124: }
125:
126: // Use a connection just for this request
127: PooledConnection c = null;
128: try {
129: c = cm.retrieveConnectionInAutoCommit(proc);
130: } catch (UnreachableBackendException e1) {
131: SQLException se = new SQLException("Backend "
132: + backend.getName() + " is no more reachable.");
133: try {
134: notifyFailure(backendThread, -1, se);
135: } catch (SQLException ignore) {
136: }
137: // Disable this backend (it is no more in sync) by killing the backend
138: // thread
139: backendThread.getLoadBalancer().disableBackend(backend,
140: true);
141: String msg = Translate.get(
142: "loadbalancer.backend.disabling.unreachable",
143: backend.getName());
144: logger.error(msg);
145: endUserLogger.error(msg);
146: throw se;
147: }
148:
149: // Sanity check
150: if (c == null) {
151: SQLException se = new SQLException("No more connections");
152: try { // All backends failed, just ignore
153: if (!notifyFailure(backendThread,
154: proc.getTimeout() * 1000L, se))
155: return;
156: } catch (SQLException ignore) {
157: }
158: // Disable this backend (it is no more in sync) by killing the backend
159: // thread
160: backendThread.getLoadBalancer().disableBackend(backend,
161: true);
162: String msg = "Stored procedure '"
163: + proc.getSqlShortForm(backend
164: .getSqlShortFormLength())
165: + "' failed on backend " + backend.getName()
166: + " but " + getSuccess() + " succeeded (" + se
167: + ")";
168: logger.error(msg);
169: endUserLogger.error(Translate
170: .get("loadbalancer.backend.disabling", backend
171: .getName()));
172: throw new SQLException(msg);
173: }
174:
175: // Execute Query
176: try {
177: result = AbstractLoadBalancer
178: .executeCallableStatementExecuteOnBackend(proc,
179: backend, backendThread, c, metadataCache);
180:
181: DatabaseProcedureSemantic semantic = proc.getSemantic();
182: if ((semantic == null) || semantic.hasDDLWrite())
183: backend.setSchemaIsDirty(true, proc);
184: } catch (Exception e) {
185: try { // All backends failed, just ignore
186: if (!notifyFailure(backendThread,
187: proc.getTimeout() * 1000L, e)) {
188: result = null;
189: return;
190: }
191: } catch (SQLException ignore) {
192: }
193: // Disable this backend (it is no more in sync) by killing the backend
194: // thread
195: backendThread.getLoadBalancer().disableBackend(backend,
196: true);
197: String msg = "Stored procedure '"
198: + proc.getSqlShortForm(backend
199: .getSqlShortFormLength())
200: + "' failed on backend " + backend.getName()
201: + " but " + getSuccess() + " succeeded (" + e + ")";
202: logger.error(msg);
203: endUserLogger.error(Translate
204: .get("loadbalancer.backend.disabling", backend
205: .getName()));
206: throw new SQLException(msg);
207: } finally {
208: cm.releaseConnectionInAutoCommit(proc, c);
209: }
210: }
211:
212: private void executeInTransaction(
213: BackendWorkerThread backendThread, DatabaseBackend backend,
214: AbstractConnectionManager cm, Trace logger)
215: throws SQLException {
216: // Re-use the connection used by this transaction
217: Connection c;
218: long tid = proc.getTransactionId();
219:
220: try {
221: c = backend
222: .getConnectionForTransactionAndLazyBeginIfNeeded(
223: proc, cm);
224: } catch (UnreachableBackendException ube) {
225: SQLException se = new SQLException("Backend "
226: + backend.getName() + " is no more reachable.");
227: try {
228: notifyFailure(backendThread, -1, se);
229: } catch (SQLException ignore) {
230: }
231: // Disable this backend (it is no more in sync) by killing the backend
232: // thread
233: backendThread.getLoadBalancer().disableBackend(backend,
234: true);
235: logger.error("Disabling backend " + backend.getName()
236: + " because it is no more reachable.");
237: endUserLogger.error(Translate.get(
238: "loadbalancer.backend.disabling.unreachable",
239: backend.getName()));
240: throw se;
241: } catch (NoTransactionStartWhenDisablingException e) {
242: // Backend is disabling, we do not execute queries except the one in the
243: // transaction we already started. Just notify the completion for the
244: // others.
245: notifyCompletion(backendThread);
246: return;
247: } catch (SQLException e1) {
248: SQLException se = new SQLException(
249: "Unable to get connection for transaction " + tid);
250: try { // All backends failed, just ignore
251: if (!notifyFailure(backendThread,
252: proc.getTimeout() * 1000L, se))
253: return;
254: } catch (SQLException ignore) {
255: }
256: // Disable this backend (it is no more in sync) by killing the
257: // backend thread
258: backendThread.getLoadBalancer().disableBackend(backend,
259: true);
260: String msg = "Request '"
261: + proc.getSqlShortForm(backend
262: .getSqlShortFormLength())
263: + "' failed on backend " + backend.getName()
264: + " but " + getSuccess() + " succeeded (" + se
265: + ")";
266: logger.error(msg);
267: endUserLogger.error(Translate
268: .get("loadbalancer.backend.disabling", backend
269: .getName()));
270: throw new SQLException(msg);
271: }
272:
273: // Sanity check
274: if (c == null) { // Bad connection
275: SQLException se = new SQLException(
276: "Unable to retrieve connection for transaction "
277: + tid);
278: try { // All backends failed, just ignore
279: if (!notifyFailure(backendThread,
280: proc.getTimeout() * 1000L, se))
281: return;
282: } catch (SQLException ignore) {
283: }
284: // Disable this backend (it is no more in sync) by killing the
285: // backend thread
286: backendThread.getLoadBalancer().disableBackend(backend,
287: true);
288: String msg = "Request '"
289: + proc.getSqlShortForm(backend
290: .getSqlShortFormLength())
291: + "' failed on backend " + backend.getName()
292: + " but " + getSuccess() + " succeeded (" + se
293: + ")";
294: logger.error(msg);
295: endUserLogger.error(Translate
296: .get("loadbalancer.backend.disabling", backend
297: .getName()));
298: throw new SQLException(msg);
299: }
300:
301: // Execute Query
302: try {
303: result = AbstractLoadBalancer
304: .executeCallableStatementExecuteOnBackend(proc,
305: backend, backendThread,
306: cm.retrieveConnectionForTransaction(tid),
307: metadataCache);
308:
309: DatabaseProcedureSemantic semantic = proc.getSemantic();
310: if ((semantic == null) || semantic.hasDDLWrite())
311: backend.setSchemaIsDirty(true, proc);
312: } catch (Exception e) {
313: try { // All backends failed, just ignore
314: if (!notifyFailure(backendThread,
315: proc.getTimeout() * 1000L, e)) {
316: result = null;
317: return;
318: }
319: } catch (SQLException ignore) {
320: }
321: // Disable this backend (it is no more in sync) by killing the backend
322: // thread
323: backendThread.getLoadBalancer().disableBackend(backend,
324: true);
325: String msg = "Stored procedure '"
326: + proc.getSqlShortForm(backend
327: .getSqlShortFormLength())
328: + "' failed on backend " + backend.getName()
329: + " but " + getSuccess() + " succeeded (" + e + ")";
330: logger.error(msg);
331: endUserLogger.error(Translate
332: .get("loadbalancer.backend.disabling", backend
333: .getName()));
334: throw new SQLException(msg);
335: }
336: }
337:
338: /**
339: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
340: */
341: public AbstractRequest getRequest() {
342: return proc;
343: }
344:
345: /**
346: * Returns the results.
347: *
348: * @return an <code>ExecuteResult</code> object
349: */
350: public ExecuteResult getResult() {
351: return result;
352: }
353:
354: /**
355: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
356: */
357: public long getTransactionId() {
358: return proc.getTransactionId();
359: }
360:
361: /**
362: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
363: */
364: public boolean isAutoCommit() {
365: return proc.isAutoCommit();
366: }
367:
368: /**
369: * @see java.lang.Object#equals(java.lang.Object)
370: */
371: public boolean equals(Object other) {
372: if ((other == null)
373: || !(other instanceof CallableStatementExecuteTask))
374: return false;
375:
376: CallableStatementExecuteTask cset = (CallableStatementExecuteTask) other;
377: if (proc == null)
378: return cset.getRequest() == null;
379: return proc.equals(cset.getRequest());
380: }
381:
382: /**
383: * @see java.lang.Object#hashCode()
384: */
385: public int hashCode() {
386: return (int) proc.getId();
387: }
388:
389: /**
390: * @see java.lang.Object#toString()
391: */
392: public String toString() {
393: if (proc.isAutoCommit())
394: return "Autocommit CallableStatementExecuteTask "
395: + proc.getTransactionId() + " ("
396: + proc.getUniqueKey() + ")";
397: else
398: return "CallableStatementExecuteTask for transaction "
399: + proc.getTransactionId() + " ("
400: + proc.getUniqueKey() + ")";
401: }
402:
403: }
|