001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2006 Continuent, Inc.
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Jaco Swart.
022: */package org.continuent.sequoia.controller.loadbalancer.tasks;
023:
024: import java.sql.ResultSet;
025: import java.sql.SQLException;
026: import java.util.ArrayList;
027: import java.util.HashMap;
028: import java.util.List;
029: import java.util.Map;
030:
031: import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
032: import org.continuent.sequoia.controller.backend.DatabaseBackend;
033: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
034: import org.continuent.sequoia.controller.requests.AbstractRequest;
035:
036: /**
037: * Defines an abstract task to be processed by a
038: * <code>BackendWorkerThread</code>.
039: *
040: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
041: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
042: * @version 1.0
043: */
044: public abstract class AbstractTask {
045: //
046: // How the code is organized ?
047: // 1. Member variables
048: // 2. Constructor(s)
049: // 3. Task management
050: // 4. Getter/Setter
051: //
052:
053: /** Total number of threads. */
054: private int totalNb;
055:
056: /** Number of threads that must succeed before returning. */
057: private int nbToComplete;
058:
059: /** Number of thread that have started the execution of the task */
060: private int executionStarted;
061: /** Number of backendThread that have succeeded */
062: private int success = 0;
063: /** Number of backendThread that have failed */
064: private int failed = 0;
065: /** List of backendThread that have notified this task */
066: private List notifications = null;
067: /** List of exceptions of failed nodes */
068: private List exceptions = null;
069: /** Map of Lists of locksMap taken by this task, indexed by backend */
070: private Map locksMap = new HashMap();
071:
072: // ResultSet for getting back autogenerated keys in an update with keys
073: private ResultSet generatedKeysResultSet;
074:
075: // True if the timeout has expired on this task
076: private boolean timeoutExpired = false;
077:
078: /** Query result on the fist backend to succeed. Used for sanity check */
079: private int resultOnFirstBackendToSucceed;
080:
081: /** True if this task executes on a persistent connection */
082: private boolean persistentConnection;
083:
084: /** Persistent connection id if persistentConnection is true */
085: private long persistentConnectionId;
086:
087: /*
088: * Constructor
089: */
090:
091: /**
092: * Sets the number of threads among the total number of threads that must
093: * successfully complete the execution of this AbstractTask before returning.
094: *
095: * @param nbToComplete number of threads that must succeed before returning
096: * @param totalNb total number of threads
097: * @param isPersistentConnection true if this task executes on a persistent
098: * connection
099: * @param persistentConnectionId persistent connection id if
100: * persistentConnection is true
101: */
102: public AbstractTask(int nbToComplete, int totalNb,
103: boolean isPersistentConnection, long persistentConnectionId) {
104: this .nbToComplete = nbToComplete;
105: this .totalNb = totalNb;
106: success = 0;
107: failed = 0;
108: executionStarted = 0;
109: notifications = new ArrayList(nbToComplete);
110: this .persistentConnection = isPersistentConnection;
111: this .persistentConnectionId = persistentConnectionId;
112: }
113:
114: /*
115: * Task management
116: */
117:
118: /**
119: * The task code executed by the backendThread.
120: *
121: * @param backendThread The backend thread executing this task
122: * @throws SQLException if an error occurs
123: */
124: public void execute(BackendWorkerThread backendThread)
125: throws SQLException {
126: synchronized (this ) {
127: // If the task has expired and nobody has executed it yet, we ignore it
128: // else we have to play it.
129: // Note that the exception corresponding to the timeout is set by the
130: // caller of setExpiredTimeout.
131: if (timeoutExpired && (executionStarted == 0))
132: return;
133: this .executionStarted++;
134: }
135: executeTask(backendThread);
136: // Completed executions are handled by the task internal code that calls
137: // notifyFailure or notifySuccess.
138: }
139:
140: /**
141: * The implementation specific task code to be executed by backendThread.
142: *
143: * @param backendThread The backend thread executing this task
144: * @throws SQLException if an error occurs
145: */
146: public abstract void executeTask(BackendWorkerThread backendThread)
147: throws SQLException;
148:
149: /**
150: * This is used to notify the completion of this task without success or
151: * failure. This is usually used when the task has been discarded for example
152: * by a backend that is currently disabling but still needs to execute the
153: * remaining queries in open transactions.
154: * <p>
155: * Therefore, this only decrements by one the number of threads that needs to
156: * complete.
157: *
158: * @param backendThread The backend worker thread notifying this task (null in
159: * case this task is cancelled before being processed by a worker
160: * thread)
161: */
162: public synchronized void notifyCompletion(
163: BackendWorkerThread backendThread) {
164: if ((backendThread != null) && !addNotification(backendThread))
165: return;
166:
167: totalNb--;
168: // Notify if needed
169: if (success + failed >= totalNb) {
170: notifyAll(); // Notify all failed threads
171: }
172: }
173:
174: /**
175: * Notifies that the specified backendThread failed to execute this task. If
176: * all nodes failed, this method return <code>false</code> meaning that the
177: * problem was due to the task and not to the thread. If the method returns
178: * <code>true</code>, it can mean that this thread failed and is no more
179: * coherent, therefore the backend associated to this thread should be
180: * disabled.
181: *
182: * @param backendThread The backend thread notifying this task
183: * @param timeout time in milliseconds to wait for other threads to signal
184: * success or failure (use -1 if you don't want to wait)
185: * @param e the exception causing the failure
186: * @return <code>true</code> if at least one node succeeded to execute this
187: * task, <code>false</code> if all threads failed
188: * @throws SQLException if an error occured in the notification process
189: */
190: public synchronized boolean notifyFailure(
191: BackendWorkerThread backendThread, long timeout, Throwable e)
192: throws SQLException {
193: if (!addNotification(backendThread)) {
194: if (backendThread != null)
195: backendThread.getLogger().info(
196: "Backend " + backendThread.getBackend()
197: + " already notified task "
198: + toString());
199: return success > 0;
200: }
201:
202: failed++;
203:
204: // Log the exception
205: if (exceptions == null)
206: exceptions = new ArrayList();
207: String backendName;
208: if (backendThread == null) { // Happens in case of cascade abort (see SEQUOIA-469)
209: backendName = "Query not processed";
210: } else
211: backendName = backendThread.getName();
212:
213: if (e instanceof SQLException) {
214: SQLException sqlEx = (SQLException) e;
215: exceptions.add(SQLExceptionFactory.getSQLException(sqlEx,
216: "Backend " + backendName + " failed ("
217: + sqlEx.getLocalizedMessage() + ")"));
218: } else
219: exceptions.add(new SQLException("Backend " + backendName
220: + " failed (" + e.getLocalizedMessage() + ")")
221: .initCause(e));
222:
223: // Notify if needed
224: if (success + failed >= totalNb) {
225: notifyAll(); // Notify all failed threads
226: } else {
227: if ((timeout > -1) && (success == 0)) {
228: try { // Wait to check if all other threads failed or not
229: wait(timeout);
230: } catch (InterruptedException ie) {
231: throw (SQLException) new SQLException(
232: "Wait interrupted() in failed task of backend "
233: + backendName + " ("
234: + e.getLocalizedMessage() + ")")
235: .initCause(e);
236: }
237: }
238: }
239: return success > 0;
240: }
241:
242: /**
243: * Notifies the successful completion of this task.
244: *
245: * @param backendThread The backend thread notifying this task
246: */
247: public synchronized void notifySuccess(
248: BackendWorkerThread backendThread)
249:
250: {
251: if (!addNotification(backendThread))
252: return;
253:
254: doNotifySuccess();
255: }
256:
257: /**
258: *
259: */
260: private void doNotifySuccess() {
261: success++;
262:
263: // Notify if needed
264: if ((success == nbToComplete) || (success + failed >= totalNb)) {
265: if (failed > 0)
266: notifyAll(); // Notify all failed threads too
267: else
268: notify();
269: }
270: }
271:
272: /**
273: * Notifies the successful completion of this task and provide the
274: * resultOnFirstBackendToSucceed for checking.
275: *
276: * @param backendThread The backend thread notifying this task
277: * @param result the result of the query on the backend that notify the
278: * success
279: * @return the result returned by the query on the first backend which
280: * succeeded
281: */
282: public synchronized int notifySuccess(
283: BackendWorkerThread backendThread, int result) {
284: if (success == 0) {
285: /*
286: * we keep only the result of the first backend to succeed which can be
287: * used as a basis to check that success on other backends will be
288: * consistent with that first result.
289: */
290: resultOnFirstBackendToSucceed = result;
291: }
292:
293: /*
294: * There is a nasty case if the database does not support
295: * Statement.cancel(). When a deadlock is detected, the query will be
296: * aborted and failure will be notified. But if the query cannot be
297: * cancelled, it might be notified as a success later on. In this case, we
298: * have to update resultOnFirstBackendToSucceed first (this is why the check
299: * on addNotification is done after the update of
300: * resultOnFirstBackendToSucceed.
301: */
302: if (!addNotification(backendThread))
303: return resultOnFirstBackendToSucceed;
304:
305: doNotifySuccess();
306:
307: return resultOnFirstBackendToSucceed;
308: }
309:
310: /**
311: * Add a backend worker thread to the notification list
312: *
313: * @param backendThread the backend worker thread to add
314: * @return false if this thread already notified the task
315: */
316: private boolean addNotification(BackendWorkerThread backendThread) {
317: if (notifications.contains(backendThread))
318: return false;
319: notifications.add(backendThread);
320: return true;
321: }
322:
323: //
324: // Getter/Setter
325: //
326:
327: /**
328: * Returns true if this task is in autocommit mode, false if it is in a
329: * transaction.
330: *
331: * @return Returns true if task must be executed in autoCommit mode.
332: * @see #getTransactionId()
333: */
334: public abstract boolean isAutoCommit();
335:
336: /**
337: * Returns the exceptions lists.
338: *
339: * @return an <code>List</code>
340: */
341: public List getExceptions() {
342: return exceptions;
343: }
344:
345: /**
346: * Returns the number of threads that have started the execution of the task.
347: *
348: * @return Returns the number of started executions.
349: */
350: public synchronized int getExecutionStarted() {
351: return executionStarted;
352: }
353:
354: /**
355: * Set the flag to tell that the timeout has expired on this task. If no
356: * backend has started the task execution then the task will be canceled and
357: * the method will return true. Otherwise, all backends will execute the
358: * request and the method will return false.
359: *
360: * @return true if BackendThreads will ignore the task, false if all backends
361: * will execute the task.
362: */
363: public synchronized boolean setExpiredTimeout() {
364: this .timeoutExpired = true;
365: return executionStarted == 0;
366: }
367:
368: /**
369: * Returns the failed.
370: *
371: * @return an <code>int</code> value
372: */
373: public int getFailed() {
374: return failed;
375: }
376:
377: /**
378: * Returns the generatedKeysResultSet value.
379: *
380: * @return Returns the generatedKeysResultSet.
381: */
382: public ResultSet getGeneratedKeysResultSet() {
383: return generatedKeysResultSet;
384: }
385:
386: /**
387: * Sets the generatedKeysResultSet value.
388: *
389: * @param generatedKeysResultSet The generatedKeysResultSet to set.
390: */
391: public void setGeneratedKeysResultSet(
392: ResultSet generatedKeysResultSet) {
393: this .generatedKeysResultSet = generatedKeysResultSet;
394: }
395:
396: /**
397: * Returns the locksMap taken by this task for a given backend.
398: *
399: * @param backend backend for which to get the locksMap
400: * @return Returns the locksMap.
401: */
402: public List getLocks(DatabaseBackend backend) {
403: synchronized (locksMap) {
404: return (List) locksMap.get(backend);
405: }
406: }
407:
408: /**
409: * Sets the locksMap taken by this task for a given backend (ignored if the
410: * locksMap were already set once).
411: *
412: * @param backend backend for which to set the locksMap
413: * @param locks The locks map taken.
414: */
415: public synchronized void setLocks(DatabaseBackend backend,
416: List locks) {
417: synchronized (locksMap) {
418: /*
419: * Use get instead of contains key, because sometimes a null entry is
420: * added.
421: */
422: if (locksMap.get(backend) == null)
423: locksMap.put(backend, locks);
424: else {
425: backend.getLogger().fatal(
426: "Double locks entry: " + locks + " and "
427: + locksMap.get(backend),
428: new Exception());
429: }
430: }
431: }
432:
433: /**
434: * Returns the number of threads that must succeed before returning.
435: *
436: * @return an <code>int</code> value
437: */
438: public int getNbToComplete() {
439: return nbToComplete;
440: }
441:
442: /**
443: * Returns the persistentConnection value.
444: *
445: * @return Returns the persistentConnection.
446: */
447: public final boolean isPersistentConnection() {
448: return persistentConnection;
449: }
450:
451: /**
452: * Sets the persistentConnection value.
453: *
454: * @param persistentConnection The persistentConnection to set.
455: */
456: public final void setPersistentConnection(
457: boolean persistentConnection) {
458: this .persistentConnection = persistentConnection;
459: }
460:
461: /**
462: * Returns the persistentConnectionId value.
463: *
464: * @return Returns the persistentConnectionId.
465: */
466: public final long getPersistentConnectionId() {
467: return persistentConnectionId;
468: }
469:
470: /**
471: * Sets the persistentConnectionId value.
472: *
473: * @param persistentConnectionId The persistentConnectionId to set.
474: */
475: public final void setPersistentConnectionId(
476: long persistentConnectionId) {
477: this .persistentConnectionId = persistentConnectionId;
478: }
479:
480: /**
481: * Return the request associated with this task if any, returns null
482: * otherwise.
483: *
484: * @return the request associated with this task or null
485: */
486: public abstract AbstractRequest getRequest();
487:
488: /**
489: * Returns the success.
490: *
491: * @return an <code>int</code> value
492: */
493: public int getSuccess() {
494: return success;
495: }
496:
497: /**
498: * Returns the total number of threads.
499: *
500: * @return an <code>int</code> value
501: * @see #setTotalNb
502: */
503: public int getTotalNb() {
504: return totalNb;
505: }
506:
507: /**
508: * Sets the total number of threads.
509: *
510: * @param totalNb the total number of threads to set
511: * @see #getTotalNb
512: */
513: public void setTotalNb(int totalNb) {
514: this .totalNb = totalNb;
515: }
516:
517: /**
518: * Returns the transaction identifier of this task if any (isAutoCommit
519: * returns false).
520: *
521: * @return Returns the transaction identifier.
522: */
523: public abstract long getTransactionId();
524:
525: /**
526: * Returns true if the task has been sucessfully completed by nbToComplete
527: * nodes (set in the constructor) of if everyone has completed (successfully
528: * or not), false otherwise.
529: *
530: * @return true if the task execution is complete
531: * @see AbstractTask#AbstractTask(int, int)
532: */
533: public synchronized boolean hasCompleted() {
534: return ((success >= nbToComplete) || (success + failed == totalNb));
535: }
536:
537: /**
538: * Returns true if the task has completed (successfully or not) or false if we
539: * are still expecting answers from some backends.
540: *
541: * @return true if the task execution is complete
542: */
543: public synchronized boolean hasFullyCompleted() {
544: return success + failed == totalNb;
545: }
546:
547: }
|