Source Code Cross Referenced for Scheduler.java in  » Science » Cougaar12_4 » org » cougaar » core » thread » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.thread 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 1997-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.core.thread;
028:
029:        import java.util.ArrayList;
030:        import java.util.Comparator;
031:        import java.util.Iterator;
032:        import java.util.List;
033:        import java.util.RandomAccess;
034:
035:        import org.cougaar.util.UnaryPredicate;
036:        import org.cougaar.util.log.Logger;
037:        import org.cougaar.util.log.Logging;
038:
039:        /**
040:         * The base class of thread-scheduler.  It allows a certain maximum
041:         * number of threads to running, queuing any requests beyond that.  An
042:         * items is dequeued when a running thread stops.  The maximum is per
043:         * thread-service, not global.  This scheduler is not used by default
044:         * (the PropagatingScheduler extension is the default).
045:         *
046:         * @property org.cougaar.thread.running.max specifies the maximum
047:         * number of running threads.  A negative number is interpreted to
048:         * mean that there is no maximum.  The precise meaning of 'maximum' is
049:         * varies by scheduler class.
050:         *
051:         */
052:        public class Scheduler {
053:            private DynamicSortedQueue<SchedulableObject> pendingThreads;
054:            private List<SchedulableObject> disqualified = new ArrayList<SchedulableObject>();
055:            private UnaryPredicate qualifier;
056:            private UnaryPredicate childQualifier;
057:            private ThreadListenerProxy listenerProxy;
058:            private String printName;
059:            private TreeNode treeNode;
060:            private int absoluteMax;
061:            private int maxRunningThreads = 0;
062:            private int runningThreadCount = 0;
063:            private int lane;
064:            protected Logger logger = Logging.getLogger(getClass().getName());
065:            protected int rightsRequestCount = 0;
066:
067:            private static Logger _logger = Logging.getLogger(Scheduler.class);
068:
069:            private Comparator<SchedulableObject> timeComparator = new Comparator<SchedulableObject>() {
070:                public boolean equals(Object x) {
071:                    return x == this ;
072:                }
073:
074:                public int compare(SchedulableObject x, SchedulableObject y) {
075:                    long t1 = x.getTimestamp();
076:                    long t2 = y.getTimestamp();
077:                    if (t1 < t2)
078:                        return -1;
079:                    else if (t1 > t2)
080:                        return 1;
081:                    else
082:                        return 0;
083:                }
084:            };
085:
086:            public Scheduler(ThreadListenerProxy listenerProxy) {
087:                pendingThreads = new DynamicSortedQueue<SchedulableObject>(
088:                        timeComparator);
089:                this .listenerProxy = listenerProxy;
090:            }
091:
092:            void setAbsoluteMax(int absoluteMax) {
093:                this .absoluteMax = absoluteMax;
094:                maxRunningThreads = absoluteMax;
095:                if (_logger.isInfoEnabled()) {
096:                    _logger.info("Initialized maxRunningThreads to "
097:                            + maxRunningThreads);
098:                }
099:            }
100:
101:            // NB: By design this method is NOT synchronized! It should only
102:            // be used by the ThreadStatusService and is intended to provide
103:            // a best-effort snapshot.  Failures are normal. 
104:            int iterateOverQueuedThreads(ThreadStatusService.Body body) {
105:                // could copy the queue in a synchronized block, on the off
106:                // chance that read-access is unsafe otherwise.  But there are
107:                // no indications this is really an issue.
108:                try {
109:                    return pendingThreads.processEach(body, getName(), _logger);
110:                } catch (IndexOutOfBoundsException r) {
111:                    // this may happen if pendingThreads was modified during the traversal
112:                    if (_logger.isDebugEnabled()) {
113:                        _logger
114:                                .debug(
115:                                        this 
116:                                                + ": SortedQueue.processEach detected a collision",
117:                                        r);
118:                    }
119:                } catch (Throwable r) {
120:                    _logger
121:                            .error(
122:                                    this 
123:                                            + ": SortedQueue.processEach threw an uncaught exception",
124:                                    r);
125:                }
126:                return 0;
127:            }
128:
129:            private Logger getLogger() {
130:                return logger;
131:            }
132:
133:            public void setRightsSelector(RightsSelector selector) {
134:                // error? no-op?
135:            }
136:
137:            public String toString() {
138:                return printName;
139:            }
140:
141:            void setTreeNode(TreeNode treeNode) {
142:                this .treeNode = treeNode;
143:                printName = "<Scheduler " + treeNode.getName() + " [" + lane
144:                        + "]>";
145:            }
146:
147:            TreeNode getTreeNode() {
148:                return treeNode;
149:            }
150:
151:            int getLane() {
152:                return lane;
153:            }
154:
155:            void setLane(int lane) {
156:                this .lane = lane;
157:            }
158:
159:            String getName() {
160:                return getTreeNode().getName();
161:            }
162:
163:            // ThreadControlService 
164:            synchronized public void setQueueComparator(
165:                    final Comparator<Schedulable> comparator) {
166:                if (comparator != null) {
167:                    Comparator<SchedulableObject> c = new Comparator<SchedulableObject>() {
168:                        public int compare(SchedulableObject o1,
169:                                SchedulableObject o2) {
170:                            return comparator.compare(o1, o2);
171:                        }
172:                    };
173:                    pendingThreads.setComparator(c);
174:                } else {
175:                    pendingThreads.setComparator(timeComparator);
176:                }
177:            }
178:
179:            public int maxRunningThreadCount() {
180:                return maxRunningThreads;
181:            }
182:
183:            public int pendingThreadCount() {
184:                return pendingThreads.size();
185:            }
186:
187:            public int runningThreadCount() {
188:                return runningThreadCount;
189:            }
190:
191:            // synchronize to keep the two addends consistent
192:            synchronized public int activeThreadCount() {
193:                return runningThreadCount + pendingThreads.size();
194:            }
195:
196:            public boolean setChildQualifier(UnaryPredicate predicate) {
197:                if (predicate == null) {
198:                    childQualifier = null;
199:                    return true;
200:                } else if (childQualifier == null) {
201:                    childQualifier = predicate;
202:                    return true;
203:                } else {
204:                    // log an error
205:                    Logger logger = getLogger();
206:                    if (logger.isErrorEnabled()) {
207:                        logger.error("ChildQualifier is already set");
208:                    }
209:                    return false;
210:                }
211:            }
212:
213:            boolean allowRightFor(Scheduler child) {
214:                if (child == this  || childQualifier == null) {
215:                    // Don't run this on yourself, leave it to the parent.
216:                    return true;
217:                } else {
218:                    return childQualifier.execute(child);
219:                }
220:            }
221:
222:            synchronized private boolean addPendingThreadSync(
223:                    SchedulableObject thread) {
224:                if (pendingThreads.contains(thread)) {
225:                    return false;
226:                }
227:                thread.setQueued(true);
228:                pendingThreads.add(thread);
229:                return true;
230:            }
231:
232:            void addPendingThread(SchedulableObject thread) {
233:                if (addPendingThreadSync(thread)) {
234:                    listenerProxy.notifyQueued(thread);
235:                }
236:            }
237:
238:            synchronized void dequeue(SchedulableObject thread) {
239:                pendingThreads.remove(thread);
240:                threadDequeued(thread);
241:            }
242:
243:            void threadDequeued(SchedulableObject thread) {
244:                listenerProxy.notifyDequeued(thread);
245:            }
246:
247:            // Called within the thread itself as the first thing it does.
248:            void threadClaimed(SchedulableObject thread) {
249:                listenerProxy.notifyStart(thread);
250:            }
251:
252:            synchronized private SchedulableObject getNextSync() {
253:                return pendingThreads.next();
254:            }
255:
256:            // Called within the thread itself as the last thing it does.
257:            SchedulableObject threadReclaimed(SchedulableObject thread,
258:                    boolean reuse) {
259:                listenerProxy.notifyEnd(thread);
260:                return reuse ? getNextSync() : null;
261:            }
262:
263:            // Suspend/Resume "hints" -- not used yet.
264:            void threadResumed(SchedulableObject thread) {
265:            }
266:
267:            void threadSuspended(SchedulableObject thread) {
268:            }
269:
270:            synchronized void incrementRunCount(Scheduler consumer) {
271:                ++runningThreadCount;
272:                listenerProxy.notifyRightGiven(consumer);
273:            }
274:
275:            synchronized void decrementRunCount(Scheduler consumer) {
276:                --runningThreadCount;
277:                if (runningThreadCount < 0) {
278:                    StringBuffer buf = new StringBuffer();
279:                    buf.append(this .toString());
280:                    buf.append(" thread count is ").append(
281:                            Integer.toString(runningThreadCount));
282:                    TreeNode node = getTreeNode();
283:                    TreeNode parent = node.getParent();
284:                    if (parent != null) {
285:                        buf.append(" parent ").append(parent.getName());
286:                    }
287:                    List<TreeNode> children = node.getChildren();
288:                    if (children != null && !children.isEmpty()) {
289:                        buf.append(" children");
290:                        for (TreeNode child : children) {
291:                            buf.append(" " + child.getName());
292:                        }
293:                    }
294:                    logger.error(buf.toString());
295:                    runningThreadCount = 0;
296:                }
297:                listenerProxy.notifyRightReturned(consumer);
298:            }
299:
300:            SchedulableObject getNextPending() {
301:                return popQueue();
302:            }
303:
304:            synchronized private SchedulableObject popQueueSync() {
305:                if (!checkLocalRights()) {
306:                    return null;
307:                }
308:                SchedulableObject thread = pendingThreads.next();
309:                if (thread != null) {
310:                    incrementRunCount(this );
311:                }
312:                return thread;
313:            }
314:
315:            SchedulableObject popQueue() {
316:                SchedulableObject thread = popQueueSync();
317:                // Notify listeners
318:                if (thread != null) {
319:                    threadDequeued(thread);
320:                }
321:
322:                return thread;
323:            }
324:
325:            // Caller should synchronize.  
326:            boolean checkLocalRights() {
327:                if (maxRunningThreads < 0) {
328:                    return true;
329:                }
330:                return runningThreadCount + rightsRequestCount < maxRunningThreads;
331:            }
332:
333:            synchronized boolean requestRights(Scheduler requestor) {
334:                if (maxRunningThreads < 0
335:                        || runningThreadCount < maxRunningThreads) {
336:                    incrementRunCount(requestor);
337:                    return true;
338:                }
339:                return false;
340:            }
341:
342:            synchronized void releaseRights(Scheduler consumer) {
343:                // If the max has recently decreased it may be lower than the
344:                // running count.  In that case don't do a handoff.
345:                decrementRunCount(consumer);
346:                SchedulableObject handoff = null;
347:
348:                if (runningThreadCount < maxRunningThreads) {
349:                    handoff = getNextPending();
350:                    if (handoff != null) {
351:                        handoff.thread_start();
352:                    }
353:                } else {
354:                    if (logger.isErrorEnabled()) {
355:                        logger
356:                                .error("Decreased thread count prevented handoff "
357:                                        + runningThreadCount
358:                                        + ">"
359:                                        + maxRunningThreads);
360:                    }
361:                }
362:            }
363:
364:            synchronized private SchedulableObject getNextPendingSync() {
365:                return getNextPending();
366:            }
367:
368:            public void setMaxRunningThreadCount(int requested_max) {
369:                int count = requested_max;
370:                Logger logger = getLogger();
371:                if (requested_max > absoluteMax) {
372:                    if (logger.isErrorEnabled()) {
373:                        logger.error("Attempt to set maxRunningThreadCount to "
374:                                + requested_max
375:                                + " which is greater than the absolute max of "
376:                                + absoluteMax);
377:                    }
378:                    count = absoluteMax;
379:                } else {
380:                    if (_logger.isInfoEnabled()) {
381:                        _logger.info(this 
382:                                + ": Setting maxRunningThreadCount to "
383:                                + requested_max + " from " + maxRunningThreads);
384:                    }
385:                }
386:                int additionalThreads = count - maxRunningThreads;
387:                maxRunningThreads = count;
388:                TreeNode parent_node = getTreeNode().getParent();
389:                if (parent_node != null) {
390:                    return;
391:                }
392:
393:                // If we get here, we're the root node.  Try to run more
394:                // threads if the count has gone up.
395:                for (int i = 0; i < additionalThreads; i++) {
396:                    SchedulableObject schedulable = getNextPendingSync();
397:                    if (schedulable == null) {
398:                        return;
399:                    }
400:                    // 	    System.err.println("Increased thread count let me start one!");
401:                    schedulable.thread_start();
402:                }
403:            }
404:
405:            private boolean qualified(SchedulableObject thread) {
406:                return qualifier == null || qualifier.execute(thread);
407:            }
408:
409:            synchronized private void setNewQualifier(UnaryPredicate predicate) {
410:                qualifier = predicate;
411:                List<SchedulableObject> bad = pendingThreads.filter(predicate);
412:                // move any disqualified items on the queue to the
413:                // disqualified list
414:                if (bad instanceof  RandomAccess) {
415:                    Iterator<SchedulableObject> itr = bad.iterator();
416:                    while (itr.hasNext()) {
417:                        disqualify(itr.next());
418:                    }
419:                } else {
420:                    for (SchedulableObject sched : bad) {
421:                        disqualify(sched);
422:                    }
423:                }
424:            }
425:
426:            synchronized private List<SchedulableObject> resetQualifier() {
427:                qualifier = null;
428:                List<SchedulableObject> requeue = new ArrayList<SchedulableObject>(
429:                        disqualified);
430:                disqualified.clear();
431:                return requeue;
432:            }
433:
434:            public boolean setQualifier(UnaryPredicate predicate) {
435:                if (predicate == null) {
436:                    List<SchedulableObject> requeue = resetQualifier();
437:                    Logger logger = getLogger();
438:                    if (logger.isDebugEnabled()) {
439:                        logger.debug("Restoring " + requeue.size()
440:                                + " previously disqualified threads");
441:                    }
442:                    for (int i = 0, n = requeue.size(); i < n; i++) {
443:                        SchedulableObject sched = requeue.get(i);
444:                        SchedulableStateChangeQueue.pushStart(sched);
445:                    }
446:                    return true;
447:                } else if (qualifier == null) {
448:                    setNewQualifier(predicate);
449:                    return true;
450:                } else {
451:                    Logger logger = getLogger();
452:                    if (logger.isErrorEnabled()) {
453:                        logger.error("Qualifier is already set");
454:                    }
455:                    return false;
456:                }
457:            }
458:
459:            private void disqualify(SchedulableObject sched) {
460:                sched.setDisqualified(true);
461:                if (!disqualified.contains(sched)) {
462:                    disqualified.add(sched);
463:                }
464:            }
465:
466:            synchronized boolean checkQualification(SchedulableObject thread) {
467:                if (!qualified(thread)) {
468:                    disqualify(thread);
469:                    return false;
470:                }
471:                if (pendingThreadCount() > 0) {
472:                    addPendingThread(thread);
473:                    return false;
474:                }
475:                return true;
476:            }
477:
478:            void startOrQueue(SchedulableObject thread) {
479:                // If the queue isn't empty, queue this one too.
480:                if (!checkQualification(thread)) {
481:                    return;
482:                }
483:                boolean can_run = requestRights(this);
484:                if (can_run) {
485:                    thread.thread_start();
486:                } else {
487:                    addPendingThread(thread);
488:                }
489:            }
490:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.