Source Code Cross Referenced for ThreadPool.java in » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » engine » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.engine 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found in file GTPL, or at
004:         * http://www.globus.org/toolkit/download/license.html. This notice must
005:         * appear in redistributions of this file, with or without modification.
006:         *
007:         * Redistributions of this Software, with or without modification, must
008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009:         * some other similar material which is provided with the Software (if
010:         * any).
011:         *
012:         * Copyright 1999-2004 University of Chicago and The University of
013:         * Southern California. All rights reserved.
014:         */
015:
016:        package org.griphyn.cPlanner.engine;
017:
018:        import java.io.File;
019:        import java.io.FileInputStream;
020:        import java.util.LinkedList;
021:        import java.util.Set;
022:
023:        import org.gridforum.jgss.ExtendedGSSCredential;
024:        import org.gridforum.jgss.ExtendedGSSManager;
025:        import org.griphyn.cPlanner.classes.AuthenticateRequest;
026:        import org.griphyn.cPlanner.classes.Profile;
027:        import org.griphyn.cPlanner.common.LogManager;
028:        import org.griphyn.cPlanner.common.PegasusProperties;
029:        import org.griphyn.cPlanner.namespace.ENV;
030:        import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
031:        import org.griphyn.cPlanner.poolinfo.PoolMode;
032:        import org.ietf.jgss.GSSCredential;
033:
034:        /**
035:         * This maintains a pool of authenticate threads that authenticate against a
036:         * particular resource.
037:         *
038:         * @author Karan Vahi
039:         * @version $Revision: 50 $
040:         */
041:
042:        public class ThreadPool {
043:
044:            /**
045:             * The maximum number of authentication threads that are spawned.
046:             */
047:            public static final int NUMBER_OF_THREADS = 5;
048:
049:            /**
050:             * The request queue that holds the authenticate requests. The worker
051:             * threads do access this job queue.
052:             */
053:            private LinkedList mQueue;
054:
055:            /**
056:             * The handle to the properties object.
057:             */
058:            private PegasusProperties mProps;
059:
060:            /**
061:             * The handle to the Pool Info Provider.
062:             */
063:            private PoolInfoProvider mPoolHandle;
064:
065:            /**
066:             * The handle to the LogManager object.
067:             */
068:            private LogManager mLogger;
069:
070:            /**
071:             * The Set of pools that need to be authenticated against.
072:             */
073:            private Set mExecPools;
074:
075:            /**
076:             * The number of pools that one has to authenticate against.
077:             */
078:            private Integer mNumOfPools;
079:
080:            /**
081:             * The handle to the pool of threads that this thread pool is reponsible for.
082:             */
083:            private AuthenticateThread[] mWorkers;
084:
085:            /**
086:             * The condition variable that is used to synchronize the shutdown.
087:             */
088:            private ConditionVariable mCurrentNum;
089:
090:            /**
091:             * The namespace object holding the environment variables for local
092:             * pool.
093:             */
094:            private ENV mLocalEnv;
095:
096:            /**
097:             * The credential loaded from the non default location if specified.
098:             */
099:            private GSSCredential mCredential;
100:
101:            /**
102:             * The overloaded constructor.
103:             *
104:             * @param properties  the <code>PegasusProperties</code> to be used.
105:             * @param pools       the set of pools against which the user is authenticating.
106:             */
107:            public ThreadPool(PegasusProperties properties, Set pools) {
108:                mQueue = new LinkedList();
109:                mCurrentNum = new ConditionVariable();
110:                mProps = properties;
111:                mLogger = LogManager.getInstance();
112:                String poolClass = PoolMode.getImplementingClass(mProps
113:                        .getPoolMode());
114:                mPoolHandle = PoolMode.loadPoolInstance(poolClass, mProps
115:                        .getPoolFile(), PoolMode.SINGLETON_LOAD);
116:                mExecPools = pools;
117:                mNumOfPools = new Integer(pools.size());
118:
119:                //load the local environment variables
120:                mLocalEnv = loadLocalEnvVariables();
121:                //load the credential if the user has set the
122:                //corresponding environment variable.
123:                mCredential = (mLocalEnv.containsKey(ENV.X509_USER_PROXY_KEY)) ?
124:                //load the proxy from the path specified
125:                getGSSCredential((String) mLocalEnv
126:                        .get(ENV.X509_USER_PROXY_KEY))
127:                        : null;
128:
129:                if (mCredential == null) {
130:                    //log message
131:                    mLogger
132:                            .log(
133:                                    "Proxy will be picked up from the default location in /tmp",
134:                                    LogManager.DEBUG_MESSAGE_LEVEL);
135:                }
136:
137:                //intialise the worker threads
138:                mWorkers = new AuthenticateThread[this .NUMBER_OF_THREADS];
139:                for (int i = 0; i < NUMBER_OF_THREADS; i++) {
140:                    mWorkers[i] = new AuthenticateThread(i);
141:
142:                    //start the threads
143:                    mWorkers[i].start();
144:                }
145:            }
146:
147:            /**
148:             * This method is called to ensure the clean shutdown of threads, and
149:             * waits till all the requests have been serviced.
150:             */
151:            public void shutdown() {
152:
153:                //mNumOfPools is the CV on which you do a shutdowm
154:                synchronized (mCurrentNum) {
155:
156:                    int numOfPools = mNumOfPools.intValue();
157:                    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
158:                        //send the shutdown signal to the worker threads
159:                        mWorkers[i].shutdown();
160:                    }
161:
162:                    //wake up all the threads on this
163:                    synchronized (mQueue) {
164:                        //mLogger.logMessage("Manager sending notify to all");
165:                        mQueue.notifyAll();
166:                    }
167:
168:                    while (mCurrentNum.getValue() < NUMBER_OF_THREADS) {
169:                        try {
170:                            mCurrentNum.wait();
171:                        } catch (InterruptedException e) {
172:                            mLogger.log(
173:                                    "Manager got interrupted during shutdown"
174:                                            + e.getMessage(),
175:                                    LogManager.ERROR_MESSAGE_LEVEL);
176:                        }
177:                    }
178:                }
179:
180:            }
181:
182:            /**
183:             * Accepts an authentication request, that has to be serviced. It is added
184:             * to queue of requests.
185:             */
186:            public void acceptRequest(Object request) {
187:
188:                //see if any of the worker threads are available
189:                /*for(int i = 0; i < NUMBER_OF_THREADS; i++){
190:                    if(mWorkers[i].isAvailable()){
191:                        //no need to add to queue.
192:                    }
193:                }*/
194:
195:                synchronized (mQueue) {
196:                    mQueue.addLast(request);
197:                    //send a notification to a worker thread
198:                    mQueue.notify();
199:                }
200:
201:            }
202:
203:            /**
204:             * Reads in the environment variables into memory from the properties file
205:             * and the pool catalog.
206:             *
207:             * @return  the <code>ENV</code> namespace object holding the environment
208:             *          variables.
209:             */
210:            private ENV loadLocalEnvVariables() {
211:                //assumes that pool handle, and property handle are initialized.
212:                ENV env = new ENV();
213:
214:                //load from the pool.config
215:                env.checkKeyInNS(mPoolHandle.getPoolProfile("local",
216:                        Profile.ENV));
217:                //load from property file
218:                env.checkKeyInNS(mProps.getLocalPoolEnvVar());
219:
220:                return env;
221:            }
222:
223:            /**
224:             * Loads a GSSCredential from the proxy file residing at the path specified.
225:             *
226:             * @param file the path to the proxy file.
227:             *
228:             * @return GSSCredential
229:             *         null in case the file format is wrong, or file does not exist.
230:             */
231:            private GSSCredential getGSSCredential(String file) {
232:                File f = new File(file);
233:                GSSCredential gcred = null;
234:                //sanity check first
235:                if (!f.exists()) {
236:                    return null;
237:                }
238:
239:                try {
240:                    byte[] data = new byte[(int) f.length()];
241:                    FileInputStream in = new FileInputStream(f);
242:                    in.read(data);
243:                    in.close();
244:
245:                    ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager
246:                            .getInstance();
247:
248:                    gcred = manager.createCredential(data,
249:                            ExtendedGSSCredential.IMPEXP_OPAQUE,
250:                            GSSCredential.DEFAULT_LIFETIME, null,
251:                            GSSCredential.INITIATE_AND_ACCEPT);
252:                    mLogger.log(
253:                            "Loaded the credential from proxy file " + file,
254:                            LogManager.DEBUG_MESSAGE_LEVEL);
255:
256:                } catch (Exception e) {
257:                    mLogger.log("Unable to load proxy from file" + file + " "
258:                            + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
259:                }
260:                return gcred;
261:            }
262:
263:            /**
264:             * A thread as an inner class, that authenticates against one particular
265:             * pool.
266:             */
267:            class AuthenticateThread implements  Runnable {
268:
269:                /**
270:                 * The pool against which to authenticate.
271:                 */
272:                private String mPool;
273:
274:                /**
275:                 * The thread object that is used to launch the thread.
276:                 */
277:                private Thread mThread;
278:
279:                /**
280:                 * Whether the thread is available to do some work or not.
281:                 */
282:                private boolean mAvailable;
283:
284:                /**
285:                 * Whether to shutdown or not.
286:                 */
287:                private boolean mShutdown;
288:
289:                /**
290:                 * The unique identifying id of the thread.
291:                 */
292:                private int mIndex;
293:
294:                /**
295:                 * The overloaded constructor.
296:                 *
297:                 *
298:                 */
299:                public AuthenticateThread(int index) {
300:                    mAvailable = true;
301:                    mShutdown = false;
302:                    mIndex = index;
303:                }
304:
305:                /**
306:                 * The start method for the thread. It initialises the thread and calls
307:                 * it's start method.
308:                 */
309:                public void start() {
310:                    mThread = new Thread(this );
311:                    mThread.start();
312:                }
313:
314:                /**
315:                 * Returns whether a thread is available to do some work or not.
316:                 */
317:                public boolean isAvailable() {
318:                    return mAvailable;
319:                }
320:
321:                /**
322:                 * Sets the shutdown flag to true. This does not make the thread stop.
323:                 * The thread only stops when it's current request is serviced and the
324:                 * queue is empty.
325:                 */
326:                public void shutdown() {
327:                    mShutdown = true;
328:                }
329:
330:                /**
331:                 * Calls the corresponding join method of the thread associated with
332:                 * this class.
333:                 *
334:                 * @param millis   The time to wait in milliseconds.
335:                 */
336:                public void join(long millis) throws InterruptedException {
337:                    mThread.join(millis);
338:                }
339:
340:                /**
341:                 * The runnable method of the thread, that is called when the thread is
342:                 * started.
343:                 */
344:                public void run() {
345:                    AuthenticateRequest ar;
346:                    Authenticate a = new Authenticate(mProps, mPoolHandle);
347:                    a.setCredential(mCredential);
348:                    boolean authenticated = false;
349:
350:                    for (;;) {
351:                        //remain in an infinite loop and wait for a request to be released
352:                        //from the queue.
353:                        ar = getAuthenticateRequest();
354:                        if (ar == null) {
355:                            //no more requests to service and the shutdown signal has
356:                            //been received. send the notification to the manager and exit
357:                            mLogger.log("Thread [" + mIndex
358:                                    + "] got shutdown signal",
359:                                    LogManager.DEBUG_MESSAGE_LEVEL);
360:                            synchronized (mCurrentNum) {
361:                                mCurrentNum.increment();
362:                                mCurrentNum.notify();
363:                            }
364:
365:                            break;
366:                        }
367:
368:                        //means worker is busy, processing a request.
369:                        mAvailable = false;
370:                        //do the processing.
371:                        authenticated = a.authenticate(ar);
372:                        mLogger
373:                                .log("Thread [" + mIndex
374:                                        + "] Authentication of " + ar
375:                                        + " successful:" + authenticated,
376:                                        LogManager.DEBUG_MESSAGE_LEVEL);
377:                        if (!authenticated) {
378:                            //we need to remove
379:                            boolean removal = a.removeResource(ar);
380:                            mLogger.log("Thread [" + mIndex
381:                                    + "] Removal of resource" + ar
382:                                    + " successful:" + removal,
383:                                    LogManager.DEBUG_MESSAGE_LEVEL);
384:                        }
385:                        mAvailable = true;
386:                        //be nice and sleep
387:                        try {
388:                            mThread.sleep(5);
389:                        } catch (InterruptedException ex) {
390:                            mLogger.log("Authenticate Thread [" + mIndex
391:                                    + "] got interrupted while waiting",
392:                                    LogManager.DEBUG_MESSAGE_LEVEL);
393:                            //go into sleep again
394:                            continue;
395:                        }
396:
397:                    }
398:
399:                }
400:
401:                /**
402:                 * Returns an authentication request to the worker thread.
403:                 *
404:                 * @return  the authentication request.
405:                 */
406:                public AuthenticateRequest getAuthenticateRequest() {
407:                    synchronized (mQueue) {
408:
409:                        for (;;) {
410:                            if (mQueue.isEmpty() && mShutdown) {
411:                                //no more requests to service and the shutdown signal has
412:                                //been received.
413:                                return null;
414:                            } else if (mQueue.isEmpty()) {
415:                                //there is nothing in the queue so wait on it.
416:                                try {
417:                                    mLogger.log("Thread [" + mIndex
418:                                            + "] going to wait",
419:                                            LogManager.DEBUG_MESSAGE_LEVEL);
420:                                    mQueue.wait();
421:                                    //again check for empty queue and shutdown signal
422:                                    if (mQueue.isEmpty() && !mShutdown)
423:                                        //go back to the wait state to receive a new
424:                                        //request or a AR request
425:                                        continue;
426:                                } catch (InterruptedException ex) {
427:                                    mLogger
428:                                            .log(
429:                                                    "Authenticate Thread ["
430:                                                            + mIndex
431:                                                            + "] got interrupted while waiting "
432:                                                            + ex.getMessage(),
433:                                                    LogManager.ERROR_MESSAGE_LEVEL);
434:                                    //go into sleep again
435:                                    continue;
436:                                }
437:
438:                            }
439:                            return (mQueue.isEmpty() && mShutdown) ?
440:                            //indicates shutdown
441:                            null
442:                                    : (AuthenticateRequest) mQueue
443:                                            .removeFirst();
444:
445:                        }
446:
447:                    }
448:                }
449:
450:            }
451:
452:            /**
453:             * A wrapper around an int that acts as a Condition Variable, and is used
454:             * as such. In behaviour it is probably closer to a semaphore.
455:             */
456:            class ConditionVariable {
457:
458:                /**
459:                 * The int that is associated with this object.
460:                 */
461:                private int value;
462:
463:                /**
464:                 * The default constructor.
465:                 */
466:                public ConditionVariable() {
467:                    value = 0;
468:                }
469:
470:                /**
471:                 * It increments the value by 1.
472:                 */
473:                public void increment() {
474:                    value++;
475:                }
476:
477:                /**
478:                 * Returns the value.
479:                 */
480:                public int getValue() {
481:                    return value;
482:                }
483:            }
484:
485:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.