001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.engine;
017:
018: import java.io.File;
019: import java.io.FileInputStream;
020: import java.util.LinkedList;
021: import java.util.Set;
022:
023: import org.gridforum.jgss.ExtendedGSSCredential;
024: import org.gridforum.jgss.ExtendedGSSManager;
025: import org.griphyn.cPlanner.classes.AuthenticateRequest;
026: import org.griphyn.cPlanner.classes.Profile;
027: import org.griphyn.cPlanner.common.LogManager;
028: import org.griphyn.cPlanner.common.PegasusProperties;
029: import org.griphyn.cPlanner.namespace.ENV;
030: import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
031: import org.griphyn.cPlanner.poolinfo.PoolMode;
032: import org.ietf.jgss.GSSCredential;
033:
034: /**
035: * This maintains a pool of authenticate threads that authenticate against a
036: * particular resource.
037: *
038: * @author Karan Vahi
039: * @version $Revision: 50 $
040: */
041:
042: public class ThreadPool {
043:
044: /**
045: * The maximum number of authentication threads that are spawned.
046: */
047: public static final int NUMBER_OF_THREADS = 5;
048:
049: /**
050: * The request queue that holds the authenticate requests. The worker
051: * threads do access this job queue.
052: */
053: private LinkedList mQueue;
054:
055: /**
056: * The handle to the properties object.
057: */
058: private PegasusProperties mProps;
059:
060: /**
061: * The handle to the Pool Info Provider.
062: */
063: private PoolInfoProvider mPoolHandle;
064:
065: /**
066: * The handle to the LogManager object.
067: */
068: private LogManager mLogger;
069:
070: /**
071: * The Set of pools that need to be authenticated against.
072: */
073: private Set mExecPools;
074:
075: /**
076: * The number of pools that one has to authenticate against.
077: */
078: private Integer mNumOfPools;
079:
080: /**
081: * The handle to the pool of threads that this thread pool is reponsible for.
082: */
083: private AuthenticateThread[] mWorkers;
084:
085: /**
086: * The condition variable that is used to synchronize the shutdown.
087: */
088: private ConditionVariable mCurrentNum;
089:
090: /**
091: * The namespace object holding the environment variables for local
092: * pool.
093: */
094: private ENV mLocalEnv;
095:
096: /**
097: * The credential loaded from the non default location if specified.
098: */
099: private GSSCredential mCredential;
100:
101: /**
102: * The overloaded constructor.
103: *
104: * @param properties the <code>PegasusProperties</code> to be used.
105: * @param pools the set of pools against which the user is authenticating.
106: */
107: public ThreadPool(PegasusProperties properties, Set pools) {
108: mQueue = new LinkedList();
109: mCurrentNum = new ConditionVariable();
110: mProps = properties;
111: mLogger = LogManager.getInstance();
112: String poolClass = PoolMode.getImplementingClass(mProps
113: .getPoolMode());
114: mPoolHandle = PoolMode.loadPoolInstance(poolClass, mProps
115: .getPoolFile(), PoolMode.SINGLETON_LOAD);
116: mExecPools = pools;
117: mNumOfPools = new Integer(pools.size());
118:
119: //load the local environment variables
120: mLocalEnv = loadLocalEnvVariables();
121: //load the credential if the user has set the
122: //corresponding environment variable.
123: mCredential = (mLocalEnv.containsKey(ENV.X509_USER_PROXY_KEY)) ?
124: //load the proxy from the path specified
125: getGSSCredential((String) mLocalEnv
126: .get(ENV.X509_USER_PROXY_KEY))
127: : null;
128:
129: if (mCredential == null) {
130: //log message
131: mLogger
132: .log(
133: "Proxy will be picked up from the default location in /tmp",
134: LogManager.DEBUG_MESSAGE_LEVEL);
135: }
136:
137: //intialise the worker threads
138: mWorkers = new AuthenticateThread[this .NUMBER_OF_THREADS];
139: for (int i = 0; i < NUMBER_OF_THREADS; i++) {
140: mWorkers[i] = new AuthenticateThread(i);
141:
142: //start the threads
143: mWorkers[i].start();
144: }
145: }
146:
147: /**
148: * This method is called to ensure the clean shutdown of threads, and
149: * waits till all the requests have been serviced.
150: */
151: public void shutdown() {
152:
153: //mNumOfPools is the CV on which you do a shutdowm
154: synchronized (mCurrentNum) {
155:
156: int numOfPools = mNumOfPools.intValue();
157: for (int i = 0; i < NUMBER_OF_THREADS; i++) {
158: //send the shutdown signal to the worker threads
159: mWorkers[i].shutdown();
160: }
161:
162: //wake up all the threads on this
163: synchronized (mQueue) {
164: //mLogger.logMessage("Manager sending notify to all");
165: mQueue.notifyAll();
166: }
167:
168: while (mCurrentNum.getValue() < NUMBER_OF_THREADS) {
169: try {
170: mCurrentNum.wait();
171: } catch (InterruptedException e) {
172: mLogger.log(
173: "Manager got interrupted during shutdown"
174: + e.getMessage(),
175: LogManager.ERROR_MESSAGE_LEVEL);
176: }
177: }
178: }
179:
180: }
181:
182: /**
183: * Accepts an authentication request, that has to be serviced. It is added
184: * to queue of requests.
185: */
186: public void acceptRequest(Object request) {
187:
188: //see if any of the worker threads are available
189: /*for(int i = 0; i < NUMBER_OF_THREADS; i++){
190: if(mWorkers[i].isAvailable()){
191: //no need to add to queue.
192: }
193: }*/
194:
195: synchronized (mQueue) {
196: mQueue.addLast(request);
197: //send a notification to a worker thread
198: mQueue.notify();
199: }
200:
201: }
202:
203: /**
204: * Reads in the environment variables into memory from the properties file
205: * and the pool catalog.
206: *
207: * @return the <code>ENV</code> namespace object holding the environment
208: * variables.
209: */
210: private ENV loadLocalEnvVariables() {
211: //assumes that pool handle, and property handle are initialized.
212: ENV env = new ENV();
213:
214: //load from the pool.config
215: env.checkKeyInNS(mPoolHandle.getPoolProfile("local",
216: Profile.ENV));
217: //load from property file
218: env.checkKeyInNS(mProps.getLocalPoolEnvVar());
219:
220: return env;
221: }
222:
223: /**
224: * Loads a GSSCredential from the proxy file residing at the path specified.
225: *
226: * @param file the path to the proxy file.
227: *
228: * @return GSSCredential
229: * null in case the file format is wrong, or file does not exist.
230: */
231: private GSSCredential getGSSCredential(String file) {
232: File f = new File(file);
233: GSSCredential gcred = null;
234: //sanity check first
235: if (!f.exists()) {
236: return null;
237: }
238:
239: try {
240: byte[] data = new byte[(int) f.length()];
241: FileInputStream in = new FileInputStream(f);
242: in.read(data);
243: in.close();
244:
245: ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager
246: .getInstance();
247:
248: gcred = manager.createCredential(data,
249: ExtendedGSSCredential.IMPEXP_OPAQUE,
250: GSSCredential.DEFAULT_LIFETIME, null,
251: GSSCredential.INITIATE_AND_ACCEPT);
252: mLogger.log(
253: "Loaded the credential from proxy file " + file,
254: LogManager.DEBUG_MESSAGE_LEVEL);
255:
256: } catch (Exception e) {
257: mLogger.log("Unable to load proxy from file" + file + " "
258: + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
259: }
260: return gcred;
261: }
262:
263: /**
264: * A thread as an inner class, that authenticates against one particular
265: * pool.
266: */
267: class AuthenticateThread implements Runnable {
268:
269: /**
270: * The pool against which to authenticate.
271: */
272: private String mPool;
273:
274: /**
275: * The thread object that is used to launch the thread.
276: */
277: private Thread mThread;
278:
279: /**
280: * Whether the thread is available to do some work or not.
281: */
282: private boolean mAvailable;
283:
284: /**
285: * Whether to shutdown or not.
286: */
287: private boolean mShutdown;
288:
289: /**
290: * The unique identifying id of the thread.
291: */
292: private int mIndex;
293:
294: /**
295: * The overloaded constructor.
296: *
297: *
298: */
299: public AuthenticateThread(int index) {
300: mAvailable = true;
301: mShutdown = false;
302: mIndex = index;
303: }
304:
305: /**
306: * The start method for the thread. It initialises the thread and calls
307: * it's start method.
308: */
309: public void start() {
310: mThread = new Thread(this );
311: mThread.start();
312: }
313:
314: /**
315: * Returns whether a thread is available to do some work or not.
316: */
317: public boolean isAvailable() {
318: return mAvailable;
319: }
320:
321: /**
322: * Sets the shutdown flag to true. This does not make the thread stop.
323: * The thread only stops when it's current request is serviced and the
324: * queue is empty.
325: */
326: public void shutdown() {
327: mShutdown = true;
328: }
329:
330: /**
331: * Calls the corresponding join method of the thread associated with
332: * this class.
333: *
334: * @param millis The time to wait in milliseconds.
335: */
336: public void join(long millis) throws InterruptedException {
337: mThread.join(millis);
338: }
339:
340: /**
341: * The runnable method of the thread, that is called when the thread is
342: * started.
343: */
344: public void run() {
345: AuthenticateRequest ar;
346: Authenticate a = new Authenticate(mProps, mPoolHandle);
347: a.setCredential(mCredential);
348: boolean authenticated = false;
349:
350: for (;;) {
351: //remain in an infinite loop and wait for a request to be released
352: //from the queue.
353: ar = getAuthenticateRequest();
354: if (ar == null) {
355: //no more requests to service and the shutdown signal has
356: //been received. send the notification to the manager and exit
357: mLogger.log("Thread [" + mIndex
358: + "] got shutdown signal",
359: LogManager.DEBUG_MESSAGE_LEVEL);
360: synchronized (mCurrentNum) {
361: mCurrentNum.increment();
362: mCurrentNum.notify();
363: }
364:
365: break;
366: }
367:
368: //means worker is busy, processing a request.
369: mAvailable = false;
370: //do the processing.
371: authenticated = a.authenticate(ar);
372: mLogger
373: .log("Thread [" + mIndex
374: + "] Authentication of " + ar
375: + " successful:" + authenticated,
376: LogManager.DEBUG_MESSAGE_LEVEL);
377: if (!authenticated) {
378: //we need to remove
379: boolean removal = a.removeResource(ar);
380: mLogger.log("Thread [" + mIndex
381: + "] Removal of resource" + ar
382: + " successful:" + removal,
383: LogManager.DEBUG_MESSAGE_LEVEL);
384: }
385: mAvailable = true;
386: //be nice and sleep
387: try {
388: mThread.sleep(5);
389: } catch (InterruptedException ex) {
390: mLogger.log("Authenticate Thread [" + mIndex
391: + "] got interrupted while waiting",
392: LogManager.DEBUG_MESSAGE_LEVEL);
393: //go into sleep again
394: continue;
395: }
396:
397: }
398:
399: }
400:
401: /**
402: * Returns an authentication request to the worker thread.
403: *
404: * @return the authentication request.
405: */
406: public AuthenticateRequest getAuthenticateRequest() {
407: synchronized (mQueue) {
408:
409: for (;;) {
410: if (mQueue.isEmpty() && mShutdown) {
411: //no more requests to service and the shutdown signal has
412: //been received.
413: return null;
414: } else if (mQueue.isEmpty()) {
415: //there is nothing in the queue so wait on it.
416: try {
417: mLogger.log("Thread [" + mIndex
418: + "] going to wait",
419: LogManager.DEBUG_MESSAGE_LEVEL);
420: mQueue.wait();
421: //again check for empty queue and shutdown signal
422: if (mQueue.isEmpty() && !mShutdown)
423: //go back to the wait state to receive a new
424: //request or a AR request
425: continue;
426: } catch (InterruptedException ex) {
427: mLogger
428: .log(
429: "Authenticate Thread ["
430: + mIndex
431: + "] got interrupted while waiting "
432: + ex.getMessage(),
433: LogManager.ERROR_MESSAGE_LEVEL);
434: //go into sleep again
435: continue;
436: }
437:
438: }
439: return (mQueue.isEmpty() && mShutdown) ?
440: //indicates shutdown
441: null
442: : (AuthenticateRequest) mQueue
443: .removeFirst();
444:
445: }
446:
447: }
448: }
449:
450: }
451:
452: /**
453: * A wrapper around an int that acts as a Condition Variable, and is used
454: * as such. In behaviour it is probably closer to a semaphore.
455: */
456: class ConditionVariable {
457:
458: /**
459: * The int that is associated with this object.
460: */
461: private int value;
462:
463: /**
464: * The default constructor.
465: */
466: public ConditionVariable() {
467: value = 0;
468: }
469:
470: /**
471: * It increments the value by 1.
472: */
473: public void increment() {
474: value++;
475: }
476:
477: /**
478: * Returns the value.
479: */
480: public int getValue() {
481: return value;
482: }
483: }
484:
485: }
|