001: /*
002: * Javolution - Java(TM) Solution for Real-Time and Embedded Systems
003: * Copyright (C) 2006 - Javolution (http://javolution.org/)
004: * All rights reserved.
005: *
006: * Permission to use, copy, modify, and distribute this software is
007: * freely granted, provided that this notice is preserved.
008: */
009: package javolution.context;
010:
011: import j2mex.realtime.MemoryArea;
012:
013: import javolution.lang.Configurable;
014: import javolution.lang.MathLib;
015: import javolution.lang.Reflection;
016:
017: /**
018: * <p> This class represents a context to take advantage of concurrent
019: * algorithms on multi-processors systems.</p>
020: *
021: * <p> When a thread enters a concurrent context, it may performs concurrent
022: * executions by calling the {@link #execute(Runnable)} static method.
023: * The logic is then executed by a concurrent thread or by the current
024: * thread itself if there is no concurrent thread immediately available
025: * (the number of concurrent threads is limited, see
026: * <a href="{@docRoot}/overview-summary.html#configuration">
027: * Javolution Configuration</a> for details).</p>
028: *
029: * <p> Only after all concurrent executions are completed, is the current
030: * thread allowed to exit the scope of the concurrent context
031: * (internal synchronization).</p>
032: *
033: * <p> Concurrent logics always execute within the same {@link Context} as
034: * the calling thread. For example, if the main thread runs in a
035: * {@link StackContext}, concurrent executions are performed in the
036: * same {@link StackContext} as well.</p>
037: *
038: * <p> Concurrent contexts are easy to use, and provide automatic
039: * load-balancing between processors with almost no overhead. Here is
040: * an example of <b>concurrent/recursive</b> implementation of the
041: * Karatsuba multiplication for large integers:[code]
042: * public LargeInteger multiply(LargeInteger that) {
043: * if (that._size <= 1) {
044: * return multiply(that.longValue()); // Direct multiplication.
045: * } else { // Karatsuba multiplication in O(n^log2(3))
046: * int bitLength = this.bitLength();
047: * int n = (bitLength >> 1) + (bitLength & 1);
048: *
049: * // this = a + 2^n b, that = c + 2^n d
050: * LargeInteger b = this.shiftRight(n);
051: * LargeInteger a = this.minus(b.shiftLeft(n));
052: * LargeInteger d = that.shiftRight(n);
053: * LargeInteger c = that.minus(d.shiftLeft(n));
054: * Multiply ac = Multiply.valueOf(a, c);
055: * Multiply bd = Multiply.valueOf(b, d);
056: * Multiply abcd = Multiply.valueOf(a.plus(b), c.plus(d));
057: * ConcurrentContext.enter();
058: * try {
059: * ConcurrentContext.execute(ac);
060: * ConcurrentContext.execute(bd);
061: * ConcurrentContext.execute(abcd);
062: * } finally {
063: * ConcurrentContext.exit(); // Waits for all concurrent threads to complete.
064: * }
065: * // a*c + ((a+b)*(c+d)-a*c-b*d) 2^n + b*d 2^2n
066: * return ac.value().plus(
067: * abcd.value().minus(ac.value().plus(bd.value())).shiftWordLeft(n)).plus(
068: * bd.value().shiftWordLeft(n << 1));
069: * }
070: * }
071: * private static class Multiply implements Runnable {
072: * LargeInteger _left, _right, _value;
073: * static Multiply valueOf(LargeInteger left, LargeInteger right) {
074: * Multiply multiply = new Multiply(); // Or use an ObjectFactory (to allow stack allocation).
075: * multiply._left = left;
076: * multiply._right = right;
077: * return multiply;
078: * }
079: * public void run() {
080: * _value = _left.times(_right); // Recursive.
081: * }
082: * public LargeInteger value() {
083: * return _result;
084: * }
085: * };[/code]
086: *
087: * Here is a concurrent/recursive quick/merge sort using anonymous inner
088: * classes (the same method is used for
089: * <a href="http://javolution.org/doc/benchmark.html">benchmark</a>):[code]
090: * private void quickSort(final FastTable<? extends Comparable> table) {
091: * final int size = table.size();
092: * if (size < 100) {
093: * table.sort(); // Direct quick sort.
094: * } else {
095: * // Splits table in two and sort both part concurrently.
096: * final FastTable<? extends Comparable> t1 = FastTable.newInstance();
097: * final FastTable<? extends Comparable> t2 = FastTable.newInstance();
098: * ConcurrentContext.enter();
099: * try {
100: * ConcurrentContext.execute(new Runnable() {
101: * public void run() {
102: * t1.addAll(table.subList(0, size / 2));
103: * quickSort(t1); // Recursive.
104: * }
105: * });
106: * ConcurrentContext.execute(new Runnable() {
107: * public void run() {
108: * t2.addAll(table.subList(size / 2, size));
109: * quickSort(t2); // Recursive.
110: * }
111: * });
112: * } finally {
113: * ConcurrentContext.exit();
114: * }
115: * // Merges results.
116: * for (int i=0, i1=0, i2=0; i < size; i++) {
117: * if (i1 >= t1.size()) {
118: * table.set(i, t2.get(i2++));
119: * } else if (i2 >= t2.size()) {
120: * table.set(i, t1.get(i1++));
121: * } else {
122: * Comparable o1 = t1.get(i1);
123: * Comparable o2 = t2.get(i2);
124: * if (o1.compareTo(o2) < 0) {
125: * table.set(i, o1);
126: * i1++;
127: * } else {
128: * table.set(i, o2);
129: * i2++;
130: * }
131: * }
132: * }
133: * FastTable.recycle(t1);
134: * FastTable.recycle(t2);
135: * }
136: * }[/code]
137: *
138: * <p> Concurrent contexts ensure the same behavior whether or not the execution
139: * is performed by the current thread or a concurrent thread. Any exception
140: * raised during the concurrent logic executions is propagated to the
141: * current thread.</p>
142: *
143: * <p> {@link #getConcurrency() Concurrency} can be {@link LocalContext locally}
144: * adjusted. For example:[code]
145: * LocalContext.enter();
146: * try { // Do not use more than half of the processors during analysis.
147: * ConcurrentContext.setConcurrency((Runtime.getRuntime().availableProcessors() / 2) - 1);
148: * runAnalysis(); // Use concurrent contexts internally.
149: * } finally {
150: * LocalContext.exit();
151: * }[/code] </p>
152: * It should be noted that the concurrency cannot be increased above the
153: * configurable {@link #MAXIMUM_CONCURRENCY maximum concurrency}.
154: * In other words, if the maximum concurrency is <code>0</code>,
155: * concurrency is disabled regardless of local concurrency settings.</p>
156: *
157: * @author <a href="mailto:jean-marie@dautelle.com">Jean-Marie Dautelle</a>
158: * @version 5.1, July 2, 2007
159: */
160: public abstract class ConcurrentContext extends Context {
161:
162: /**
163: * Holds the default implementation. Concurrent executions are performed
164: * in the same memory area and at the same priority as the calling thread.
165: * This implementation uses <code>javax.realtime.RealtimeThread</code>
166: * for concurrent threads. Alternative (RTSJ) implementations could also use
167: * <code>javax.realtime.NoHeapRealtimeThread</code>.
168: */
169: public static final Configurable/*<Class<? extends ConcurrentContext>>*/
170: DEFAULT = new Configurable(Default.CLASS);
171:
172: /**
173: * Holds the maximum number of concurrent executors
174: * (see <a href="{@docRoot}/overview-summary.html#configuration">
175: * Javolution Configuration</a> for details).
176: */
177: public static final Configurable/*<Integer>*/MAXIMUM_CONCURRENCY = new Configurable(
178: new Integer(availableProcessors() - 1)) {
179: protected void notifyChange() { // The maximum concurrency is also the default concurrency.
180: CONCURRENCY.setDefault(this .get());
181: }
182: };
183:
184: private static int availableProcessors() {
185: Reflection.Method availableProcessors = Reflection
186: .getMethod("java.lang.Runtime.availableProcessors()");
187: if (availableProcessors != null) {
188: Integer processors = (Integer) availableProcessors
189: .invoke(Runtime.getRuntime());
190: return processors.intValue();
191: } else { // J2ME.
192: return 1;
193: }
194: }
195:
196: /**
197: * Holds the current concurrency.
198: */
199: private static final LocalContext.Reference CONCURRENCY = new LocalContext.Reference(
200: MAXIMUM_CONCURRENCY.get());
201:
202: /**
203: * Default constructor.
204: */
205: protected ConcurrentContext() {
206: }
207:
208: /**
209: * Enters a concurrent context (instance of {@link #DEFAULT}).
210: *
211: * @return the concurrent context being entered.
212: */
213: public static ConcurrentContext enter() {
214: return (ConcurrentContext) Context.enter((Class) DEFAULT.get());
215: }
216:
217: /**
218: * Exits the current concurrent context.
219: *
220: * @return the concurrent context being exited.
221: * @throws ClassCastException if the context is not a concurrent context.
222: */
223: public static/*ConcurrentContext*/Context exit() {
224: return (ConcurrentContext) Context.exit();
225: }
226:
227: /**
228: * Set the {@link LocalContext local} concurrency. Concurrency is
229: * hard limited by {@link #MAXIMUM_CONCURRENCY}.
230: *
231: * @param concurrency the new concurrency (<code>0</code> or negative
232: * number to disable concurrency).
233: */
234: public static void setConcurrency(int concurrency) {
235: concurrency = MathLib.max(0, concurrency);
236: concurrency = MathLib.min(((Integer) MAXIMUM_CONCURRENCY.get())
237: .intValue(), concurrency);
238: CONCURRENCY.set(new Integer(concurrency));
239: }
240:
241: /**
242: * Returns the {@link LocalContext local} concurrency.
243: *
244: * @return the maximum number of concurrent thread.
245: */
246: public static int getConcurrency() {
247: return ((Integer) CONCURRENCY.get()).intValue();
248: }
249:
250: /**
251: * Executes the specified logic by a concurrent thread if
252: * one available; otherwise the logic is executed by the current thread.
253: * Any exception or error occuring during concurrent executions is
254: * propagated to the current thread upon {@link #exit}
255: * of the concurrent context.
256: *
257: * @param logic the logic to execute concurrently if possible.
258: * @throws ClassCastException if the current context is not a
259: * {@link ConcurrentContext}.
260: */
261: public static void execute(Runnable logic) {
262: ConcurrentContext ctx = (ConcurrentContext) Context
263: .getCurrent();
264: ctx.executeAction(logic);
265: }
266:
267: /**
268: * Executes the specified logic concurrently if possible.
269: *
270: * @param logic the logic to execute.
271: */
272: protected abstract void executeAction(Runnable logic);
273:
274: /**
275: * Default implementation using {@link ConcurrentThread} executors.
276: */
277: static final class Default extends ConcurrentContext {
278:
279: private static final Class CLASS = new Default().getClass();
280:
281: /**
282: * Holds the concurrent executors.
283: */
284: private static ConcurrentThread[] _Executors = new ConcurrentThread[0];
285:
286: /**
287: * Holds the executors creation logic (to be performed in
288: * ImmortalMemory). The number of executors can only be increased
289: * (typically through configuration).
290: */
291: private static final Runnable CREATE_EXECUTORS = new Runnable() {
292: public synchronized void run() {
293: int max = ((Integer) MAXIMUM_CONCURRENCY.get())
294: .intValue();
295: int count = _Executors.length;
296: if (count >= max)
297: return; // We have enough executors.
298: ConcurrentThread[] executors = new ConcurrentThread[max];
299: System.arraycopy(_Executors, 0, executors, 0, count);
300: for (int i = count; i < max; i++) {
301: executors[i] = new ConcurrentThread();
302: executors[i].start();
303: }
304: _Executors = executors;
305: }
306: };
307:
308: /**
309: * Holds the concurrency.
310: */
311: private int _concurrency;
312:
313: /**
314: * Holds any error occuring during concurrent execution.
315: */
316: private Throwable _error;
317:
318: /**
319: * Holds the number of concurrent execution initiated.
320: */
321: private int _initiated;
322:
323: /**
324: * Holds the number of concurrent execution completed.
325: */
326: private int _completed;
327:
328: // Implements Context abstract method.
329: protected void enterAction() {
330: _error = null;
331: _initiated = 0;
332: _completed = 0;
333: _concurrency = ConcurrentContext.getConcurrency();
334: if (_concurrency > _Executors.length) { // We need more executors.
335: MemoryArea.getMemoryArea(_Executors).executeInArea(
336: CREATE_EXECUTORS);
337: }
338: }
339:
340: // Implements ConcurrentContext abstract method.
341: protected void executeAction(Runnable logic) {
342: for (int i = _concurrency; --i >= 0;) {
343: if (_Executors[i].execute(logic, this )) {
344: _initiated++;
345: return; // Done concurrently.
346: }
347: }
348: // Execution by current thread.
349: logic.run();
350: }
351:
352: // Implements Context abstract method.
353: protected void exitAction() {
354: synchronized (this ) {
355: while (_initiated != _completed) {
356: try {
357: this .wait();
358: } catch (InterruptedException e) {
359: throw new ConcurrentException(e);
360: }
361: }
362: }
363: // Propagates any concurrent error to current thread.
364: if (_error != null) {
365: if (_error instanceof RuntimeException)
366: throw ((RuntimeException) _error);
367: if (_error instanceof Error)
368: throw ((Error) _error);
369: throw new ConcurrentException(_error); // Wrapper.
370: }
371:
372: }
373:
374: // Called when a concurrent execution starts.
375: void started() {
376: Context.setCurrent(this );
377: }
378:
379: // Called when a concurrent execution finishes.
380: void completed() {
381: synchronized (this ) {
382: _completed++;
383: this .notify();
384: }
385: ((AllocatorContext) AllocatorContext.getCurrent())
386: .deactivate();
387: }
388:
389: // Called when an error occurs.
390: void error(Throwable error) {
391: synchronized (this ) {
392: if (_error == null) { // First error.
393: _error = error;
394: }
395: }
396: }
397: }
398:
399: // Allows instances of private classes to be factory produced.
400: static {
401: ObjectFactory.setInstance(new ObjectFactory() {
402: protected Object create() {
403: return new Default();
404: }
405: }, Default.CLASS);
406: }
407:
408: }
|