Source code for AbstractTask.java (sequoia 2.10.9, package org.continuent.sequoia.controller.loadbalancer.tasks)

/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Jaco Swart.
 */

package org.continuent.sequoia.controller.loadbalancer.tasks;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
import org.continuent.sequoia.controller.requests.AbstractRequest;

/**
 * Defines an abstract task to be processed by a
 * <code>BackendWorkerThread</code>.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
 * @version 1.0
 */
public abstract class AbstractTask {
    //
    // How the code is organized:
    // 1. Member variables
    // 2. Constructor(s)
    // 3. Task management
    // 4. Getter/Setter
    //

    /** Total number of threads. */
    private int totalNb;

    /** Number of threads that must succeed before returning. */
    private int nbToComplete;

    /** Number of threads that have started the execution of the task. */
    private int executionStarted;
    /** Number of backend threads that have succeeded. */
    private int success = 0;
    /** Number of backend threads that have failed. */
    private int failed = 0;
    /** List of backend threads that have notified this task. */
    private List notifications = null;
    /** List of exceptions of failed nodes. */
    private List exceptions = null;
    /** Map of lists of locks taken by this task, indexed by backend. */
    private Map locksMap = new HashMap();

    /** ResultSet for getting back autogenerated keys in an update with keys. */
    private ResultSet generatedKeysResultSet;

    /** True if the timeout has expired on this task. */
    private boolean timeoutExpired = false;

    /** Query result on the first backend to succeed. Used for sanity check. */
    private int resultOnFirstBackendToSucceed;

    /** True if this task executes on a persistent connection. */
    private boolean persistentConnection;

    /** Persistent connection id if persistentConnection is true. */
    private long persistentConnectionId;

    /*
     * Constructor
     */

    /**
     * Sets the number of threads among the total number of threads that must
     * successfully complete the execution of this AbstractTask before returning.
     *
     * @param nbToComplete number of threads that must succeed before returning
     * @param totalNb total number of threads
     * @param isPersistentConnection true if this task executes on a persistent
     *          connection
     * @param persistentConnectionId persistent connection id if
     *          persistentConnection is true
     */
    public AbstractTask(int nbToComplete, int totalNb,
            boolean isPersistentConnection, long persistentConnectionId) {
        this.nbToComplete = nbToComplete;
        this.totalNb = totalNb;
        success = 0;
        failed = 0;
        executionStarted = 0;
        notifications = new ArrayList(nbToComplete);
        this.persistentConnection = isPersistentConnection;
        this.persistentConnectionId = persistentConnectionId;
    }

    /*
     * Task management
     */

    /**
     * The task code executed by the backendThread.
     *
     * @param backendThread The backend thread executing this task
     * @throws SQLException if an error occurs
     */
    public void execute(BackendWorkerThread backendThread)
            throws SQLException {
        synchronized (this) {
            // If the task has expired and nobody has executed it yet, we ignore it
            // else we have to play it.
            // Note that the exception corresponding to the timeout is set by the
            // caller of setExpiredTimeout.
            if (timeoutExpired && (executionStarted == 0))
                return;
            this.executionStarted++;
        }
        executeTask(backendThread);
        // Completed executions are handled by the task internal code that calls
        // notifyFailure or notifySuccess.
    }

    /**
     * The implementation specific task code to be executed by backendThread.
     *
     * @param backendThread The backend thread executing this task
     * @throws SQLException if an error occurs
     */
    public abstract void executeTask(BackendWorkerThread backendThread)
            throws SQLException;

    /**
     * This is used to notify the completion of this task without success or
     * failure. This is usually used when the task has been discarded, for
     * example by a backend that is currently disabling but still needs to
     * execute the remaining queries in open transactions.
     * <p>
     * Therefore, this only decrements by one the number of threads that need to
     * complete.
     *
     * @param backendThread The backend worker thread notifying this task (null in
     *          case this task is cancelled before being processed by a worker
     *          thread)
     */
    public synchronized void notifyCompletion(
            BackendWorkerThread backendThread) {
        if ((backendThread != null) && !addNotification(backendThread))
            return;

        totalNb--;
        // Notify if needed
        if (success + failed >= totalNb) {
            notifyAll(); // Notify all failed threads
        }
    }

    /**
     * Notifies that the specified backendThread failed to execute this task. If
     * all nodes failed, this method returns <code>false</code>, meaning that the
     * problem was due to the task and not to the thread. If the method returns
     * <code>true</code>, it can mean that this thread failed and is no longer
     * consistent, therefore the backend associated with this thread should be
     * disabled.
     *
     * @param backendThread The backend thread notifying this task
     * @param timeout time in milliseconds to wait for other threads to signal
     *          success or failure (use -1 if you don't want to wait)
     * @param e the exception causing the failure
     * @return <code>true</code> if at least one node succeeded in executing this
     *         task, <code>false</code> if all threads failed
     * @throws SQLException if an error occurred in the notification process
     */
    public synchronized boolean notifyFailure(
            BackendWorkerThread backendThread, long timeout, Throwable e)
            throws SQLException {
        if (!addNotification(backendThread)) {
            if (backendThread != null)
                backendThread.getLogger().info(
                        "Backend " + backendThread.getBackend()
                                + " already notified task "
                                + toString());
            return success > 0;
        }

        failed++;

        // Log the exception
        if (exceptions == null)
            exceptions = new ArrayList();
        String backendName;
        if (backendThread == null) { // Happens in case of cascade abort (see SEQUOIA-469)
            backendName = "Query not processed";
        } else
            backendName = backendThread.getName();

        if (e instanceof SQLException) {
            SQLException sqlEx = (SQLException) e;
            exceptions.add(SQLExceptionFactory.getSQLException(sqlEx,
                    "Backend " + backendName + " failed ("
                            + sqlEx.getLocalizedMessage() + ")"));
        } else
            exceptions.add(new SQLException("Backend " + backendName
                    + " failed (" + e.getLocalizedMessage() + ")")
                    .initCause(e));

        // Notify if needed
        if (success + failed >= totalNb) {
            notifyAll(); // Notify all failed threads
        } else {
            if ((timeout > -1) && (success == 0)) {
                try { // Wait to check if all other threads failed or not
                    wait(timeout);
                } catch (InterruptedException ie) {
                    throw (SQLException) new SQLException(
                            "Wait interrupted() in failed task of backend "
                                    + backendName + " ("
                                    + e.getLocalizedMessage() + ")")
                            .initCause(e);
                }
            }
        }
        return success > 0;
    }

    /**
     * Notifies the successful completion of this task.
     *
     * @param backendThread The backend thread notifying this task
     */
    public synchronized void notifySuccess(
            BackendWorkerThread backendThread) {
        if (!addNotification(backendThread))
            return;

        doNotifySuccess();
    }

    /**
     * Records one more success and wakes up waiting threads if the task is now
     * complete. Callers must hold the monitor of this task.
     */
    private void doNotifySuccess() {
        success++;

        // Notify if needed
        if ((success == nbToComplete) || (success + failed >= totalNb)) {
            if (failed > 0)
                notifyAll(); // Notify all failed threads too
            else
                notify();
        }
    }

    /**
     * Notifies the successful completion of this task and provides the
     * resultOnFirstBackendToSucceed for checking.
     *
     * @param backendThread The backend thread notifying this task
     * @param result the result of the query on the backend that notifies the
     *          success
     * @return the result returned by the query on the first backend which
     *         succeeded
     */
    public synchronized int notifySuccess(
            BackendWorkerThread backendThread, int result) {
        if (success == 0) {
            /*
             * We keep only the result of the first backend to succeed, which can
             * be used as a basis to check that success on other backends will be
             * consistent with that first result.
             */
            resultOnFirstBackendToSucceed = result;
        }

        /*
         * There is a nasty case if the database does not support
         * Statement.cancel(). When a deadlock is detected, the query will be
         * aborted and failure will be notified. But if the query cannot be
         * cancelled, it might be notified as a success later on. In this case, we
         * have to update resultOnFirstBackendToSucceed first (this is why the check
         * on addNotification is done after the update of
         * resultOnFirstBackendToSucceed).
         */
        if (!addNotification(backendThread))
            return resultOnFirstBackendToSucceed;

        doNotifySuccess();

        return resultOnFirstBackendToSucceed;
    }

    /**
     * Adds a backend worker thread to the notification list.
     *
     * @param backendThread the backend worker thread to add
     * @return false if this thread already notified the task
     */
    private boolean addNotification(BackendWorkerThread backendThread) {
        if (notifications.contains(backendThread))
            return false;
        notifications.add(backendThread);
        return true;
    }

    //
    // Getter/Setter
    //

    /**
     * Returns true if this task is in autocommit mode, false if it is in a
     * transaction.
     *
     * @return Returns true if task must be executed in autoCommit mode.
     * @see #getTransactionId()
     */
    public abstract boolean isAutoCommit();

    /**
     * Returns the list of exceptions reported by failed backends.
     *
     * @return a <code>List</code> of exceptions, or null if no failure has been
     *         recorded
     */
    public List getExceptions() {
        return exceptions;
    }

    /**
     * Returns the number of threads that have started the execution of the task.
     *
     * @return Returns the number of started executions.
     */
    public synchronized int getExecutionStarted() {
        return executionStarted;
    }

    /**
     * Sets the flag to tell that the timeout has expired on this task. If no
     * backend has started the task execution then the task will be canceled and
     * the method will return true. Otherwise, all backends will execute the
     * request and the method will return false.
     *
     * @return true if BackendThreads will ignore the task, false if all backends
     *         will execute the task.
     */
    public synchronized boolean setExpiredTimeout() {
        this.timeoutExpired = true;
        return executionStarted == 0;
    }

    /**
     * Returns the number of backend threads that have failed.
     *
     * @return an <code>int</code> value
     */
    public int getFailed() {
        return failed;
    }

    /**
     * Returns the generatedKeysResultSet value.
     *
     * @return Returns the generatedKeysResultSet.
     */
    public ResultSet getGeneratedKeysResultSet() {
        return generatedKeysResultSet;
    }

    /**
     * Sets the generatedKeysResultSet value.
     *
     * @param generatedKeysResultSet The generatedKeysResultSet to set.
     */
    public void setGeneratedKeysResultSet(
            ResultSet generatedKeysResultSet) {
        this.generatedKeysResultSet = generatedKeysResultSet;
    }

    /**
     * Returns the locks taken by this task for a given backend.
     *
     * @param backend backend for which to get the locks
     * @return Returns the list of locks.
     */
    public List getLocks(DatabaseBackend backend) {
        synchronized (locksMap) {
            return (List) locksMap.get(backend);
        }
    }

    /**
     * Sets the locks taken by this task for a given backend (ignored if the
     * locks were already set once).
     *
     * @param backend backend for which to set the locks
     * @param locks The list of locks taken.
     */
    public synchronized void setLocks(DatabaseBackend backend,
            List locks) {
        synchronized (locksMap) {
            /*
             * Use get instead of containsKey, because sometimes a null entry is
             * added.
             */
            if (locksMap.get(backend) == null)
                locksMap.put(backend, locks);
            else {
                backend.getLogger().fatal(
                        "Double locks entry: " + locks + " and "
                                + locksMap.get(backend),
                        new Exception());
            }
        }
    }

    /**
     * Returns the number of threads that must succeed before returning.
     *
     * @return an <code>int</code> value
     */
    public int getNbToComplete() {
        return nbToComplete;
    }

    /**
     * Returns the persistentConnection value.
     *
     * @return Returns the persistentConnection.
     */
    public final boolean isPersistentConnection() {
        return persistentConnection;
    }

    /**
     * Sets the persistentConnection value.
     *
     * @param persistentConnection The persistentConnection to set.
     */
    public final void setPersistentConnection(
            boolean persistentConnection) {
        this.persistentConnection = persistentConnection;
    }

    /**
     * Returns the persistentConnectionId value.
     *
     * @return Returns the persistentConnectionId.
     */
    public final long getPersistentConnectionId() {
        return persistentConnectionId;
    }

    /**
     * Sets the persistentConnectionId value.
     *
     * @param persistentConnectionId The persistentConnectionId to set.
     */
    public final void setPersistentConnectionId(
            long persistentConnectionId) {
        this.persistentConnectionId = persistentConnectionId;
    }

    /**
     * Returns the request associated with this task if any, returns null
     * otherwise.
     *
     * @return the request associated with this task or null
     */
    public abstract AbstractRequest getRequest();

    /**
     * Returns the number of backend threads that have succeeded.
     *
     * @return an <code>int</code> value
     */
    public int getSuccess() {
        return success;
    }

    /**
     * Returns the total number of threads.
     *
     * @return an <code>int</code> value
     * @see #setTotalNb
     */
    public int getTotalNb() {
        return totalNb;
    }

    /**
     * Sets the total number of threads.
     *
     * @param totalNb the total number of threads to set
     * @see #getTotalNb
     */
    public void setTotalNb(int totalNb) {
        this.totalNb = totalNb;
    }

    /**
     * Returns the transaction identifier of this task if any (isAutoCommit
     * returns false).
     *
     * @return Returns the transaction identifier.
     */
    public abstract long getTransactionId();

    /**
     * Returns true if the task has been successfully completed by nbToComplete
     * nodes (set in the constructor) or if everyone has completed (successfully
     * or not), false otherwise.
     *
     * @return true if the task execution is complete
     * @see AbstractTask#AbstractTask(int, int, boolean, long)
     */
    public synchronized boolean hasCompleted() {
        return ((success >= nbToComplete) || (success + failed == totalNb));
    }

    /**
     * Returns true if the task has completed (successfully or not) or false if we
     * are still expecting answers from some backends.
     *
     * @return true if the task execution is complete
     */
    public synchronized boolean hasFullyCompleted() {
        return success + failed == totalNb;
    }

}
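
The completion counting used by AbstractTask (nbToComplete successes out of totalNb threads, with duplicate notifications ignored) can be illustrated in isolation. The following is a minimal sketch, not Sequoia code: QuorumTaskSketch, awaitCompletion and the use of plain Object workers are hypothetical names introduced for this example, and the timed wait that the real notifyFailure performs is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the counters kept by AbstractTask. */
class QuorumTaskSketch {
    private final int nbToComplete; // successes required before callers may proceed
    private final int totalNb;      // total number of workers expected to report
    private int success = 0;
    private int failed = 0;
    private final List<Object> notifications = new ArrayList<>();

    QuorumTaskSketch(int nbToComplete, int totalNb) {
        this.nbToComplete = nbToComplete;
        this.totalNb = totalNb;
    }

    /** Returns false if this worker has already reported, as in AbstractTask. */
    private boolean addNotification(Object worker) {
        if (notifications.contains(worker))
            return false;
        notifications.add(worker);
        return true;
    }

    synchronized void notifySuccess(Object worker) {
        if (!addNotification(worker))
            return;
        success++;
        if (success == nbToComplete || success + failed >= totalNb)
            notifyAll(); // wake up anyone blocked in awaitCompletion
    }

    /** Returns true if at least one worker has succeeded so far. */
    synchronized boolean notifyFailure(Object worker) {
        if (addNotification(worker)) {
            failed++;
            if (success + failed >= totalNb)
                notifyAll();
        }
        return success > 0;
    }

    synchronized boolean hasCompleted() {
        return success >= nbToComplete || success + failed >= totalNb;
    }

    /** Waits until the quorum is reached, everyone reported, or the timeout elapses. */
    synchronized boolean awaitCompletion(long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!hasCompleted()) { // loop guards against spurious wake-ups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            wait(remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        final QuorumTaskSketch task = new QuorumTaskSketch(2, 3);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                Object me = Thread.currentThread();
                if (id == 0)
                    task.notifyFailure(me); // one worker fails
                else
                    task.notifySuccess(me); // the two others succeed
            }).start();
        }
        System.out.println("completed=" + task.awaitCompletion(5000));
    }
}
```

In this sketch the caller blocks in awaitCompletion, whereas in Sequoia the code that waits on the task monitor lives outside AbstractTask and is not shown in this file.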