001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005-2006 Continuent, Inc.
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Jaco Swart.
022: */package org.continuent.sequoia.controller.loadbalancer.tasks;
023:
024: import java.sql.Connection;
025: import java.sql.SQLException;
026: import java.util.HashMap;
027: import java.util.Map;
028:
029: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
030: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
031: import org.continuent.sequoia.common.i18n.Translate;
032: import org.continuent.sequoia.common.log.Trace;
033: import org.continuent.sequoia.controller.backend.DatabaseBackend;
034: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
035: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
036: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
037: import org.continuent.sequoia.controller.connection.PooledConnection;
038: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
039: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
040: import org.continuent.sequoia.controller.requests.AbstractRequest;
041: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
042:
043: /**
044: * Executes a <code>AbstractWriteRequest</code> statement and return the auto
045: * generated keys.
046: *
047: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
048: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
049: * @version 1.0
050: */
051: public class StatementExecuteUpdateWithKeysTask extends AbstractTask {
052: private AbstractWriteRequest request;
053: /**
054: * results : store results from all the backends
055: */
056: private Map results = null;
057: /**
058: * result : this is the result for the first backend to succeed
059: */
060: private GeneratedKeysResult result = null;
061: private MetadataCache metadataCache;
062:
063: static Trace endUserLogger = Trace
064: .getLogger("org.continuent.sequoia.enduser");
065:
066: /**
067: * Creates a new <code>StatementExecuteUpdateTask</code>.
068: *
069: * @param nbToComplete number of threads that must succeed before returning
070: * @param totalNb total number of threads
071: * @param request an <code>AbstractWriteRequest</code>
072: * @param metadataCache the metadataCache if any or null
073: */
074: public StatementExecuteUpdateWithKeysTask(int nbToComplete,
075: int totalNb, AbstractWriteRequest request,
076: MetadataCache metadataCache) {
077: super (nbToComplete, totalNb, request.isPersistentConnection(),
078: request.getPersistentConnectionId());
079: this .request = request;
080: this .metadataCache = metadataCache;
081: this .results = new HashMap();
082: }
083:
084: /**
085: * Executes a write request with the given backend thread.
086: *
087: * @param backendThread the backend thread that will execute the task
088: * @throws SQLException if an error occurs
089: */
090: public void executeTask(BackendWorkerThread backendThread)
091: throws SQLException {
092: DatabaseBackend backend = backendThread.getBackend();
093:
094: try {
095: if (!backend.getDriverCompliance()
096: .supportGetGeneratedKeys())
097: throw new SQLException(
098: Translate
099: .get(
100: "loadbalancer.backend.autogeneratedkeys.unsupported",
101: backend.getName()));
102:
103: AbstractConnectionManager cm = backend
104: .getConnectionManager(request.getLogin());
105: if (cm == null) {
106: SQLException se = new SQLException(
107: "No Connection Manager for Virtual Login:"
108: + request.getLogin());
109: try {
110: notifyFailure(backendThread, -1, se);
111: } catch (SQLException ignore) {
112:
113: }
114: throw se;
115: }
116:
117: Trace logger = backendThread.getLogger();
118: if (request.isAutoCommit())
119: executeInAutoCommit(backendThread, backend, cm, logger);
120: else
121: executeInTransaction(backendThread, backend, cm, logger);
122:
123: if (result == null)
124: return; // failure, already handled
125:
126: int resultOnFirstBackendToSucceed = notifySuccess(
127: backendThread, result.getUpdateCount());
128: if (results.get(backendThread) != null
129: && results.get(backendThread) instanceof GeneratedKeysResult
130: && resultOnFirstBackendToSucceed != ((GeneratedKeysResult) results
131: .get(backendThread)).getUpdateCount()) {
132: String msg = "Disabling backend "
133: + backend.getName()
134: + " that reports a different number of updated rows ("
135: + ((GeneratedKeysResult) results
136: .get(backendThread)).getUpdateCount()
137: + ") than first backend to succeed ("
138: + resultOnFirstBackendToSucceed
139: + ") for request " + request;
140: logger.error(msg);
141: // Disable this backend (it is no more in sync)
142: backendThread.getLoadBalancer().disableBackend(backend,
143: true);
144: endUserLogger.error(Translate.get(
145: "loadbalancer.backend.disabling", backend
146: .getName()));
147: throw new SQLException(msg);
148: }
149: } finally {
150: backend.getTaskQueues().completeWriteRequestExecution(this );
151: }
152: }
153:
154: private void executeInAutoCommit(BackendWorkerThread backendThread,
155: DatabaseBackend backend, AbstractConnectionManager cm,
156: Trace logger) throws SQLException {
157: if (!backend.canAcceptTasks(request)) {
158: // Backend is disabling, we do not play requests in autocommit, just
159: // notify
160: // the completion for the others
161: notifyCompletion(backendThread);
162: return;
163: }
164:
165: // Use a connection just for this request
166: PooledConnection c = null;
167: try {
168: c = cm.retrieveConnectionInAutoCommit(request);
169: } catch (UnreachableBackendException e1) {
170: SQLException se = new SQLException("Backend "
171: + backend.getName() + " is no more reachable.");
172: try {
173: notifyFailure(backendThread, -1, se);
174: } catch (SQLException ignore) {
175: }
176: // Disable this backend (it is no more in sync) by killing the backend
177: // thread
178: backendThread.getLoadBalancer().disableBackend(backend,
179: true);
180: String msg = Translate.get(
181: "loadbalancer.backend.disabling.unreachable",
182: backend.getName());
183: logger.error(msg);
184: endUserLogger.error(msg);
185: throw se;
186: }
187:
188: // Sanity check
189: if (c == null) {
190: SQLException se = new SQLException("No more connections");
191: try { // All backends failed, just ignore
192: if (!notifyFailure(backendThread,
193: request.getTimeout() * 1000L, se))
194: return;
195: } catch (SQLException ignore) {
196: }
197: // Disable this backend (it is no more in sync) by killing the backend
198: // thread
199: backendThread.getLoadBalancer().disableBackend(backend,
200: true);
201: String msg = "Request '"
202: + request.getSqlShortForm(backend
203: .getSqlShortFormLength())
204: + "' failed on backend " + backend.getName()
205: + " but " + getSuccess() + " succeeded (" + se
206: + ")";
207: logger.error(msg);
208: endUserLogger.error(Translate
209: .get("loadbalancer.backend.disabling", backend
210: .getName()));
211: throw new SQLException(msg);
212: }
213:
214: // Execute Query
215: try {
216:
217: GeneratedKeysResult tmpResult = AbstractLoadBalancer
218: .executeStatementExecuteUpdateWithKeysOnBackend(
219: request, backend, backendThread, c,
220: metadataCache);
221: synchronized (this ) {
222: if (result == null)
223: result = tmpResult;
224: results.put(backendThread, tmpResult);
225: }
226:
227: backend.updateDatabaseBackendSchema(request);
228: } catch (Exception e) {
229: try { // All backends failed, just ignore
230: if (!notifyFailure(backendThread,
231: request.getTimeout() * 1000L, e)) {
232: result = null;
233: return;
234: }
235: } catch (SQLException ignore) {
236: }
237: // Disable this backend (it is no more in sync) by killing the backend
238: // thread
239: backendThread.getLoadBalancer().disableBackend(backend,
240: true);
241: String msg = "Request '"
242: + request.getSqlShortForm(backend
243: .getSqlShortFormLength())
244: + "' failed on backend " + backend.getName()
245: + " but " + getSuccess() + " succeeded (" + e + ")";
246: logger.error(msg);
247: endUserLogger.error(Translate
248: .get("loadbalancer.backend.disabling", backend
249: .getName()));
250: throw new SQLException(msg);
251: } finally {
252: cm.releaseConnectionInAutoCommit(request, c);
253: }
254: }
255:
256: private void executeInTransaction(
257: BackendWorkerThread backendThread, DatabaseBackend backend,
258: AbstractConnectionManager cm, Trace logger)
259: throws SQLException {
260: // Re-use the connection used by this transaction
261: Connection c;
262: long tid = request.getTransactionId();
263:
264: try {
265: c = backend
266: .getConnectionForTransactionAndLazyBeginIfNeeded(
267: request, cm);
268: } catch (UnreachableBackendException ube) {
269: SQLException se = new SQLException("Backend "
270: + backend.getName() + " is no more reachable.");
271: try {
272: notifyFailure(backendThread, -1, se);
273: } catch (SQLException ignore) {
274: }
275: // Disable this backend (it is no more in sync) by killing the backend
276: // thread
277: backendThread.getLoadBalancer().disableBackend(backend,
278: true);
279: String msg = Translate.get(
280: "loadbalancer.backend.disabling.unreachable",
281: backend.getName());
282: logger.error(msg);
283: endUserLogger.error(msg);
284: throw se;
285: } catch (NoTransactionStartWhenDisablingException e) {
286: // Backend is disabling, we do not execute queries except the one in the
287: // transaction we already started. Just notify the completion for the
288: // others.
289: notifyCompletion(backendThread);
290: return;
291: } catch (SQLException e1) {
292: SQLException se = new SQLException(
293: "Unable to get connection for transaction " + tid);
294: try { // All backends failed, just ignore
295: if (!notifyFailure(backendThread,
296: request.getTimeout() * 1000L, se))
297: return;
298: } catch (SQLException ignore) {
299: }
300: // Disable this backend (it is no more in sync) by killing the
301: // backend thread
302: backendThread.getLoadBalancer().disableBackend(backend,
303: true);
304: String msg = "Request '"
305: + request.getSqlShortForm(backend
306: .getSqlShortFormLength())
307: + "' failed on backend " + backend.getName()
308: + " but " + getSuccess() + " succeeded (" + se
309: + ")";
310: logger.error(msg);
311: endUserLogger.error(Translate
312: .get("loadbalancer.backend.disabling", backend
313: .getName()));
314: throw new SQLException(msg);
315: }
316:
317: // Sanity check
318: if (c == null) { // Bad connection
319: SQLException se = new SQLException(
320: "Unable to retrieve connection for transaction "
321: + tid);
322: try { // All backends failed, just ignore
323: if (!notifyFailure(backendThread,
324: request.getTimeout() * 1000L, se))
325: return;
326: } catch (SQLException ignore) {
327: }
328: // Disable this backend (it is no more in sync) by killing the
329: // backend thread
330: backendThread.getLoadBalancer().disableBackend(backend,
331: true);
332: String msg = "Request '"
333: + request.getSqlShortForm(backend
334: .getSqlShortFormLength())
335: + "' failed on backend " + backend.getName()
336: + " but " + getSuccess() + " succeeded (" + se
337: + ")";
338: logger.error(msg);
339: endUserLogger.error(Translate
340: .get("loadbalancer.backend.disabling", backend
341: .getName()));
342: throw new SQLException(msg);
343: }
344:
345: // Execute Query
346: try {
347: GeneratedKeysResult tmpResult = AbstractLoadBalancer
348: .executeStatementExecuteUpdateWithKeysOnBackend(
349: request, backend, backendThread,
350: cm.retrieveConnectionForTransaction(tid),
351: metadataCache);
352: synchronized (this ) {
353: if (result == null)
354: result = tmpResult;
355: results.put(backendThread, tmpResult);
356: }
357:
358: backend.updateDatabaseBackendSchema(request);
359: } catch (Exception e) {
360: try { // All backends failed, just ignore
361: if (!notifyFailure(backendThread,
362: request.getTimeout() * 1000L, e)) {
363: result = null;
364: return;
365: }
366: } catch (SQLException ignore) {
367: }
368: // Disable this backend (it is no more in sync) by killing the backend
369: // thread
370: backendThread.getLoadBalancer().disableBackend(backend,
371: true);
372: String msg = "Request '"
373: + request.getSqlShortForm(backend
374: .getSqlShortFormLength())
375: + "' failed on backend " + backend.getName()
376: + " but " + getSuccess() + " succeeded (" + e + ")";
377: logger.error(msg);
378: endUserLogger.error(Translate
379: .get("loadbalancer.backend.disabling", backend
380: .getName()));
381: throw new SQLException(msg);
382: }
383: }
384:
385: /**
386: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
387: */
388: public AbstractRequest getRequest() {
389: return request;
390: }
391:
392: /**
393: * Returns the update count and auto generated keys.
394: *
395: * @return a <code>GeneratedKeysResult</code> object
396: */
397: public GeneratedKeysResult getResult() {
398: return result;
399: }
400:
401: /**
402: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
403: */
404: public long getTransactionId() {
405: return request.getTransactionId();
406: }
407:
408: /**
409: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
410: */
411: public boolean isAutoCommit() {
412: return request.isAutoCommit();
413: }
414:
415: /**
416: * @see java.lang.Object#equals(java.lang.Object)
417: */
418: public boolean equals(Object other) {
419: if ((other == null)
420: || !(other instanceof StatementExecuteUpdateWithKeysTask))
421: return false;
422:
423: StatementExecuteUpdateWithKeysTask seuwk = (StatementExecuteUpdateWithKeysTask) other;
424: return this .request.equals(seuwk.getRequest());
425: }
426:
427: /**
428: * @see java.lang.Object#hashCode()
429: */
430: public int hashCode() {
431: return (int) request.getId();
432: }
433:
434: /**
435: * @see java.lang.Object#toString()
436: */
437: public String toString() {
438: if (request.isAutoCommit())
439: return "Autocommit StatementExecuteUpdateWithKeysTask ("
440: + request.getUniqueKey() + ")";
441: else
442: return "StatementExecuteUpdateWithKeysTask from transaction "
443: + request.getTransactionId()
444: + " ("
445: + request.getUniqueKey() + ")";
446: }
447:
448: }
|