001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): Julie Marguerite, Jaco Swart.
023: */package org.continuent.sequoia.controller.loadbalancer.tasks;
024:
025: import java.sql.Connection;
026: import java.sql.SQLException;
027: import java.util.HashMap;
028: import java.util.Map;
029:
030: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
031: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
032: import org.continuent.sequoia.common.i18n.Translate;
033: import org.continuent.sequoia.common.log.Trace;
034: import org.continuent.sequoia.controller.backend.DatabaseBackend;
035: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
036: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
037: import org.continuent.sequoia.controller.connection.PooledConnection;
038: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
039: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
040: import org.continuent.sequoia.controller.requests.AbstractRequest;
041: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
042:
043: /**
044: * Executes an <code>AbstractWriteRequest</code> statement.
045: *
046: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
047: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
048: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
049: * @version 1.0
050: */
051: public class StatementExecuteUpdateTask extends AbstractTask {
052: private AbstractWriteRequest request;
053:
054: /**
055: * results : store results from all the backends
056: */
057: private Map results = null;
058: /**
059: * result : this is the result for the first backend to succeed
060: */
061: private ExecuteUpdateResult result = null;
062:
063: static Trace endUserLogger = Trace
064: .getLogger("org.continuent.sequoia.enduser");
065:
066: /**
067: * Creates a new <code>StatementExecuteUpdateTask</code>.
068: *
069: * @param nbToComplete number of threads that must succeed before returning
070: * @param totalNb total number of threads
071: * @param request an <code>AbstractWriteRequest</code>
072: */
073: public StatementExecuteUpdateTask(int nbToComplete, int totalNb,
074: AbstractWriteRequest request) {
075: super (nbToComplete, totalNb, request.isPersistentConnection(),
076: request.getPersistentConnectionId());
077: this .request = request;
078: this .results = new HashMap();
079: }
080:
081: /**
082: * Executes a write request with the given backend thread.
083: *
084: * @param backendThread the backend thread that will execute the task
085: * @throws SQLException if an error occurs
086: */
087: public void executeTask(BackendWorkerThread backendThread)
088: throws SQLException {
089: DatabaseBackend backend = backendThread.getBackend();
090:
091: try {
092: AbstractConnectionManager cm = backend
093: .getConnectionManager(request.getLogin());
094: if (cm == null) {
095: SQLException se = new SQLException(
096: "No Connection Manager for Virtual Login:"
097: + request.getLogin());
098: try {
099: notifyFailure(backendThread, -1, se);
100: } catch (SQLException ignore) {
101: }
102: throw se;
103: }
104:
105: Trace logger = backendThread.getLogger();
106: if (request.isAutoCommit())
107: executeInAutoCommit(backendThread, backend, cm, logger);
108: else
109: executeInTransaction(backendThread, backend, cm, logger);
110:
111: /*
112: * If the backend is disabling, no result is retrieved. The notification
113: * has alreay been handled.
114: */
115: if (result != null) {
116: int resultOnFirstBackendToSucceed = notifySuccess(
117: backendThread, result.getUpdateCount());
118: if (results.get(backendThread) != null
119: && results.get(backendThread) instanceof ExecuteUpdateResult
120: && resultOnFirstBackendToSucceed != ((ExecuteUpdateResult) results
121: .get(backendThread)).getUpdateCount()) {
122: String msg = "Disabling backend "
123: + backend.getName()
124: + " that reports a different number of updated rows ("
125: + ((ExecuteUpdateResult) results
126: .get(backendThread))
127: .getUpdateCount()
128: + ") than first backend to succeed ("
129: + resultOnFirstBackendToSucceed
130: + ") for request " + request;
131: logger.error(msg);
132: // Disable this backend (it is no more in sync)
133: backendThread.getLoadBalancer().disableBackend(
134: backend, true);
135: endUserLogger.error(Translate.get(
136: "loadbalancer.backend.disabling", backend
137: .getName()));
138: throw new SQLException(msg);
139: }
140: }
141: } finally {
142: backend.getTaskQueues().completeWriteRequestExecution(this );
143: }
144: }
145:
146: private void executeInAutoCommit(BackendWorkerThread backendThread,
147: DatabaseBackend backend, AbstractConnectionManager cm,
148: Trace logger) throws SQLException {
149: if (!backend.canAcceptTasks(request)) {
150: // Backend is disabling, we do not execute queries except the one in
151: // the
152: // transaction we already started. Just notify the completion for the
153: // others.
154: notifyCompletion(backendThread);
155: return;
156: }
157:
158: // Use a connection just for this request
159: PooledConnection c = null;
160: try {
161: c = cm.retrieveConnectionInAutoCommit(request);
162: } catch (UnreachableBackendException e1) {
163: SQLException se = new SQLException("Backend "
164: + backend.getName() + " is no more reachable.");
165: try {
166: notifyFailure(backendThread, -1, se);
167: } catch (SQLException ignore) {
168: }
169: // Disable this backend (it is no more in sync) by killing the backend
170: // thread
171:
172: backendThread.getLoadBalancer().disableBackend(backend,
173: true);
174: String msg = Translate.get(
175: "loadbalancer.backend.disabling.unreachable",
176: backend.getName());
177: logger.error(msg);
178: endUserLogger.error(msg);
179: throw se;
180: }
181:
182: // Sanity check
183: if (c == null) {
184: SQLException se = new SQLException("No more connections");
185: try { // All backends failed, just ignore
186: if (!notifyFailure(backendThread,
187: request.getTimeout() * 1000L, se))
188: return;
189: } catch (SQLException ignore) {
190: }
191: // Disable this backend (it is no more in sync) by killing the backend
192: // thread
193: backendThread.getLoadBalancer().disableBackend(backend,
194: true);
195: String msg = "Request '"
196: + request.getSqlShortForm(backend
197: .getSqlShortFormLength())
198: + "' failed on backend " + backend.getName()
199: + " but " + getSuccess() + " succeeded (" + se
200: + ")";
201: logger.error(msg);
202: endUserLogger.error(Translate
203: .get("loadbalancer.backend.disabling", backend
204: .getName()));
205: throw new SQLException(msg);
206: }
207:
208: // Execute Query
209: try {
210: ExecuteUpdateResult tmpResult = AbstractLoadBalancer
211: .executeStatementExecuteUpdateOnBackend(request,
212: backend, backendThread, c);
213: synchronized (this ) {
214: if (result == null)
215: result = tmpResult;
216: results.put(backendThread, tmpResult);
217: }
218: backend.updateDatabaseBackendSchema(request);
219: } catch (Exception e) {
220: try { // All backends failed, just ignore
221: if (!notifyFailure(backendThread,
222: request.getTimeout() * 1000L, e)) {
223: result = null;
224: return;
225: }
226: } catch (SQLException ignore) {
227: }
228: // Disable this backend (it is no more in sync) by killing the backend
229: // thread
230: backendThread.getLoadBalancer().disableBackend(backend,
231: true);
232: String msg = "Request '"
233: + request.getSqlShortForm(backend
234: .getSqlShortFormLength())
235: + "' failed on backend " + backend.getName()
236: + " but " + getSuccess() + " succeeded (" + e + ")";
237:
238: if (logger.isDebugEnabled())
239: logger.debug(msg, e);
240: else
241: logger.error(msg);
242: endUserLogger.error(Translate
243: .get("loadbalancer.backend.disabling", backend
244: .getName()));
245: throw new SQLException(msg);
246: } finally {
247: cm.releaseConnectionInAutoCommit(request, c);
248: }
249: }
250:
251: private void executeInTransaction(
252: BackendWorkerThread backendThread, DatabaseBackend backend,
253: AbstractConnectionManager cm, Trace logger)
254: throws SQLException {
255: // Re-use the connection used by this transaction
256: Connection c;
257: long tid = request.getTransactionId();
258:
259: try {
260: c = backend
261: .getConnectionForTransactionAndLazyBeginIfNeeded(
262: request, cm);
263: } catch (UnreachableBackendException ube) {
264: SQLException se = new SQLException("Backend "
265: + backend.getName() + " is no more reachable.");
266: try {
267: notifyFailure(backendThread, -1, se);
268: } catch (SQLException ignore) {
269: }
270: // Disable this backend (it is no more in sync) by killing the backend
271: // thread
272: backendThread.getLoadBalancer().disableBackend(backend,
273: true);
274: String msg = Translate.get(
275: "loadbalancer.backend.disabling.unreachable",
276: backend.getName());
277: logger.error(msg);
278: endUserLogger.error(msg);
279: throw se;
280: } catch (NoTransactionStartWhenDisablingException e) {
281: // Backend is disabling, we do not execute queries except the one in
282: // the
283: // transaction we already started. Just notify the completion for the
284: // others.
285: notifyCompletion(backendThread);
286: return;
287: } catch (SQLException e1) {
288: SQLException se = new SQLException(
289: "Unable to get connection for transaction " + tid);
290: try { // All backends failed, just ignore
291: if (!notifyFailure(backendThread,
292: request.getTimeout() * 1000L, se))
293: return;
294: } catch (SQLException ignore) {
295: }
296: // Disable this backend (it is no more in sync) by killing the
297: // backend thread
298: backendThread.getLoadBalancer().disableBackend(backend,
299: true);
300: String msg = "Request '"
301: + request.getSqlShortForm(backend
302: .getSqlShortFormLength())
303: + "' failed on backend " + backend.getName()
304: + " but " + getSuccess() + " succeeded (" + se
305: + ")";
306: logger.error(msg);
307: endUserLogger.error(Translate
308: .get("loadbalancer.backend.disabling", backend
309: .getName()));
310: throw new SQLException(msg);
311: }
312:
313: // Sanity check
314: if (c == null) { // Bad connection
315: SQLException se = new SQLException(
316: "Unable to retrieve connection for transaction "
317: + tid);
318: try { // All backends failed, just ignore
319: if (!notifyFailure(backendThread,
320: request.getTimeout() * 1000L, se))
321: return;
322: } catch (SQLException ignore) {
323: }
324: // Disable this backend (it is no more in sync) by killing the
325: // backend thread
326: backendThread.getLoadBalancer().disableBackend(backend,
327: true);
328: String msg = "Request '"
329: + request.getSqlShortForm(backend
330: .getSqlShortFormLength())
331: + "' failed on backend " + backend.getName()
332: + " but " + getSuccess() + " succeeded (" + se
333: + ")";
334: logger.error(msg);
335: endUserLogger.error(Translate
336: .get("loadbalancer.backend.disabling", backend
337: .getName()));
338: throw new SQLException(msg);
339: }
340:
341: // Execute Query
342: try {
343: ExecuteUpdateResult tmpResult = AbstractLoadBalancer
344: .executeStatementExecuteUpdateOnBackend(request,
345: backend, backendThread,
346: cm.retrieveConnectionForTransaction(tid));
347: synchronized (this ) {
348: if (result == null)
349: result = tmpResult;
350: results.put(backendThread, tmpResult);
351: }
352: backend.updateDatabaseBackendSchema(request);
353: } catch (Exception e) {
354: try { // All backends failed, just ignore
355: if (!notifyFailure(backendThread,
356: request.getTimeout() * 1000L, e)) {
357: result = null;
358: return;
359: }
360: } catch (SQLException ignore) {
361: }
362: // Disable this backend (it is no more in sync)
363: backendThread.getLoadBalancer().disableBackend(backend,
364: true);
365: String msg = "Request '"
366: + request.getSqlShortForm(backend
367: .getSqlShortFormLength())
368: + "' failed on backend " + backend.getName()
369: + " but " + getSuccess() + " succeeded (" + e + ")";
370: if (logger.isDebugEnabled())
371: logger.debug(msg, e);
372: else
373: logger.error(msg);
374: endUserLogger.error(Translate
375: .get("loadbalancer.backend.disabling", backend
376: .getName()));
377: throw new SQLException(msg);
378: }
379: }
380:
381: /**
382: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
383: */
384: public AbstractRequest getRequest() {
385: return request;
386: }
387:
388: /**
389: * Returns the result.
390: *
391: * @return int
392: */
393: public ExecuteUpdateResult getResult() {
394: return result;
395: }
396:
397: /**
398: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
399: */
400: public long getTransactionId() {
401: return request.getTransactionId();
402: }
403:
404: /**
405: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
406: */
407: public boolean isAutoCommit() {
408: return request.isAutoCommit();
409: }
410:
411: /**
412: * @see java.lang.Object#equals(java.lang.Object)
413: */
414: public boolean equals(Object other) {
415: if ((other == null)
416: || !(other instanceof StatementExecuteUpdateTask))
417: return false;
418:
419: StatementExecuteUpdateTask seut = (StatementExecuteUpdateTask) other;
420: return this .request.equals(seut.getRequest());
421: }
422:
423: /**
424: * @see java.lang.Object#hashCode()
425: */
426: public int hashCode() {
427: return (int) request.getId();
428: }
429:
430: /**
431: * @see java.lang.Object#toString()
432: */
433: public String toString() {
434: if (request.isAutoCommit())
435: return "Autocommit StatementExecuteUpdateTask "
436: + request.getTransactionId() + " ("
437: + request.getUniqueKey() + ")";
438: else
439: return "StatementExecuteUpdateTask from transaction "
440: + request.getTransactionId() + " ("
441: + request.getUniqueKey() + ")";
442: }
443:
444: }
|