0001: /*
0002: * $RCSfile: SunTileScheduler.java,v $
0003: *
0004: * Copyright (c) 2005 Sun Microsystems, Inc. All rights reserved.
0005: *
0006: * Use is subject to license terms.
0007: *
0008: * $Revision: 1.1 $
0009: * $Date: 2005/02/11 04:57:02 $
0010: * $State: Exp $
0011: */
0012: package com.sun.media.jai.util;
0013:
0014: import java.awt.Point;
0015: import java.awt.RenderingHints;
0016: import java.awt.image.Raster;
0017: import java.io.ByteArrayOutputStream;
0018: import java.io.PrintStream;
0019: import java.lang.ref.WeakReference;
0020: import java.lang.reflect.Method;
0021: import java.math.BigInteger;
0022: import java.util.ArrayList;
0023: import java.util.Arrays;
0024: import java.util.Collections;
0025: import java.util.HashMap;
0026: import java.util.HashSet;
0027: import java.util.Hashtable;
0028: import java.util.Iterator;
0029: import java.util.LinkedList;
0030: import java.util.List;
0031: import java.util.ListIterator;
0032: import java.util.Map;
0033: import java.util.Set;
0034: import java.util.Vector;
0035: import javax.media.jai.OpImage;
0036: import javax.media.jai.PlanarImage;
0037: import javax.media.jai.TileCache;
0038: import javax.media.jai.TileComputationListener;
0039: import javax.media.jai.TileRequest;
0040: import javax.media.jai.TileScheduler;
0041: import javax.media.jai.remote.SerializableRenderedImage;
0042: import javax.media.jai.util.ImagingException;
0043: import javax.media.jai.util.ImagingListener;
0044: import com.sun.media.jai.util.ImageUtil;
0045:
0046: /**
0047: * A class representing a request for non-prefetch background computation
0048: * of tiles. The object stores the image, the indices of all tiles being
0049: * requested, and references to all listeners associated with the request.
0050: *
0051: * <code>TileRequest</code> methods are not commented.
0052: */
0053: class Request implements TileRequest {
0054:
0055: private final TileScheduler scheduler;
0056:
0057: final PlanarImage image;
0058: final List indices;
0059: final Set listeners;
0060:
0061: final Hashtable tileStatus;
0062:
0063: /**
0064: * Constructs a <code>Request</code>.
0065: *
0066: * @param scheduler The scheduler processing this request.
0067: * @param image The image for which tiles are being computed.
0068: * @param tileIndices The indices of the tiles to be computed.
0069: * @param tileListeners The listeners to be notified of tile
0070: * computation, cancellation, or failure.
0071: *
0072: * @exception IllegalArgumentException if <code>scheduler</code>,
0073: * <code>image</code>, or <code>tileIndices</code> is
0074: * <code>null</code> or if <code>tileIndices</code> is
0075: * zero-length.
0076: */
0077: Request(TileScheduler scheduler, PlanarImage image,
0078: Point[] tileIndices, TileComputationListener[] tileListeners) {
0079:
0080: // Save a reference to the scheduler.
0081: if (scheduler == null) {
0082: throw new IllegalArgumentException(); // Internal error - no message.
0083: }
0084: this .scheduler = scheduler;
0085:
0086: // Save a reference to the image.
0087: if (image == null) {
0088: throw new IllegalArgumentException(); // Internal error - no message.
0089: }
0090: this .image = image;
0091:
0092: // Ensure there is at least one tile in the request.
0093: if (tileIndices == null || tileIndices.length == 0) {
0094: // If this happens it is an internal programming error.
0095: throw new IllegalArgumentException(); // Internal error - no message.
0096: }
0097:
0098: // Save the tile indices.
0099: indices = Arrays.asList(tileIndices);
0100:
0101: // Save references to the listeners, if any.
0102: if (tileListeners != null) {
0103: int numListeners = tileListeners.length;
0104: if (numListeners > 0) {
0105: listeners = new HashSet(numListeners);
0106: for (int i = 0; i < numListeners; i++) {
0107: listeners.add(tileListeners[i]);
0108: }
0109: } else {
0110: listeners = null;
0111: }
0112: } else {
0113: listeners = null;
0114: }
0115:
0116: // Initialize status table.
0117: tileStatus = new Hashtable(tileIndices.length);
0118: }
0119:
0120: // --- TileRequest implementation ---
0121:
0122: public PlanarImage getImage() {
0123: return image;
0124: }
0125:
0126: public Point[] getTileIndices() {
0127: return (Point[]) indices.toArray(new Point[0]);
0128: }
0129:
0130: public TileComputationListener[] getTileListeners() {
0131: return (TileComputationListener[]) listeners
0132: .toArray(new TileComputationListener[0]);
0133: }
0134:
0135: public boolean isStatusAvailable() {
0136: return true;
0137: }
0138:
0139: public int getTileStatus(int tileX, int tileY) {
0140: Point p = new Point(tileX, tileY);
0141:
0142: int status;
0143: if (tileStatus.containsKey(p)) {
0144: status = ((Integer) tileStatus.get(p)).intValue();
0145: } else {
0146: status = TileRequest.TILE_STATUS_PENDING;
0147: }
0148:
0149: return status;
0150: }
0151:
0152: public void cancelTiles(Point[] tileIndices) {
0153: // Forward the call to the scheduler.
0154: scheduler.cancelTiles(this , tileIndices);
0155: }
0156: }
0157:
0158: /** A job to put in a job queue. */
0159: interface Job {
0160: /** Computes the job required. */
0161: void compute();
0162:
0163: /** Returns <code>true</code> if the job is not done. */
0164: boolean notDone();
0165:
0166: /** Returns the image for which tiles are being computed. */
0167: PlanarImage getOwner();
0168:
0169: /**
0170: * Returns <code>true</code> if and only if the job should block the
0171: * thread which processes it. In this case the scheduler and the
0172: * processing thread must communicate using <code>wait()</code> and
0173: * <code>notify()</code>.
0174: */
0175: boolean isBlocking();
0176:
0177: /** Returns the first exception encountered or <code>null</code>. */
0178: Exception getException();
0179: }
0180:
0181: /**
0182: * A <code>Job</code> which computes a single tile at a time for a
0183: * non-prefetch background job queued by the version of scheduleTiles()
0184: * which returns a <code>TileRequest</code>. This <code>Job</code>
0185: * notifies all <code>TileComputationListener</code>s of all
0186: * <code>TileRequest</code>s with which this tile is associated of
0187: * whether the tile was computed or the computation failed.
0188: */
0189: final class RequestJob implements Job {
0190:
0191: final SunTileScheduler scheduler; // the TileScheduler
0192:
0193: final PlanarImage owner; // the image this tile belongs to
0194: final int tileX; // tile's X index
0195: final int tileY; // tile's Y index
0196: final Raster[] tiles; // the computed tiles
0197: final int offset; // offset into arrays
0198:
0199: boolean done = false; // flag indicating completion status
0200: Exception exception = null; // Any exception that might have occured
0201:
0202: // during computeTile
0203:
0204: /** Constructor. */
0205: RequestJob(SunTileScheduler scheduler, PlanarImage owner,
0206: int tileX, int tileY, Raster[] tiles, int offset) {
0207: this .scheduler = scheduler;
0208: this .owner = owner;
0209: this .tileX = tileX;
0210: this .tileY = tileY;
0211: this .tiles = tiles;
0212: this .offset = offset;
0213: }
0214:
0215: /**
0216: * Tile computation. Does the actual call to getTile().
0217: */
0218: public void compute() {
0219: // Get the Request List.
0220: List reqList;
0221: synchronized (scheduler.tileRequests) {
0222: // Initialize the tile ID.
0223: Object tileID = SunTileScheduler.tileKey(owner, tileX,
0224: tileY);
0225:
0226: // Remove the List of Requests from the request Map.
0227: reqList = (List) scheduler.tileRequests.remove(tileID);
0228:
0229: // Remove the tile Job from the job Map.
0230: scheduler.tileJobs.remove(tileID);
0231: }
0232:
0233: // Check whether reqList is valid in case job was cancelled while
0234: // blocking on the tileRequests Map above.
0235: // XXX Do not need empty check in next line?
0236: if (reqList != null && !reqList.isEmpty()) {
0237: // Update tile status to "processing".
0238: Point p = new Point(tileX, tileY);
0239: Integer tileStatus = new Integer(
0240: TileRequest.TILE_STATUS_PROCESSING);
0241: Iterator reqIter = reqList.iterator();
0242: while (reqIter.hasNext()) {
0243: Request r = (Request) reqIter.next();
0244: r.tileStatus.put(p, tileStatus);
0245: }
0246:
0247: try {
0248: tiles[offset] = owner.getTile(tileX, tileY);
0249: } catch (Exception e) {
0250: exception = e;
0251: } finally {
0252: // Extract the Set of all TileComputationListeners.
0253: int numReq = reqList.size();
0254: Set listeners = SunTileScheduler.getListeners(reqList);
0255:
0256: // XXX Do not need empty check in next line.
0257: if (listeners != null && !listeners.isEmpty()) {
0258: // Get TileRequests as an array for later use.
0259: TileRequest[] requests = (TileRequest[]) reqList
0260: .toArray(new TileRequest[0]);
0261:
0262: // Update tile status as needed.
0263: tileStatus = new Integer(
0264: exception == null ? TileRequest.TILE_STATUS_COMPUTED
0265: : TileRequest.TILE_STATUS_FAILED);
0266: for (int i = 0; i < numReq; i++) {
0267: ((Request) requests[i]).tileStatus.put(p,
0268: tileStatus);
0269: }
0270:
0271: // Create an Iterator over the listeners.
0272: Iterator iter = listeners.iterator();
0273:
0274: // Notify listeners.
0275: if (exception == null) {
0276: // Tile computation successful.
0277: while (iter.hasNext()) {
0278: TileComputationListener listener = (TileComputationListener) iter
0279: .next();
0280: listener.tileComputed(scheduler, requests,
0281: owner, tileX, tileY, tiles[offset]);
0282: }
0283: } else {
0284: // Tile computation unsuccessful.
0285: while (iter.hasNext()) {
0286: TileComputationListener listener = (TileComputationListener) iter
0287: .next();
0288: listener.tileComputationFailure(scheduler,
0289: requests, owner, tileX, tileY,
0290: exception);
0291: }
0292: }
0293: }
0294: }
0295: }
0296:
0297: // Set the flag indicating job completion.
0298: done = true;
0299: }
0300:
0301: /**
0302: * Returns <code>true</code> if the job is not done; that is,
0303: * the tile is not computed and no exceptions have occurred.
0304: */
0305: public boolean notDone() {
0306: return !done;
0307: }
0308:
0309: /** Returns the image for which the tile is being computed. */
0310: public PlanarImage getOwner() {
0311: return owner;
0312: }
0313:
0314: /** Always returns <code>true</code>. */
0315: public boolean isBlocking() {
0316: return false;
0317: }
0318:
0319: /** Returns any encountered exception or <code>null</code>. */
0320: public Exception getException() {
0321: return exception;
0322: }
0323:
0324: /** Returns a string representation of the class object. */
0325: public String toString() {
0326: String tString = "null";
0327: if (tiles[offset] != null) {
0328: tString = tiles[offset].toString();
0329: }
0330: return getClass().getName() + "@"
0331: + Integer.toHexString(hashCode()) + ": owner = "
0332: + owner.toString() + " tileX = "
0333: + Integer.toString(tileX) + " tileY = "
0334: + Integer.toString(tileY) + " tile = " + tString;
0335: }
0336: }
0337:
0338: /**
0339: * A <code>Job</code> which computes one or more tiles at a time for either
0340: * a prefetch job or a blocking job.
0341: */
0342: final class TileJob implements Job {
0343:
0344: final SunTileScheduler scheduler; // the TileScheduler
0345:
0346: final boolean isBlocking; // whether the job is blocking
0347: final PlanarImage owner; // the image this tile belongs to
0348: final Point[] tileIndices; // the tile indices
0349: final Raster[] tiles; // the computed tiles
0350: final int offset; // offset into arrays
0351: final int numTiles; // number of elements to use in indices array
0352:
0353: boolean done = false; // flag indicating completion status
0354: Exception exception = null; // The first exception that might have
0355:
0356: // occured during computeTile
0357:
0358: /** Constructor. */
0359: TileJob(SunTileScheduler scheduler, boolean isBlocking,
0360: PlanarImage owner, Point[] tileIndices, Raster[] tiles,
0361: int offset, int numTiles) {
0362: this .scheduler = scheduler;
0363: this .isBlocking = isBlocking;
0364: this .owner = owner;
0365: this .tileIndices = tileIndices;
0366: this .tiles = tiles;
0367: this .offset = offset;
0368: this .numTiles = numTiles;
0369: }
0370:
0371: /**
0372: * Tile computation. Does the actual calls to getTile().
0373: */
0374: public void compute() {
0375: exception = scheduler.compute(owner, tileIndices, tiles,
0376: offset, numTiles, null);
0377: done = true;
0378: }
0379:
0380: /**
0381: * Returns <code>true</code> if the job is not done; that is,
0382: * the tile is not computed and no exceptions have occurred.
0383: */
0384: public boolean notDone() {
0385: return !done;
0386: }
0387:
0388: /** Returns the image for which tiles are being computed. */
0389: public PlanarImage getOwner() {
0390: return owner;
0391: }
0392:
0393: /** Returns <code>true</code> if and only if there is a listener. */
0394: public boolean isBlocking() {
0395: return isBlocking;
0396: }
0397:
0398: /** Returns any encountered exception or <code>null</code>. */
0399: public Exception getException() {
0400: return exception;
0401: }
0402: }
0403:
0404: /**
0405: * Worker thread that takes jobs from the tile computation queue and does
0406: * the actual computation.
0407: */
0408: class WorkerThread extends Thread {
0409:
0410: /** <code>Object</code> indicating the the thread should exit. */
0411: public static final Object TERMINATE = new Object();
0412:
0413: /** The scheduler that spawned this thread. */
0414: SunTileScheduler scheduler;
0415:
0416: /** Whether this is a prefetch thread. */
0417: boolean isPrefetch;
0418:
0419: /** Constructor. */
0420: public WorkerThread(ThreadGroup group, SunTileScheduler scheduler,
0421: boolean isPrefetch) {
0422: super (group, group.getName() + group.activeCount());
0423: this .scheduler = scheduler;
0424: this .isPrefetch = isPrefetch;
0425:
0426: setDaemon(true);
0427: start();
0428: }
0429:
0430: /** Does the tile computation. */
0431: public void run() {
0432: LinkedList jobQueue = scheduler.getQueue(isPrefetch);
0433:
0434: while (true) {
0435: Object dequeuedObject = null;
0436:
0437: // Check the job queue.
0438: synchronized (jobQueue) {
0439: if (jobQueue.size() > 0) {
0440: // Remove the first job.
0441: dequeuedObject = jobQueue.removeFirst();
0442: } else {
0443: try {
0444: // Wait for a notify() on the queue.
0445: jobQueue.wait();
0446: continue;
0447: } catch (InterruptedException ie) {
0448: // Ignore: should never happen.
0449: }
0450: }
0451: }
0452:
0453: if (dequeuedObject == TERMINATE || getThreadGroup() == null
0454: || getThreadGroup().isDestroyed()) {
0455: // Remove WorkerThread from appropriate Vector.
0456: Vector threads;
0457: synchronized (threads = scheduler
0458: .getWorkers(isPrefetch)) {
0459: threads.remove(this );
0460: }
0461:
0462: // Exit the thread.
0463: return;
0464: }
0465:
0466: Job job = (Job) dequeuedObject;
0467:
0468: // Execute tile job.
0469: if (job != null) {
0470: job.compute();
0471:
0472: // Notify the scheduler only if the Job is blocking.
0473: if (job.isBlocking()) {
0474: synchronized (scheduler) {
0475: scheduler.notify();
0476: }
0477: }
0478: }
0479: } // infinite loop
0480: }
0481: }
0482:
0483: /**
0484: * This is Sun Microsystems' reference implementation of the
0485: * <code>javax.media.jai.TileScheduler</code> interface. It provides
0486: * a mechanism for scheduling tile calculation. Multi-threading is
0487: * used whenever possible.
0488: *
0489: * @see javax.media.jai.TileScheduler
0490: */
0491: public final class SunTileScheduler implements TileScheduler {
0492:
0493: /** The default number of worker threads. */
0494: private static final int NUM_THREADS_DEFAULT = 2;
0495:
0496: /** The default number of worker threads. */
0497: private static final int NUM_PREFETCH_THREADS_DEFAULT = 1;
0498:
0499: /** The instance counter. It is used to compose the name of the
0500: * ThreadGroup.
0501: */
0502: private static int numInstances = 0;
0503:
0504: /** The tile schedular name. It is used to compose the name of the
0505: * ThreadGroup.
0506: */
0507: private static String name = JaiI18N
0508: .getString("SunTileSchedulerName");
0509:
0510: /** The root ThreadGroup, which holds two sub-groups:
0511: * the ThreadGroup for the standard jobs, and the ThreadGroup for
0512: * the prefetch jobs.
0513: */
0514: private ThreadGroup rootGroup;
0515:
0516: /** The ThreadGroup contains all the standard jobs. */
0517: private ThreadGroup standardGroup;
0518:
0519: /** The ThreadGroup contains all the prefetch jobs. */
0520: private ThreadGroup prefetchGroup;
0521:
0522: /** The worker thread parallelism. */
0523: private int parallelism = NUM_THREADS_DEFAULT;
0524:
0525: /** The processing thread parallelism. */
0526: private int prefetchParallelism = NUM_PREFETCH_THREADS_DEFAULT;
0527:
0528: /** The worker thread priority. */
0529: private int priority = Thread.NORM_PRIORITY;
0530:
0531: /** The prefetch thread priority. */
0532: private int prefetchPriority = Thread.MIN_PRIORITY;
0533:
0534: /** A job queue for tiles waiting to be computed by the worker threads. */
0535: private LinkedList queue = null;
0536:
0537: /** A job queue for tiles waiting to be computed by prefetch workers. */
0538: private LinkedList prefetchQueue = null;
0539:
0540: /**
0541: * A <code>Vector</code> of <code>WorkerThread</code>s that persist
0542: * to do the actual tile computation for normal processing. This
0543: * variable should never be set to <code>null</code>.
0544: */
0545: private Vector workers = new Vector();
0546:
0547: /**
0548: * A <code>Vector</code> of <code>WorkerThread</code>s that persist
0549: * to do the actual tile computation for prefetch processing. This
0550: * variable should never be set to <code>null</code>.
0551: */
0552: private Vector prefetchWorkers = new Vector();
0553:
0554: /**
0555: * The effective number of worker threads; may differ from
0556: * <code>workers.size()</code> due to latency. This value should
0557: * equal the size of <code>workers</code> less the number of
0558: * <code>WorkerThread.TERMINATE</code>s in <code>queue</code>.
0559: */
0560: private int numWorkerThreads = 0;
0561:
0562: /**
0563: * The effective number of prefetch worker threads; may differ from
0564: * <code>prefetchWorkers.size()</code> due to latency. This value should
0565: * equal the size of <code>prefetchWorkers</code> less the number of
0566: * <code>WorkerThread.TERMINATE</code>s in <code>prefetchQueue</code>.
0567: */
0568: private int numPrefetchThreads = 0;
0569:
0570: /**
0571: * <code>Map</code> of tiles currently being computed. The key is
0572: * created from the image and tile indices by the <code>tileKey()</code>
0573: * method. Each key is mapped to an <code>Object[1]</code> which may
0574: * contain <code>null</code>, a <code>Raster</code>, or an indefinite
0575: * <code>Object</code> which represent, respectively, that the tile is
0576: * being computed, the tile itself, and that the tile computation failed.
0577: */
0578: private Map tilesInProgress = new HashMap();
0579:
0580: /**
0581: * <code>Map</code> of tiles to <code>Request</code>s. The key is
0582: * created from the image and tile indices by the <code>tileKey()</code>
0583: * method. Each key is mapped to a <code>List</code> of
0584: * <code>Request</code> for the tile. If there is no mapping for the
0585: * tile, then there are no current requests. If a mapping exists, it
0586: * should always be non-null and the <code>List</code> value should
0587: * have size of at least unity.
0588: */
0589: Map tileRequests = new HashMap();
0590:
0591: /**
0592: * <code>Map</code> of tiles to <code>Job</code>s.The key is
0593: * created from the image and tile indices by the <code>tileKey()</code>
0594: * method. Each key is mapped to a <code>Job</code> for the tile. If
0595: * there is no mapping for the tile, then there is no enqueued
0596: * <code>RequestJob</code>.
0597: */
0598: Map tileJobs = new HashMap();
0599:
0600: /** The name of this instance. */
0601: private String nameOfThisInstance;
0602:
0603: /**
0604: * Returns the hash table "key" as a <code>Object</code> for this
0605: * tile. For <code>PlanarImage</code> and
0606: * <code>SerializableRenderedImage</code>, the key is generated by
0607: * the method <code>ImageUtilgenerateID(Object) </code>. For the
0608: * other cases, a <code>Long</code> object is returned.
0609: * The upper 32 bits for this <code>Long</code> is the tile owner's
0610: * hash code, and the lower 32 bits is the tile's index.
0611: */
0612: static Object tileKey(PlanarImage owner, int tileX, int tileY) {
0613: long idx = tileY * (long) owner.getNumXTiles() + tileX;
0614:
0615: BigInteger imageID = (BigInteger) owner.getImageID();
0616: byte[] buf = imageID.toByteArray();
0617: int length = buf.length;
0618: byte[] buf1 = new byte[length + 8];
0619: System.arraycopy(buf, 0, buf1, 0, length);
0620: for (int i = 7, j = 0; i >= 0; i--, j += 8)
0621: buf1[length++] = (byte) (idx >> j);
0622: return new BigInteger(buf1);
0623: }
0624:
0625: /**
0626: * Returns all <code>TileComputationListener</code>s for the supplied
0627: * <code>List</code> of <code>Request</code>s.
0628: */
0629: static Set getListeners(List reqList) {
0630: // Extract the Set of all TileComputationListeners.
0631: int numReq = reqList.size();
0632: HashSet listeners = null;
0633: for (int j = 0; j < numReq; j++) {
0634: Request req = (Request) reqList.get(j);
0635: // XXX Do not need empty check in next line.
0636: if (req.listeners != null && !req.listeners.isEmpty()) {
0637: if (listeners == null) {
0638: listeners = new HashSet();
0639: }
0640: listeners.addAll(req.listeners);
0641: }
0642: }
0643:
0644: return listeners;
0645: }
0646:
0647: /**
0648: * Converts the supplied <code>Exception</code>'s stack trace
0649: * to a <code>String</code>.
0650: */
0651: private static String getStackTraceString(Throwable e) {
0652: ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
0653: PrintStream printStream = new PrintStream(byteStream);
0654: e.printStackTrace(printStream);
0655: printStream.flush();
0656: String stackTraceString = byteStream.toString();
0657: printStream.close();
0658: return stackTraceString;
0659: }
0660:
0661: /**
0662: * Constructor.
0663: *
0664: * @param parallelism The number of worker threads to do tile computation.
0665: * If this number is less than 1, no multi-threading is used.
0666: * @param priority The priority of worker threads.
0667: * @param prefetchParallelism The number of threads to do prefetching.
0668: * If this number is less than 1, no multi-threading is used.
0669: * @param prefetchPriority The priority of prefetch threads.
0670: */
0671: public SunTileScheduler(int parallelism, int priority,
0672: int prefetchParallelism, int prefetchPriority) {
0673: // Create queues and set parallelism and priority to default values.
0674: this ();
0675:
0676: setParallelism(parallelism);
0677: setPriority(priority);
0678: setPrefetchParallelism(prefetchParallelism);
0679: setPrefetchPriority(prefetchPriority);
0680: }
0681:
0682: /**
0683: * Constructor. Processing and prefetch queues are created and all
0684: * parallelism and priority values are set to default values.
0685: */
0686: public SunTileScheduler() {
0687: queue = new LinkedList();
0688: prefetchQueue = new LinkedList();
0689:
0690: nameOfThisInstance = name + numInstances;
0691: rootGroup = new ThreadGroup(nameOfThisInstance);
0692: rootGroup.setDaemon(true);
0693:
0694: standardGroup = new ThreadGroup(rootGroup, nameOfThisInstance
0695: + "Standard");
0696: standardGroup.setDaemon(true);
0697:
0698: prefetchGroup = new ThreadGroup(rootGroup, nameOfThisInstance
0699: + "Prefetch");
0700: prefetchGroup.setDaemon(true);
0701:
0702: numInstances++;
0703: }
0704:
0705: /**
0706: * Tile computation. Does the actual calls to getTile().
0707: */
0708: Exception compute(PlanarImage owner, Point[] tileIndices,
0709: Raster[] tiles, int offset, int numTiles, Request request) {
0710: Exception exception = null;
0711:
0712: int j = offset;
0713: if (request == null || request.listeners == null) {
0714: for (int i = 0; i < numTiles; i++, j++) {
0715: Point p = tileIndices[j];
0716:
0717: try {
0718: tiles[j] = owner.getTile(p.x, p.y);
0719: } catch (Exception e) {
0720: exception = e;
0721:
0722: // Abort the remaining tiles in the job.
0723: break;
0724: }
0725: }
0726: } else { // listeners present
0727: Request[] reqs = new Request[] { request };
0728: for (int i = 0; i < numTiles; i++, j++) {
0729: Point p = tileIndices[j];
0730:
0731: // Update tile status to "processing".
0732: Integer tileStatus = new Integer(
0733: TileRequest.TILE_STATUS_PROCESSING);
0734: request.tileStatus.put(p, tileStatus);
0735:
0736: try {
0737: tiles[j] = owner.getTile(p.x, p.y);
0738: Iterator iter = request.listeners.iterator();
0739: while (iter.hasNext()) {
0740: // Update tile status to "computed".
0741: tileStatus = new Integer(
0742: TileRequest.TILE_STATUS_COMPUTED);
0743: request.tileStatus.put(p, tileStatus);
0744:
0745: TileComputationListener listener = (TileComputationListener) iter
0746: .next();
0747: listener.tileComputed(this , reqs, owner, p.x,
0748: p.y, tiles[j]);
0749: }
0750: } catch (Exception e) {
0751: exception = e;
0752:
0753: // Abort the remaining tiles in the job.
0754: break;
0755: }
0756: /* XXX
0757: try {
0758: List reqList;
0759: synchronized(tileRequests) {
0760: Long tileID = tileKey(owner, p.x, p.y);
0761: reqList = (List)tileRequests.remove(tileID);
0762: tileJobs.remove(tileID);
0763: }
0764: if(reqList != null) {
0765: tiles[j] = owner.getTile(p.x, p.y);
0766: TileRequest[] reqs =
0767: (TileRequest[])reqList.toArray(new TileRequest[0]);
0768: Set listeners = getListeners(reqList);
0769: if(listeners != null) {
0770: Iterator iter = listeners.iterator();
0771: while(iter.hasNext()) {
0772: TileComputationListener listener =
0773: (TileComputationListener)iter.next();
0774: listener.tileComputed(this,
0775: reqs,
0776: owner,
0777: p.x, p.y,
0778: tiles[j]);
0779: }
0780: }
0781: }
0782: } catch (Exception e) {
0783: exception = e;
0784:
0785: // Abort the remaining tiles in the job.
0786: break;
0787: }
0788: */
0789: }
0790: }
0791:
0792: // If an exception occured, notify listeners that all remaining
0793: // tiles in the job have failed.
0794: if (exception != null && request != null
0795: && request.listeners != null) {
0796: int lastOffset = j;
0797: int numFailed = numTiles - (lastOffset - offset);
0798:
0799: // Mark all tiles starting with the one which generated the
0800: // Exception as "failed".
0801: for (int i = 0, k = lastOffset; i < numFailed; i++) {
0802: Integer tileStatus = new Integer(
0803: TileRequest.TILE_STATUS_FAILED);
0804: request.tileStatus.put(tileIndices[k++], tileStatus);
0805: }
0806:
0807: // Notify listeners.
0808: Request[] reqs = new Request[] { request };
0809: for (int i = 0, k = lastOffset; i < numFailed; i++) {
0810: Point p = tileIndices[k++];
0811: Iterator iter = request.listeners.iterator();
0812: while (iter.hasNext()) {
0813: TileComputationListener listener = (TileComputationListener) iter
0814: .next();
0815: listener.tileComputationFailure(this , reqs, owner,
0816: p.x, p.y, exception);
0817: }
0818: }
0819: }
0820:
0821: /* XXX
0822: if(exception != null) {
0823: int numFailed = numTiles - (j - offset);
0824: for(int i = 0; i < numFailed; i++) {
0825: Point p = tileIndices[j++];
0826: Long tileID = tileKey(owner, p.x, p.y);
0827: List reqList = (List)tileRequests.remove(tileID);
0828: tileJobs.remove(tileID);
0829: if(reqList != null) {
0830: TileRequest[] reqs =
0831: (TileRequest[])reqList.toArray(new TileRequest[0]);
0832: Set listeners = getListeners(reqList);
0833: if(listeners != null) {
0834: Iterator iter = listeners.iterator();
0835: while(iter.hasNext()) {
0836: TileComputationListener listener =
0837: (TileComputationListener)iter.next();
0838: listener.tileComputationFailure(this, reqs,
0839: owner, p.x, p.y,
0840: exception);
0841: }
0842: }
0843: }
0844: }
0845: }
0846: */
0847:
0848: return exception;
0849: }
0850:
0851: /**
0852: * Schedules a single tile for computation.
0853: *
0854: * @param owner The image the tiles belong to.
0855: * @param tileX The tile's X index.
0856: * @param tileY The tile's Y index.
0857: *
0858: * @exception IllegalArgumentException if <code>owner</code> is
0859: * <code>null</code>.
0860: *
0861: * @return The computed tile
0862: */
0863: //
0864: // This method blocks on the 'tilesInProgress' Map to avoid simultaneous
0865: // computation of the same tile in two or more different threads. The idea
0866: // is to release the resources of all but one thread so that the computation
0867: // occurs more quickly. The synchronization variable is an Object[] of length
0868: // unity. The computed tile is passed from the computing thread to the
0869: // waiting threads via the contents of this Object[]. Thus this method does
0870: // not depend on the TileCache to transfer the data.
0871: //
0872: public Raster scheduleTile(OpImage owner, int tileX, int tileY) {
0873: if (owner == null) {
0874: throw new IllegalArgumentException(JaiI18N
0875: .getString("SunTileScheduler1"));
0876: }
0877:
0878: // Eventual tile to be returned.
0879: Raster tile = null;
0880:
0881: // Get the tile's unique ID.
0882: Object tileID = tileKey(owner, tileX, tileY);
0883:
0884: // Set the computation flag and initialize or retrieve the tile cache.
0885: boolean computeTile = false;
0886: Object[] cache = null;
0887: synchronized (tilesInProgress) {
0888: if (computeTile = !tilesInProgress.containsKey(tileID)) {
0889: // Computing: add tile ID to the map.
0890: tilesInProgress.put(tileID, cache = new Object[1]);
0891: } else {
0892: // Waiting: get tile cache from the Map.
0893: cache = (Object[]) tilesInProgress.get(tileID);
0894: }
0895: }
0896:
0897: if (computeTile) {
0898: try {
0899: try {
0900: // Attempt to compute the tile.
0901: tile = owner.computeTile(tileX, tileY);
0902: } catch (OutOfMemoryError e) {
0903: // Empty the cache and call System.gc()
0904: TileCache tileCache = owner.getTileCache();
0905: if (tileCache != null) {
0906: tileCache.flush();
0907: System.gc(); //slow
0908: }
0909:
0910: // Re-attempt to compute the tile.
0911: tile = owner.computeTile(tileX, tileY);
0912: }
0913: } catch (Throwable e) {
0914: // Re-throw the Error or Exception.
0915: if (e instanceof Error) {
0916: throw (Error) e;
0917: } else if (e instanceof RuntimeException) {
0918: sendExceptionToListener(JaiI18N
0919: .getString("SunTileScheduler6"), e);
0920: // throw (RuntimeException)e;
0921: } else {
0922: String message = JaiI18N
0923: .getString("SunTileScheduler6");
0924: sendExceptionToListener(message,
0925: new ImagingException(message, e));
0926: /*
0927: throw new RuntimeException(e.getMessage()+"\n"+
0928: getStackTraceString(e));
0929: */
0930: }
0931: } finally {
0932: synchronized (cache) {
0933: // Always set the cached tile to a non-null value.
0934: cache[0] = tile != null ? tile : new Object();
0935:
0936: // Notify the thread(s).
0937: cache.notifyAll();
0938:
0939: synchronized (tilesInProgress) {
0940: // Remove the tile ID from the Map.
0941: tilesInProgress.remove(tileID);
0942: }
0943: }
0944: }
0945: } else {
0946: synchronized (cache) {
0947: // Check the cache: a null value indicates computation is
0948: // still in progress.
0949: if (cache[0] == null) {
0950: // Wait for the computation to complete.
0951: try {
0952: cache.wait(); // XXX Should there be a timeout?
0953: } catch (Exception e) {
0954: // XXX What response here?
0955: }
0956: }
0957:
0958: // Set the result only if cache contains a Raster.
0959: if (cache[0] instanceof Raster) {
0960: tile = (Raster) cache[0];
0961: } else {
0962: throw new RuntimeException(JaiI18N
0963: .getString("SunTileScheduler5"));
0964: }
0965: }
0966: }
0967:
0968: return tile;
0969: }
0970:
0971: /**
0972: * General purpose method for job creation and queueing. Note that
0973: * the returned value should be ignored if the <code>listener</code>
0974: * parameter is non-<code>null</code>.
0975: *
0976: * @param owner The image for which tile computation jobs will be queued.
0977: * @param tileIndices The indices of the tiles to be computed.
0978: * @param isPrefetch Whether the operation is a prefetch.
0979: * @param listener A <code>TileComputationListener</code> of the
0980: * processing. May be <code>null</code>.
0981: *
0982: * @return The computed tiles. This value is meaningless if
0983: * <code>listener</code> is non-<code>null</code>.
0984: */
0985: // The allowable arguments are constained as follows:
0986: // A) owner and tileIndices non-null.
0987: // B) (isBlocking,isPrefetch) in {(true,false),(false,false),(false,true)}
0988: // C) listeners != null <=> (isBlocking,isPrefetch) == (false,false)
0989: // The returned value is one of:
0990: // Raster[] <=> (isBlocking,isPrefetch) == (true,false)
0991: // Integer <=> (isBlocking,isPrefetch) == (false,false)
0992: // (Raster[])null <=> (isBlocking,isPrefetch) == (false,true)
0993: private Object scheduleJob(PlanarImage owner, Point[] tileIndices,
0994: boolean isBlocking, boolean isPrefetch,
0995: TileComputationListener[] listeners) {
0996: if (owner == null || tileIndices == null) {
0997: // null parameters
0998: throw new IllegalArgumentException(); // coding error - no message
0999: } else if ((isBlocking || isPrefetch) && listeners != null) {
1000: // listeners for blocking or prefetch job
1001: throw new IllegalArgumentException(); // coding error - no message
1002: } else if (isBlocking && isPrefetch) {
1003: throw new IllegalArgumentException(); // coding error - no message
1004: }
1005:
1006: int numTiles = tileIndices.length;
1007: Raster[] tiles = new Raster[numTiles];
1008: Object returnValue = tiles;
1009:
1010: int numThreads = 0;
1011: Job[] jobs = null;
1012: int numJobs = 0;
1013:
1014: synchronized (getWorkers(isPrefetch)) {
1015: numThreads = getNumThreads(isPrefetch);
1016:
1017: if (numThreads > 0) { // worker threads exist
1018: if (numTiles <= numThreads || // no more tiles than threads
1019: (!isBlocking && !isPrefetch)) { // non-blocking, non-prefetch
1020:
1021: jobs = new Job[numTiles];
1022:
1023: if (!isBlocking && !isPrefetch) {
1024: Request request = new Request(this , owner,
1025: tileIndices, listeners);
1026:
1027: // Override return value.
1028: returnValue = request;
1029:
1030: // Queue all tiles as single-tile jobs.
1031: while (numJobs < numTiles) {
1032: Point p = tileIndices[numJobs];
1033:
1034: Object tileID = tileKey(owner, p.x, p.y);
1035:
1036: synchronized (tileRequests) {
1037: List reqList = null;
1038: if (tileRequests.containsKey(tileID)) {
1039: // This tile is already queued in a
1040: // non-blocking, non-prefetch job.
1041: reqList = (List) tileRequests
1042: .get(tileID);
1043: reqList.add(request);
1044: numTiles--;
1045: } else {
1046: // This tile has not yet been queued.
1047: reqList = new ArrayList();
1048: reqList.add(request);
1049: tileRequests.put(tileID, reqList);
1050:
1051: jobs[numJobs] = new RequestJob(
1052: this , owner, p.x, p.y,
1053: tiles, numJobs);
1054:
1055: tileJobs.put(tileID, jobs[numJobs]);
1056:
1057: addJob(jobs[numJobs++], false);
1058: }
1059: }
1060: }
1061: } else { // numTiles <= numThreads
1062: while (numJobs < numTiles) {
1063: jobs[numJobs] = new TileJob(this ,
1064: isBlocking, owner, tileIndices,
1065: tiles, numJobs, 1);
1066: addJob(jobs[numJobs++], isPrefetch);
1067: }
1068: }
1069: } else { // more tiles than worker threads
1070: // Set the fraction of unqueued tiles to be processed by
1071: // each worker thread.
1072: float frac = 1.0F / (2.0F * numThreads);
1073:
1074: // Set the minimum number of tiles each thread may process.
1075: // If there is only one thread this will equal the total
1076: // number of tiles.
1077: int minTilesPerThread = numThreads == 1 ? numTiles
1078: : Math.min(Math.max(1, (int) (frac
1079: * numTiles / 2.0F + 0.5F)),
1080: numTiles);
1081:
1082: // Allocate the maximum possible number of multi-tile jobs.
1083: // This will be larger than the actual number of jobs but
1084: // a more precise calcuation is not possible and a dynamic
1085: // storage object such as a Collection would not be useful
1086: // since as calculated maxNumJobs = 4*numThreads if the
1087: // preceeding values of "frac" and "minTilesPerThread" are
1088: // 1/(2*numThreads) and frac*numTiles/2, respectively.
1089: int maxNumJobs = numThreads == 1 ? 1
1090: : (int) ((float) numTiles
1091: / (float) minTilesPerThread + 0.5F);
1092: jobs = new TileJob[maxNumJobs];
1093:
1094: // Set the number of enqueued tiles and the number left.
1095: int numTilesQueued = 0;
1096: int numTilesLeft = numTiles - numTilesQueued;
1097:
1098: // Assign a number of tiles to each thread determined by
1099: // the number of remaining tiles, the fraction of remaining
1100: // tiles to be processed and the minimum chunk size.
1101: while (numTilesLeft > 0) {
1102: // Set the number of tiles to the pre-calculated
1103: // fraction of tiles yet to be computed.
1104: int numTilesInThread = (int) (frac
1105: * numTilesLeft + 0.5F);
1106:
1107: // Ensure that the number to be processed is at
1108: // least the minimum chunk size.
1109: if (numTilesInThread < minTilesPerThread) {
1110: numTilesInThread = minTilesPerThread;
1111: }
1112:
1113: // Clamp number of tiles in thread to number unqueued.
1114: if (numTilesInThread > numTilesLeft) {
1115: numTilesInThread = numTilesLeft;
1116: }
1117:
1118: // Decrement the count of remaining tiles. Note that
1119: // this value will be non-negative due to the clamping
1120: // above.
1121: numTilesLeft -= numTilesInThread;
1122:
1123: // If the number left is smaller than the minimum chunk
1124: // size then process these tiles in the current job.
1125: if (numTilesLeft < minTilesPerThread) {
1126: numTilesInThread += numTilesLeft;
1127: numTilesLeft = 0;
1128: }
1129:
1130: // Create a job to process the number of tiles needed.
1131: jobs[numJobs] = new TileJob(this , isBlocking,
1132: owner, tileIndices, tiles,
1133: numTilesQueued, numTilesInThread);
1134:
1135: // Queue the job and increment the job count.
1136: addJob(jobs[numJobs++], isPrefetch);
1137:
1138: // Increment the count of tiles queued.
1139: numTilesQueued += numTilesInThread;
1140: }
1141: } // SingleTile vs. MultiTile Jobs
1142: } // numThreads > 0
1143: } // end synchronized block
1144:
1145: if (numThreads != 0) {
1146: // If blocking, wait until all tiles have been computed.
1147: // There is no 'else' block for non-blocking as in that
1148: // case we just want to continue.
1149: if (isBlocking) {
1150: LinkedList jobQueue = getQueue(isPrefetch);
1151:
1152: for (int i = 0; i < numJobs; i++) {
1153: synchronized (this ) {
1154: while (jobs[i].notDone()) {
1155: try {
1156: wait();
1157: } catch (InterruptedException ie) {
1158: // Ignore: should never happen.
1159: }
1160: }
1161: }
1162:
1163: // XXX: should we re-throw the exception or
1164: // should we reschedule this job ?? krishnag
1165: Exception e = jobs[i].getException();
1166:
1167: if (e != null) {
1168: // Throw a RuntimeException with the Exception's
1169: // message concatenated with the stack trace.
1170: String message = JaiI18N
1171: .getString("SunTileScheduler7");
1172: sendExceptionToListener(message,
1173: new ImagingException(message, e));
1174: /*
1175: throw new RuntimeException(e.getMessage()+"\n"+
1176: getStackTraceString(e));
1177: */
1178: }
1179: }
1180: }
1181: } else { // numThreads == 0
1182: Request request = null;
1183: if (!isBlocking && !isPrefetch) {
1184: request = new Request(this , owner, tileIndices,
1185: listeners);
1186: returnValue = request;
1187: }
1188:
1189: // no workers; sequentially compute tiles in main thread
1190: Exception e = compute(owner, tileIndices, tiles, 0,
1191: numTiles, request);
1192:
1193: // Throw a RuntimeException with the Exception's
1194: // message concatenated with the stack trace.
1195: if (e != null) {
1196: String message = JaiI18N.getString("SunTileScheduler7");
1197: sendExceptionToListener(message, new ImagingException(
1198: message, e));
1199: /*
1200: throw new RuntimeException(e.getMessage()+"\n"+
1201: getStackTraceString(e));
1202: */
1203: }
1204: }
1205:
1206: return returnValue;
1207: }
1208:
1209: /**
1210: * Schedules multiple tiles of an image for computation.
1211: *
1212: * @param owner The image the tiles belong to.
1213: * @param tileIndices An array of tile X and Y indices.
1214: *
1215: * @return An array of computed tiles.
1216: */
1217: public Raster[] scheduleTiles(OpImage owner, Point tileIndices[]) {
1218: if (owner == null || tileIndices == null) {
1219: throw new IllegalArgumentException(JaiI18N
1220: .getString("SunTileScheduler0"));
1221: }
1222: return (Raster[]) scheduleJob(owner, tileIndices, true, false,
1223: null);
1224: }
1225:
1226: /**
1227: * Schedule a list of tiles for computation. The supplied listeners
1228: * will be notified after each tile has been computed. This
1229: * method ideally should be non-blocking. If the <code>TileScheduler</code>
1230: * implementation uses multithreading, it is at the discretion of the
1231: * implementation which thread invokes the
1232: * <code>TileComputationListener</code> methods.
1233: */
1234: public TileRequest scheduleTiles(PlanarImage target,
1235: Point[] tileIndices, TileComputationListener[] tileListeners) {
1236: if (target == null || tileIndices == null) {
1237: throw new IllegalArgumentException(JaiI18N
1238: .getString("SunTileScheduler4"));
1239: }
1240: return (TileRequest) scheduleJob(target, tileIndices, false,
1241: false, tileListeners);
1242: }
1243:
1244: /**
1245: * Issues an advisory cancellation request to the
1246: * <code>TileScheduler</code> stating that the indicated tiles of the
1247: * specified image should not be processed. The handling of this request
1248: * is at the discretion of the scheduler which may cancel tile processing
1249: * in progress and remove tiles from its internal queue, remove tiles from
1250: * the queue but not terminate current processing, or simply do nothing.
1251: *
1252: * <p> In the Sun Microsystems reference implementation of
1253: * <code>TileScheduler</code> the second tile cancellation option is
1254: * implemented, i.e., tiles are removed from the internal queue but
1255: * computation already in progress is not terminated. If there is at
1256: * least one worker thread this method should be non-blocking. Any tiles
1257: * allowed to complete computation subsequent to this call are complete
1258: * and will be treated as if they had not been cancelled, e.g., with
1259: * respect to caching, notification of registered listeners, etc.
1260: * Furthermore, cancelling a tile request in no way invalidates the tile
1261: * as a candidate for future recomputation.
1262: */
1263: public void cancelTiles(TileRequest request, Point[] tileIndices) {
1264: if (request == null) {
1265: throw new IllegalArgumentException(JaiI18N
1266: .getString("SunTileScheduler3"));
1267: }
1268:
1269: Request req = (Request) request;
1270: synchronized (tileRequests) {
1271: // Save the list of all tile indices in this request.
1272: List reqIndexList = req.indices;
1273:
1274: // Initialize the set of tile indices to cancel.
1275: Point[] indices;
1276: if (tileIndices != null && tileIndices.length > 0) {
1277: // Create a Set from the supplied indices.
1278: List tileIndexList = Arrays.asList(tileIndices);
1279:
1280: // Retain only indices which were actually in the request.
1281: tileIndexList.retainAll(reqIndexList);
1282:
1283: indices = (Point[]) tileIndexList.toArray(new Point[0]);
1284: } else {
1285: indices = (Point[]) reqIndexList.toArray(new Point[0]);
1286: }
1287:
1288: // Cache the count.
1289: int numTiles = indices.length;
1290:
1291: // Cache status value.
1292: Integer tileStatus = new Integer(
1293: TileRequest.TILE_STATUS_CANCELLED);
1294:
1295: // Loop over tile indices to be cancelled.
1296: for (int i = 0; i < numTiles; i++) {
1297: Point p = indices[i];
1298:
1299: // Get the tile's ID.
1300: Object tileID = tileKey(req.image, p.x, p.y);
1301:
1302: // Get the list of requests for this tile.
1303: List reqList = (List) tileRequests.get(tileID);
1304:
1305: // If there are none, proceed to next index.
1306: if (reqList == null) {
1307: continue;
1308: }
1309:
1310: // Remove this Request from the Request List for this tile.
1311: reqList.remove(req);
1312:
1313: // If the request list is now empty, dequeue the job and
1314: // remove the tile from the hashes.
1315: if (reqList.isEmpty()) {
1316: synchronized (queue) {
1317: Object job = tileJobs.remove(tileID);
1318: if (job != null) {
1319: queue.remove(job);
1320: }
1321: }
1322: tileRequests.remove(tileID);
1323: }
1324:
1325: // Update tile status to "cancelled".
1326: req.tileStatus.put(p, tileStatus);
1327:
1328: // Notify any listeners.
1329: if (req.listeners != null) {
1330: TileRequest[] reqArray = new TileRequest[] { req };
1331: Iterator iter = req.listeners.iterator();
1332: while (iter.hasNext()) {
1333: TileComputationListener listener = (TileComputationListener) iter
1334: .next();
1335: listener.tileCancelled(this , reqArray,
1336: req.image, p.x, p.y);
1337: }
1338: }
1339: }
1340: }
1341: }
1342:
1343: /**
1344: * Prefetchs a list of tiles of an image.
1345: *
1346: * @param owner The image the tiles belong to.
1347: * @param tileIndices An array of tile X and Y indices.
1348: */
1349: public void prefetchTiles(PlanarImage owner, Point[] tileIndices) {
1350: if (owner == null || tileIndices == null) {
1351: throw new IllegalArgumentException(JaiI18N
1352: .getString("SunTileScheduler0"));
1353: }
1354: scheduleJob(owner, tileIndices, false, true, null);
1355: }
1356:
1357: /**
1358: * Suggests to the scheduler the degree of parallelism to use in
1359: * processing invocations of <code>scheduleTiles()</code>. For
1360: * example, this might set the number of threads to spawn. It is
1361: * legal to implement this method as a no-op.
1362: *
1363: * <p> In the Sun Microsystems reference implementation of TileScheduler
1364: * this method sets the number of worker threads actually used for tile
1365: * computation. Ideally this number should equal the number of processors
1366: * actually available on the system. It is the responsibility of the
1367: * application to set this value as the number of processors is not
1368: * available via the virtual machine. A parallelism value of zero
1369: * indicates that all tile computation will be effected in the primary
1370: * thread. A parallelism value of <i>N</i> indicates that there will be
1371: * <i>N</i> worker threads in addition to the primary scheduler thread.
1372: * In JAI the parallelism defaults to a value of 2 unless explicity set
1373: * by the application.
1374: *
1375: * @param parallelism The suggested degree of parallelism.
1376: * @throws IllegalArgumentException if <code>parallelism</code>
1377: * is negative.
1378: */
1379: public void setParallelism(int parallelism) {
1380: if (parallelism < 0) {
1381: throw new IllegalArgumentException(JaiI18N
1382: .getString("SunTileScheduler2"));
1383: }
1384: this .parallelism = parallelism;
1385: }
1386:
1387: /**
1388: * Returns the degree of parallelism of the scheduler.
1389: */
1390: public int getParallelism() {
1391: return parallelism;
1392: }
1393:
1394: /**
1395: * Identical to <code>setParallelism()</code> but applies only to
1396: * <code>prefetchTiles()</code>.
1397: */
1398: public void setPrefetchParallelism(int parallelism) {
1399: if (parallelism < 0) {
1400: throw new IllegalArgumentException(JaiI18N
1401: .getString("SunTileScheduler2"));
1402: }
1403: prefetchParallelism = parallelism;
1404: }
1405:
1406: /**
1407: * Identical to <code>getParallelism()</code> but applies only to
1408: * <code>prefetchTiles()</code>.
1409: */
1410: public int getPrefetchParallelism() {
1411: return prefetchParallelism;
1412: }
1413:
1414: /**
1415: * Suggests to the scheduler the priority to assign to processing
1416: * effected by <code>scheduleTiles()</code>. For example, this might
1417: * set thread priority. Values outside of the accepted priority range
1418: * will be clamped to the nearest extremum. An implementation may clamp
1419: * the prefetch priority to less than the scheduling priority. It is
1420: * legal to implement this method as a no-op.
1421: *
1422: * <p> In the Sun Microsystems reference implementation of TileScheduler
1423: * this method sets the priority of the worker threads used for tile
1424: * computation. Its initial value is <code>Thread.NORM_PRIORITY</code>.
1425: *
1426: * @param priority The suggested priority.
1427: */
1428: public void setPriority(int priority) {
1429: this .priority = Math.max(Math
1430: .min(priority, Thread.MAX_PRIORITY),
1431: Thread.MIN_PRIORITY);
1432: }
1433:
1434: /**
1435: * Returns the priority of <code>scheduleTiles()</code> processing.
1436: */
1437: public int getPriority() {
1438: return priority;
1439: }
1440:
1441: /**
1442: * Identical to <code>setPriority()</code> but applies only to
1443: * <code>prefetchTiles()</code>.
1444: *
1445: * <p> In the Sun Microsystems reference implementation of
1446: * <code>TileScheduler</code>, this method sets the priority of any threads
1447: * spawned to prefetch tiles. Its initial value is
1448: * <code>Thread.MIN_PRIORITY</code>.
1449: */
1450: public void setPrefetchPriority(int priority) {
1451: prefetchPriority = Math.max(Math.min(priority,
1452: Thread.MAX_PRIORITY), Thread.MIN_PRIORITY);
1453: }
1454:
1455: /**
1456: * Identical to <code>getPriority()</code> but applies only to
1457: * <code>prefetchTiles()</code>.
1458: */
1459: public int getPrefetchPriority() {
1460: return prefetchPriority;
1461: }
1462:
1463: /** Recreate the <code>ThreadGroup</code>is and <code>WorkThread</code>s.
1464: * This happens in the case of applet: the java plugin will exist after
1465: * the termination of the applet so that JAI and SunTileScheduler will
1466: * also exist. However, the <code>ThreadGroup</code>s are destroyed.
1467: * Thus, the old workers should be terminated and new i
1468: * <code>ThreadGroup</code> and workers should be created.
1469: */
1470: // private synchronized void createThreadGroup(boolean isPrefetch) {
1471: private void createThreadGroup(boolean isPrefetch) {
1472: if (rootGroup == null || rootGroup.isDestroyed()) {
1473: rootGroup = new ThreadGroup(nameOfThisInstance);
1474: rootGroup.setDaemon(true);
1475: }
1476:
1477: if (isPrefetch
1478: && (prefetchGroup == null || prefetchGroup
1479: .isDestroyed())) {
1480: prefetchGroup = new ThreadGroup(rootGroup,
1481: nameOfThisInstance + "Prefetch");
1482: prefetchGroup.setDaemon(true);
1483: }
1484:
1485: if (!isPrefetch
1486: && (standardGroup == null || standardGroup
1487: .isDestroyed())) {
1488: standardGroup = new ThreadGroup(rootGroup,
1489: nameOfThisInstance + "Standard");
1490: standardGroup.setDaemon(true);
1491: }
1492:
1493: Vector thr = getWorkers(isPrefetch);
1494: int size = thr.size();
1495:
1496: for (int i = size - 1; i >= 0; i--) {
1497: Thread t = (Thread) thr.get(i);
1498: if (!t.isAlive())
1499: thr.remove(t);
1500: }
1501:
1502: if (isPrefetch)
1503: numPrefetchThreads = thr.size();
1504: else
1505: numWorkerThreads = thr.size();
1506:
1507: }
1508:
1509: /**
1510: * Returns the effective number of threads of the specified type.
1511: * This method also updates the number and priority of threads of
1512: * the specified type according to the global settings. This method
1513: * may add <code>WorkerThread.TERMINATE</code>s to the appropriate
1514: * queue if there are too many effective threads.
1515: */
1516: private int getNumThreads(boolean isPrefetch) {
1517: createThreadGroup(isPrefetch);
1518:
1519: // Local variables.
1520: Vector thr = getWorkers(isPrefetch);
1521: int nthr;
1522: int prll;
1523: int prty;
1524:
1525: // Set local variables depending on the thread type.
1526: if (isPrefetch) {
1527: nthr = numPrefetchThreads;
1528: prll = prefetchParallelism;
1529: prty = prefetchPriority;
1530: } else {
1531: nthr = numWorkerThreads;
1532: prll = parallelism;
1533: prty = priority;
1534: }
1535:
1536: // Update priority if it has changed.
1537: if (nthr > 0 && ((Thread) thr.get(0)).getPriority() != prty) {
1538: int size = thr.size();
1539: for (int i = 0; i < size; i++) {
1540: Thread t = (Thread) thr.get(i);
1541: if (t != null && t.getThreadGroup() != null) {
1542: t.setPriority(prty);
1543: }
1544: }
1545: }
1546:
1547: if (nthr < prll) {
1548: // Not enough processing threads.
1549: // Add more threads at current priority.
1550: while (nthr < prll) {
1551: Thread t = new WorkerThread(isPrefetch ? prefetchGroup
1552: : standardGroup, this , isPrefetch);
1553:
1554: t.setPriority(prty);
1555: thr.add(t);
1556: nthr++;
1557: }
1558: } else {
1559: // Too many processing threads: queue WorkerThread.TERMINATEs.
1560: // WorkerThread will remove itself later from the appropriate
1561: // Vector.
1562: while (nthr > prll) {
1563: addJob(WorkerThread.TERMINATE, isPrefetch);
1564: nthr--;
1565: }
1566: }
1567:
1568: // Update the number of effective threads.
1569: if (isPrefetch) {
1570: numPrefetchThreads = nthr;
1571: } else {
1572: numWorkerThreads = nthr;
1573: }
1574:
1575: return nthr;
1576: }
1577:
1578: /** Returns the appropriate worker list. */
1579: Vector getWorkers(boolean isPrefetch) {
1580: return isPrefetch ? workers : prefetchWorkers;
1581: }
1582:
1583: /** Returns the appropriate queue. */
1584: LinkedList getQueue(boolean isPrefetch) {
1585: return isPrefetch ? prefetchQueue : queue;
1586: }
1587:
1588: /** Append a job to the appropriate queue. */
1589: private void addJob(Object job, boolean isPrefetch) {
1590: if (job == null
1591: || (job != WorkerThread.TERMINATE && !(job instanceof Job))) {
1592: // Programming error: deliberately no message.
1593: throw new IllegalArgumentException();
1594: }
1595:
1596: LinkedList jobQueue;
1597: synchronized (jobQueue = getQueue(isPrefetch)) {
1598: if (isPrefetch || jobQueue.isEmpty()
1599: || job instanceof RequestJob) {
1600: // Append job to queue.
1601: jobQueue.addLast(job);
1602: } else {
1603: // If the queue is non-empty or the job is a TileJob
1604: // insert the job after the last TileJob in the queue.
1605: boolean inserted = false;
1606: for (int idx = jobQueue.size() - 1; idx >= 0; idx--) {
1607: if (jobQueue.get(idx) instanceof TileJob) {
1608: jobQueue.add(idx + 1, job);
1609: inserted = true;
1610: break;
1611: }
1612: }
1613: if (!inserted) {
1614: jobQueue.addFirst(job);
1615: }
1616: }
1617: jobQueue.notify();
1618: }
1619: }
1620:
1621: /** Queue WorkerThread.TERMINATEs to all workers. */
1622: protected void finalize() throws Throwable {
1623: terminateAll(false);
1624: terminateAll(true);
1625: super .finalize();
1626: }
1627:
1628: /** Queue WorkerThread.TERMINATEs to all appropriate workers. */
1629: private void terminateAll(boolean isPrefetch) {
1630: synchronized (getWorkers(isPrefetch)) {
1631: int numThreads = isPrefetch ? numPrefetchThreads
1632: : numWorkerThreads;
1633: for (int i = 0; i < numThreads; i++) {
1634: addJob(WorkerThread.TERMINATE, isPrefetch);
1635: if (isPrefetch) {
1636: numPrefetchThreads--;
1637: } else {
1638: numWorkerThreads--;
1639: }
1640: }
1641: }
1642: }
1643:
1644: void sendExceptionToListener(String message, Throwable e) {
1645: ImagingListener listener = ImageUtil
1646: .getImagingListener((RenderingHints) null);
1647: listener.errorOccurred(message, e, this , false);
1648: }
1649: }
|