001: /*
002: Copyright (C) 2003 Know Gate S.L. All rights reserved.
003: C/Oña, 107 1º2 28050 Madrid (Spain)
004:
005: Redistribution and use in source and binary forms, with or without
006: modification, are permitted provided that the following conditions
007: are met:
008:
009: 1. Redistributions of source code must retain the above copyright
010: notice, this list of conditions and the following disclaimer.
011:
012: 2. The end-user documentation included with the redistribution,
013: if any, must include the following acknowledgment:
014: "This product includes software parts from hipergate
015: (http://www.hipergate.org/)."
016: Alternately, this acknowledgment may appear in the software itself,
017: if and wherever such third-party acknowledgments normally appear.
018:
019: 3. The name hipergate must not be used to endorse or promote products
020: derived from this software without prior written permission.
021: Products derived from this software may not be called hipergate,
022: nor may hipergate appear in their name, without prior written
023: permission.
024:
025: This library is distributed in the hope that it will be useful,
026: but WITHOUT ANY WARRANTY; without even the implied warranty of
027: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
028:
029: You should have received a copy of hipergate License with this code;
030: if not, visit http://www.hipergate.org or mail to info@hipergate.org
031: */
032:
033: package com.knowgate.scheduler;
034:
035: import java.sql.SQLException;
036:
037: import java.util.Properties;
038: import java.util.LinkedList;
039: import java.util.ListIterator;
040: import java.util.HashMap;
041: import java.util.Iterator;
042:
043: import com.knowgate.dataobjs.DB;
044: import com.knowgate.debug.DebugFile;
045: import com.knowgate.jdc.JDCConnection;
046:
047: /**
048: * WorkerThread Pool
049: * @author Sergio Montoro Ten
050: * @version 3.0
051: */
052:
053: public class WorkerThreadPool {
054:
055: private WorkerThread aThreads[];
056: private long aStartTime[];
057: private Properties oEnvProps;
058:
059: // ---------------------------------------------------------------------------
060:
061: /**
062: * <p>Create WorkerThreadPool</p>
063: * thread Pool size is readed from maxschedulerthreads property of oEnvironmentProps,
064: * the default value is 1.
065: * Each thread is given the name WorkerThread_<i>n</i>
066: * @param oAtomConsumer Atom Consumer Object to be used
067: * @param oEnvironmentProps Environment Properties collection
068: * (usually readed from hipergate.cnf)
069: */
070: public WorkerThreadPool(AtomConsumer oAtomConsumer,
071: Properties oEnvironmentProps) {
072: int nThreads = Integer.parseInt(oEnvironmentProps.getProperty(
073: "maxschedulerthreads", "1"));
074:
075: if (DebugFile.trace)
076: DebugFile.writeln("maxschedulerthreads="
077: + String.valueOf(nThreads));
078:
079: oEnvProps = oEnvironmentProps;
080: aThreads = new WorkerThread[nThreads];
081: aStartTime = new long[nThreads];
082:
083: for (int t = 0; t < nThreads; t++) {
084: if (DebugFile.trace)
085: DebugFile.writeln("new WorkerThread("
086: + String.valueOf(t) + ")");
087:
088: aThreads[t] = new WorkerThread(this , oAtomConsumer);
089:
090: aThreads[t].setName("WorkerThread_" + String.valueOf(t));
091: } // next(t)
092: }
093:
094: // ---------------------------------------------------------------------------
095:
096: /**
097: * Get Pool Size
098: */
099: public int size() {
100: return aThreads.length;
101: }
102:
103: // ---------------------------------------------------------------------------
104:
105: /**
106: * Get Environment properties collection from hipergate.cnf
107: */
108: public Properties getProperties() {
109: return oEnvProps;
110: }
111:
112: // ---------------------------------------------------------------------------
113:
114: /**
115: * Get Environment property
116: * @return
117: */
118: public String getProperty(String sKey) {
119: return oEnvProps.getProperty(sKey);
120: }
121:
122: // ---------------------------------------------------------------------------
123:
124: public long getRunningTimeMS() {
125: long lRunningTime = 0l;
126: for (int t = 0; t < aThreads.length; t++)
127: lRunningTime += aThreads[t].getRunningTimeMS();
128: return lRunningTime;
129: }
130:
131: // ---------------------------------------------------------------------------
132:
133: /**
134: * Launch all WorkerThreads and start consuming atoms from queue.
135: */
136: public void launchAll() {
137: for (int t = 0; t < aThreads.length; t++) {
138: if (!aThreads[t].isAlive()) {
139: aStartTime[t] = new java.util.Date().getTime();
140: aThreads[t].start();
141: }
142: } // next
143: } // launchAll
144:
145: // ---------------------------------------------------------------------------
146:
147: /**
148: * Count of currently active WorkerThreads
149: */
150: public int livethreads() {
151: int iLive = 0;
152:
153: for (int t = 0; t < aThreads.length; t++) {
154: if (aThreads[t].isAlive()) {
155: iLive++;
156: }
157: } // next
158: return iLive;
159: } // livethreads
160:
161: // ---------------------------------------------------------------------------
162:
163: public WorkerThread[] threads() {
164: return aThreads;
165: }
166:
167: // ---------------------------------------------------------------------------
168:
169: /**
170: * Get array of atoms currently running at live WorkerThreads
171: * @return Atom[]
172: * @since 3.0
173: */
174: public Atom[] runningAtoms() {
175: if (livethreads() == 0)
176: return null;
177: Atom[] aAtoms = new Atom[livethreads()];
178: int a = 0;
179: final int iThreads = aThreads.length;
180: for (int t = 0; t < iThreads; t++) {
181: if (aThreads[t].isAlive()) {
182: aAtoms[a++] = aThreads[t].activeAtom();
183: } // fi
184: } // next (t)
185: return aAtoms;
186: } // runningAtoms
187:
188: // ---------------------------------------------------------------------------
189:
190: /**
191: * Get array with GUIDs of Jobs currently run by live WorkerThreads
192: * @return String[] Job GUID array
193: * @since 3.0
194: */
195: public String[] runningJobs() {
196: Atom[] aAtoms = runningAtoms();
197: if (aAtoms == null)
198: return null;
199: LinkedList oJobs = new LinkedList();
200: String sJob;
201: int nAtoms = aAtoms.length;
202: for (int a = 0; a < nAtoms; a++) {
203: sJob = aAtoms[a].getString(DB.gu_job);
204: if (oJobs.contains(sJob))
205: oJobs.add(sJob);
206: }
207: if (oJobs.size() == 0)
208: return null;
209: String[] aJobs = new String[oJobs.size()];
210: ListIterator oIter = oJobs.listIterator();
211: int j = 0;
212: while (oIter.hasNext()) {
213: aJobs[j] = (String) oIter.next();
214: } // wend
215: return aJobs;
216: } // runningJobs
217:
218: // ---------------------------------------------------------------------------
219:
220: /**
221: * Register a thread callback object for each thread in this pool
222: * @param oNewCallback WorkerThreadCallback subclass instance
223: * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
224: */
225: public void registerCallback(WorkerThreadCallback oNewCallback)
226: throws IllegalArgumentException {
227: final int iThreads = aThreads.length;
228: for (int t = 0; t < iThreads; t++)
229: aThreads[t].registerCallback(oNewCallback);
230: } // registerCallback
231:
232: // ---------------------------------------------------------------------------
233:
234: /**
235: * Unregister a thread callback object for each thread in this pool
236: * @param sCallbackName Name of callback to be unregistered
237: */
238: public void unregisterCallback(String sCallbackName) {
239: final int iThreads = aThreads.length;
240:
241: for (int t = 0; t < iThreads; t++)
242: aThreads[t].unregisterCallback(sCallbackName);
243:
244: } // unregisterCallback
245:
246: // --------------------------------------------------------------------------
247:
248: /**
249: * <p>Halt all pooled threads commiting any pending operations before stoping</p>
250: * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
251: * halt() method only sends a signals to the each WokerThread telling it that must
252: * finish pending operations and stop.
253: */
254: public void haltAll() {
255: final int iThreads = aThreads.length;
256: for (int t = 0; t < iThreads; t++)
257: aThreads[t].halt();
258: }
259:
260: // --------------------------------------------------------------------------
261:
262: /**
263: * <p>Call stop() on every thread of the pool which is alive</p>
264: * This method should only be used when threads cannot be stopped by calling
265: * haltAll()
266: * @deprecated Use stopAll(JDCConnection) instead
267: */
268: public void stopAll() {
269: final int iThreads = aThreads.length;
270: for (int t = 0; t < iThreads; t++) {
271: if (aThreads[t].isAlive())
272: aThreads[t].stop();
273: }
274: }
275:
276: // ---------------------------------------------------------------------------
277:
278: /**
279: * <p>Call stop() on every thread of the pool which is alive</p>
280: * All running atoms are set to STATUS_INTERRUPTED
281: * @since 3.0
282: */
283: public void stopAll(JDCConnection oConn) throws SQLException {
284: final int iThreads = aThreads.length;
285: Atom oActiveAtom;
286: for (int t = 0; t < iThreads; t++) {
287: oActiveAtom = aThreads[t].activeAtom();
288: if (null != oActiveAtom)
289: oActiveAtom.setStatus(oConn, Atom.STATUS_INTERRUPTED,
290: "Interrupted by user");
291: if (aThreads[t].isAlive())
292: aThreads[t].stop();
293: }
294: }
295:
296: // ---------------------------------------------------------------------------
297:
298: } // WorkerThreadPool
|