001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU General
007: * Public License Version 2 only ("GPL") or the Common Development and Distribution
008: * License("CDDL") (collectively, the "License"). You may not use this file except in
009: * compliance with the License. You can obtain a copy of the License at
010: * http://www.netbeans.org/cddl-gplv2.html or nbbuild/licenses/CDDL-GPL-2-CP. See the
011: * License for the specific language governing permissions and limitations under the
012: * License. When distributing the software, include this License Header Notice in
013: * each file and include the License file at nbbuild/licenses/CDDL-GPL-2-CP. Sun
014: * designates this particular file as subject to the "Classpath" exception as
015: * provided by Sun in the GPL Version 2 section of the License file that
016: * accompanied this code. If applicable, add the following below the License Header,
017: * with the fields enclosed by brackets [] replaced by your own identifying
018: * information: "Portions Copyrighted [year] [name of copyright owner]"
019: *
020: * Contributor(s):
021: *
022: * The Original Software is NetBeans. The Initial Developer of the Original Software
023: * is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun Microsystems, Inc. All
024: * Rights Reserved.
025: *
026: * If you wish your version of this file to be governed by only the CDDL or only the
027: * GPL Version 2, indicate your decision by adding "[Contributor] elects to include
028: * this software in this distribution under the [CDDL or GPL Version 2] license." If
029: * you do not indicate a single choice of license, a recipient has the option to
030: * distribute your version of this file under either the CDDL, the GPL Version 2 or
031: * to extend the choice of license to its licensees as provided above. However, if
032: * you add GPL Version 2 code and therefore, elected the GPL Version 2 license, then
033: * the option applies only if the new code is made subject to such option by the
034: * copyright holder.
035: */
036:
037: package org.netbeans.installer.downloader.dispatcher.impl;
038:
039: import java.util.LinkedList;
040: import java.util.Queue;
041: import java.util.Set;
042: import java.util.HashMap;
043: import java.util.HashSet;
044: import java.util.Map;
045: import java.util.concurrent.ArrayBlockingQueue;
046: import java.util.concurrent.BlockingQueue;
047: import org.netbeans.installer.downloader.dispatcher.LoadFactor;
048: import org.netbeans.installer.downloader.dispatcher.Process;
049: import org.netbeans.installer.downloader.dispatcher.ProcessDispatcher;
050: import org.netbeans.installer.utils.ErrorManager;
051: import org.netbeans.installer.utils.SystemUtils;
052: import org.netbeans.installer.utils.helper.MutualHashMap;
053: import org.netbeans.installer.utils.helper.MutualMap;
054:
055: /**
056: * @author Danila_Dugurov
057: */
058: public class RoundRobinDispatcher implements ProcessDispatcher {
059: /////////////////////////////////////////////////////////////////////////////////
060: // Static
061: private static final Map<LoadFactor, Byte> quantumToSkip = new HashMap<LoadFactor, Byte>();
062:
063: static {
064: quantumToSkip.put(LoadFactor.FULL, (byte) 0);
065: quantumToSkip.put(LoadFactor.AVERAGE, (byte) 2);
066: quantumToSkip.put(LoadFactor.LOW, (byte) 10);
067: }
068:
069: /////////////////////////////////////////////////////////////////////////////////
070: // Instance
071: private final int timeQuantum;
072: private final WorkersPool pool;
073: private final BlockingQueue<Worker> workingQueue;
074: private final Queue<Process> waitingQueue;
075: private final MutualMap<Process, Worker> proc2Worker;
076: private Set<Worker> makedToStop = new HashSet<Worker>();
077:
078: private Thread dispatcherThread;
079: private Terminator terminator = new Terminator();
080: private boolean isActive;
081: private LoadFactor factor;
082:
083: public RoundRobinDispatcher(final int quantum, final int poolSize) {
084: if (quantum < 10 || poolSize < 1) {
085: throw new IllegalArgumentException();
086: }
087: this .timeQuantum = quantum;
088: this .pool = new WorkersPool(poolSize);
089: workingQueue = new ArrayBlockingQueue<Worker>(poolSize);
090: waitingQueue = new LinkedList<Process>();
091: proc2Worker = new MutualHashMap<Process, Worker>();
092: factor = LoadFactor.FULL;
093: }
094:
095: public synchronized boolean schedule(final Process process) {
096: synchronized (waitingQueue) {
097: waitingQueue.offer(process);
098: waitingQueue.notify();
099: }
100: return true;
101: }
102:
103: public synchronized void terminate(final Process process) {
104: synchronized (waitingQueue) {
105: if (waitingQueue.remove(process)) {
106: return;
107: }
108: }
109: final Worker stoped = proc2Worker.get(process);
110: makedToStop.add(stoped);
111: terminateInternal(process);
112: }
113:
114: public void setLoadFactor(final LoadFactor factor) {
115: this .factor = factor;
116: }
117:
118: public LoadFactor loadFactor() {
119: return factor;
120: }
121:
122: private void terminateInternal(final Process process) {
123: final Worker worker = proc2Worker.get(process);
124: if (worker == null) {
125: return;
126: }
127: if (worker.isFree()) {
128: return;
129: }
130: if (!terminator.isAlive()) {
131: terminator.start();
132: }
133: terminator.terminate(process);
134: SystemUtils.sleep(timeQuantum);
135: if (terminator.isBusy()) {
136: terminator.stop();
137: terminator = new Terminator();
138: }
139: if (!worker.isFree()) {
140: worker.stop();
141: }
142: proc2Worker.remove(process);
143: pool.release(worker);
144: workingQueue.remove(worker);
145: }
146:
147: public synchronized boolean isActive() {
148: return isActive;
149: }
150:
151: // for tracknig perpose no synchronization so no sure of correctness
152: public int activeCount() {
153: return proc2Worker.size();
154: }
155:
156: // for tracknig perpose no synchronization so no sure of correctness
157: public int waitingCount() {
158: return waitingQueue.size();
159: }
160:
161: public synchronized void start() {
162: if (isActive) {
163: return;
164: }
165: dispatcherThread = new Thread(new DispatcherWorker());
166: dispatcherThread.setDaemon(true);
167: dispatcherThread.start();
168: isActive = true;
169: }
170:
171: public synchronized void stop() {
172: if (!isActive) {
173: return;
174: }
175: dispatcherThread.interrupt();
176: try {
177: dispatcherThread
178: .join((timeQuantum) * (pool.capacity() + 3));
179: } catch (InterruptedException exit) {
180: } finally {
181: //this condition mustn't happens to true
182: if (dispatcherThread.isAlive()) {
183: dispatcherThread.stop();
184: }
185: }
186: waitingQueue.clear();
187: isActive = false;
188: }
189:
190: /////////////////////////////////////////////////////////////////////////////////
191: // Inner Classes
192: private class DispatcherWorker implements Runnable {
193: Worker current;
194:
195: public void run() {
196: while (true) {
197: if (Thread.interrupted()) {
198: break;
199: }
200: try {
201: current = workingQueue.poll();
202: if (current == null
203: || makedToStop.contains(current)) {
204: synchronized (waitingQueue) {
205: if (waitingQueue.isEmpty()) {
206: waitingQueue.wait();
207: }
208: }
209: filWorkingQueue();
210: continue;
211: }
212: invokeCurrent();
213: Thread.sleep(timeQuantum);
214: suspendCurrent();
215: if (factor != LoadFactor.FULL) {
216: Thread.sleep(quantumToSkip.get(factor)
217: * timeQuantum);
218: }
219: } catch (InterruptedException exit) {
220: suspendCurrent();
221: break;
222: }
223: }
224: terminateAll();
225: }
226:
227: private void terminateAll() {
228: for (Worker worker : workingQueue.toArray(new Worker[0])) {
229: terminateInternal(proc2Worker.reversedGet(worker));
230: }
231: }
232:
233: private void invokeCurrent() {
234: switch (current.getState()) {
235: case NEW:
236: current.start();
237: break;
238: case RUNNABLE:
239: current.resume();
240: break;
241: case TERMINATED:
242: break;
243: default:
244: current.resume();
245: //temprorary while blocking queue not impl.
246: }
247: }
248:
249: private void suspendCurrent() {
250: if (current == null) {
251: return;
252: }
253: if (makedToStop.contains(current)) {
254: return;
255: }
256: current.suspend();
257: if (current.isAlive() && !current.isFree()) {
258: workingQueue.offer(current);
259: } else {
260: proc2Worker.reversedRemove(current);
261: pool.release(current);
262: }
263: filWorkingQueue();
264: }
265:
266: private void filWorkingQueue() {
267: if (waitingQueue.size() == 0 || pool.remaining() == 0) {
268: return;
269: }
270: synchronized (waitingQueue) {
271: while (workingQueue.remainingCapacity() > 0) {
272: if (waitingQueue.isEmpty()) {
273: return;
274: }
275: final Worker worker = pool.tryAcquire();
276: final Process process = waitingQueue.poll();
277: worker.setCurrent(process);
278: proc2Worker.put(process, worker);
279: makedToStop.remove(worker);
280: workingQueue.add(worker);
281: }
282: }
283: }
284: }
285:
286: private class Terminator extends Thread {
287: private Process current;
288:
289: public Terminator() {
290: setDaemon(true);
291: }
292:
293: public synchronized void terminate(final Process process) {
294: current = process;
295: notifyAll();
296: }
297:
298: @Override
299: public void run() {
300: while (true) {
301: synchronized (this ) {
302: try {
303: Thread.interrupted();
304: if (current == null) {
305: wait();
306: if (current == null) {
307: continue;
308: }
309: }
310: final Worker worker = proc2Worker.get(current);
311: worker.resume();
312: worker.interrupt();
313: try {
314: current.terminate();
315: } catch (Exception ignored) { //may be log?
316: }
317: current = null;
318: } catch (InterruptedException e) {
319: ErrorManager.notifyDebug(
320: "Terminator thread interrupted", e); // NOI18N
321: break;
322: }
323: }
324: }
325: }
326:
327: public synchronized boolean isBusy() {
328: return current == null;
329: }
330: }
331: }
|