001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): ______________________.
023: */package org.continuent.sequoia.controller.loadbalancer.tasks;
024:
025: import java.sql.Connection;
026: import java.sql.SQLException;
027:
028: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
029: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.log.Trace;
032: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
033: import org.continuent.sequoia.controller.backend.DatabaseBackend;
034: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
035: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
036: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
037: import org.continuent.sequoia.controller.connection.PooledConnection;
038: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
039: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
040: import org.continuent.sequoia.controller.requests.AbstractRequest;
041: import org.continuent.sequoia.controller.requests.StoredProcedure;
042:
043: /**
044: * Executes a <code>StoredProcedure</code> call using
045: * CallableStatement.executeQuery() and returns a ResultSet.
046: *
047: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
048: * @version 1.0
049: */
050: public class CallableStatementExecuteQueryTask extends AbstractTask {
051: private StoredProcedure proc;
052: private ControllerResultSet result = null;
053: private MetadataCache metadataCache;
054:
055: static Trace endUserLogger = Trace
056: .getLogger("org.continuent.sequoia.enduser");
057:
058: /**
059: * Creates a new <code>CallableStatementExecuteQueryTask</code>.
060: *
061: * @param nbToComplete number of threads that must succeed before returning
062: * @param totalNb total number of threads
063: * @param proc the <code>StoredProcedure</code> to call
064: * @param metadataCache the metadataCache if any or null
065: */
066: public CallableStatementExecuteQueryTask(int nbToComplete,
067: int totalNb, StoredProcedure proc,
068: MetadataCache metadataCache) {
069: super (nbToComplete, totalNb, proc.isPersistentConnection(),
070: proc.getPersistentConnectionId());
071: this .proc = proc;
072: this .metadataCache = metadataCache;
073: }
074:
075: /**
076: * Call a stored procedure that returns a ResultSet on the given backend
077: * thread.
078: *
079: * @param backendThread the backend thread that will execute the task
080: * @throws SQLException if an error occurs
081: */
082: public void executeTask(BackendWorkerThread backendThread)
083: throws SQLException {
084: DatabaseBackend backend = backendThread.getBackend();
085:
086: try {
087: AbstractConnectionManager cm = backend
088: .getConnectionManager(proc.getLogin());
089: if (cm == null) {
090: SQLException se = new SQLException(
091: "No Connection Manager for Virtual Login:"
092: + proc.getLogin());
093: try {
094: notifyFailure(backendThread, -1, se);
095: } catch (SQLException ignore) {
096:
097: }
098: throw se;
099: }
100:
101: Trace logger = backendThread.getLogger();
102: if (proc.isAutoCommit())
103: executeInAutoCommit(backendThread, backend, cm, logger);
104: else
105: executeInTransaction(backendThread, backend, cm, logger);
106:
107: if (result != null)
108: notifySuccess(backendThread);
109: } finally {
110: backend.getTaskQueues().completeStoredProcedureExecution(
111: this );
112: }
113: }
114:
115: private void executeInAutoCommit(BackendWorkerThread backendThread,
116: DatabaseBackend backend, AbstractConnectionManager cm,
117: Trace logger) throws SQLException {
118: if (!backend.canAcceptTasks(proc)) {
119: // Backend is disabling, we do not execute queries except the one in
120: // the
121: // transaction we already started. Just notify the completion for the
122: // others.
123: notifyCompletion(backendThread);
124: return;
125: }
126:
127: // Use a connection just for this request
128: PooledConnection c = null;
129: try {
130: c = cm.retrieveConnectionInAutoCommit(proc);
131: } catch (UnreachableBackendException e1) {
132: SQLException se = new SQLException("Backend "
133: + backend.getName() + " is no more reachable.");
134: try {
135: notifyFailure(backendThread, -1, se);
136: } catch (SQLException ignore) {
137: }
138: // Disable this backend (it is no more in sync) by killing the backend
139: // thread
140: backendThread.getLoadBalancer().disableBackend(backend,
141: true);
142: String msg = Translate.get(
143: "loadbalancer.backend.disabling.unreachable",
144: backend.getName());
145: logger.error(msg);
146: endUserLogger.error(msg);
147: throw se;
148: }
149:
150: // Sanity check
151: if (c == null) {
152: SQLException se = new SQLException("No more connections");
153: try { // All backends failed, just ignore
154: if (!notifyFailure(backendThread,
155: proc.getTimeout() * 1000L, se))
156: return;
157: } catch (SQLException ignore) {
158: }
159: // Disable this backend (it is no more in sync) by killing the backend
160: // thread
161: backendThread.getLoadBalancer().disableBackend(backend,
162: true);
163: String msg = "Stored procedure '"
164: + proc.getSqlShortForm(backend
165: .getSqlShortFormLength())
166: + "' failed on backend " + backend.getName()
167: + " but " + getSuccess() + " succeeded (" + se
168: + ")";
169: logger.error(msg);
170: endUserLogger.error(Translate
171: .get("loadbalancer.backend.disabling", backend
172: .getName()));
173: throw new SQLException(msg);
174: }
175:
176: // Execute Query
177: try {
178: result = AbstractLoadBalancer
179: .executeCallableStatementExecuteQueryOnBackend(
180: proc, backend, backendThread, c
181: .getConnection(), metadataCache);
182:
183: DatabaseProcedureSemantic semantic = proc.getSemantic();
184: if ((semantic == null) || semantic.hasDDLWrite())
185: backend.setSchemaIsDirty(true, proc);
186: } catch (Exception e) {
187: try { // All backends failed, just ignore
188: if (!notifyFailure(backendThread,
189: proc.getTimeout() * 1000L, e)) {
190: result = null;
191: return;
192: }
193: } catch (SQLException ignore) {
194: }
195: // Disable this backend (it is no more in sync) by killing the backend
196: // thread
197: backendThread.getLoadBalancer().disableBackend(backend,
198: true);
199: String msg = "Stored procedure '"
200: + proc.getSqlShortForm(backend
201: .getSqlShortFormLength())
202: + "' failed on backend " + backend.getName()
203: + " but " + getSuccess() + " succeeded (" + e + ")";
204: logger.error(msg);
205: endUserLogger.error(Translate
206: .get("loadbalancer.backend.disabling", backend
207: .getName()));
208: throw new SQLException(msg);
209: } finally {
210: cm.releaseConnectionInAutoCommit(proc, c);
211: }
212: }
213:
214: private void executeInTransaction(
215: BackendWorkerThread backendThread, DatabaseBackend backend,
216: AbstractConnectionManager cm, Trace logger)
217: throws SQLException {
218: // Re-use the connection used by this transaction
219: Connection c;
220: long tid = proc.getTransactionId();
221:
222: try {
223: c = backend
224: .getConnectionForTransactionAndLazyBeginIfNeeded(
225: proc, cm);
226: } catch (UnreachableBackendException ube) {
227: SQLException se = new SQLException("Backend "
228: + backend.getName() + " is no more reachable.");
229: try {
230: notifyFailure(backendThread, -1, se);
231: } catch (SQLException ignore) {
232: }
233: // Disable this backend (it is no more in sync) by killing the backend
234: // thread
235: backendThread.getLoadBalancer().disableBackend(backend,
236: true);
237: String msg = Translate.get(
238: "loadbalancer.backend.disabling.unreachable",
239: backend.getName());
240: logger.error(msg);
241: endUserLogger.error(msg);
242: throw se;
243: } catch (NoTransactionStartWhenDisablingException e) {
244: // Backend is disabling, we do not execute queries except the one in
245: // the
246: // transaction we already started. Just notify the completion for the
247: // others.
248: notifyCompletion(backendThread);
249: return;
250: } catch (SQLException e1) {
251: SQLException se = new SQLException(
252: "Unable to get connection for transaction " + tid);
253: try { // All backends failed, just ignore
254: if (!notifyFailure(backendThread,
255: proc.getTimeout() * 1000L, se))
256: return;
257: } catch (SQLException ignore) {
258: }
259: // Disable this backend (it is no more in sync) by killing the
260: // backend thread
261: backendThread.getLoadBalancer().disableBackend(backend,
262: true);
263: String msg = "Request '"
264: + proc.getSqlShortForm(backend
265: .getSqlShortFormLength())
266: + "' failed on backend " + backend.getName()
267: + " but " + getSuccess() + " succeeded (" + se
268: + ")";
269: logger.error(msg);
270: endUserLogger.error(Translate
271: .get("loadbalancer.backend.disabling", backend
272: .getName()));
273: throw new SQLException(msg);
274: }
275:
276: // Sanity check
277: if (c == null) { // Bad connection
278: SQLException se = new SQLException(
279: "Unable to retrieve connection for transaction "
280: + tid);
281: try { // All backends failed, just ignore
282: if (!notifyFailure(backendThread,
283: proc.getTimeout() * 1000L, se))
284: return;
285: } catch (SQLException ignore) {
286: }
287: // Disable this backend (it is no more in sync) by killing the
288: // backend thread
289: backendThread.getLoadBalancer().disableBackend(backend,
290: true);
291: String msg = "Request '"
292: + proc.getSqlShortForm(backend
293: .getSqlShortFormLength())
294: + "' failed on backend " + backend.getName()
295: + " but " + getSuccess() + " succeeded (" + se
296: + ")";
297: logger.error(msg);
298: endUserLogger.error(Translate
299: .get("loadbalancer.backend.disabling", backend
300: .getName()));
301: throw new SQLException(msg);
302: }
303:
304: // Execute Query
305: try {
306: result = AbstractLoadBalancer
307: .executeCallableStatementExecuteQueryOnBackend(
308: proc, backend, backendThread, c,
309: metadataCache);
310:
311: DatabaseProcedureSemantic semantic = proc.getSemantic();
312: if ((semantic == null) || semantic.hasDDLWrite())
313: backend.setSchemaIsDirty(true, proc);
314: } catch (Exception e) {
315: try { // All backends failed, just ignore
316: if (!notifyFailure(backendThread,
317: proc.getTimeout() * 1000L, e)) {
318: result = null;
319: return;
320: }
321: } catch (SQLException ignore) {
322: }
323: // Disable this backend (it is no more in sync) by killing the backend
324: // thread
325: backendThread.getLoadBalancer().disableBackend(backend,
326: true);
327: String msg = "Stored procedure '"
328: + proc.getSqlShortForm(backend
329: .getSqlShortFormLength())
330: + "' failed on backend " + backend.getName()
331: + " but " + getSuccess() + " succeeded (" + e + ")";
332: logger.error(msg);
333: endUserLogger.error(Translate
334: .get("loadbalancer.backend.disabling", backend
335: .getName()));
336: throw new SQLException(msg);
337: }
338: }
339:
340: /**
341: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
342: */
343: public AbstractRequest getRequest() {
344: return proc;
345: }
346:
347: /**
348: * Returns the result.
349: *
350: * @return a <code>ResultSet</code>
351: */
352: public ControllerResultSet getResult() {
353: return result;
354: }
355:
356: /**
357: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
358: */
359: public long getTransactionId() {
360: return proc.getTransactionId();
361: }
362:
363: /**
364: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
365: */
366: public boolean isAutoCommit() {
367: return proc.isAutoCommit();
368: }
369:
370: /**
371: * @see java.lang.Object#equals(java.lang.Object)
372: */
373: public boolean equals(Object other) {
374: if ((other == null)
375: || !(other instanceof CallableStatementExecuteQueryTask))
376: return false;
377:
378: CallableStatementExecuteQueryTask cseqt = (CallableStatementExecuteQueryTask) other;
379: if (proc == null)
380: return cseqt.getRequest() == null;
381: return proc.equals(cseqt.getRequest());
382: }
383:
384: /**
385: * @see java.lang.Object#hashCode()
386: */
387: public int hashCode() {
388: return (int) proc.getId();
389: }
390:
391: /**
392: * @see java.lang.Object#toString()
393: */
394: public String toString() {
395: if (proc.isAutoCommit())
396: return "Autocommit CallableStatementExecuteQueryTask "
397: + proc.getTransactionId() + " ("
398: + proc.getUniqueKey() + ")";
399: else
400: return "CallableStatementExecuteQueryTask for transaction "
401: + proc.getTransactionId() + " ("
402: + proc.getUniqueKey() + ")";
403: }
404:
405: }
|