001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.core;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029:
030: import java.util.Hashtable;
031:
032: /**
033: * The FiniteQueue class is a simple implementation of the QueueIF
034: * interface, using a linked list.
035: *
036: * @author Matt Welsh
037: * @see seda.sandStorm.api.QueueIF
038: */
039:
040: public class FiniteQueue implements QueueIF, ProfilableIF {
041:
042: private static final boolean DEBUG = false;
043:
044: private ssLinkedList qlist;
045: private int queueSize;
046: private Object blocker;
047: private Hashtable provisionalTbl;
048: private EnqueuePredicateIF pred;
049: private String name;
050:
051: /**
052: * Create a FiniteQueue with the given enqueue predicate.
053: */
054: public FiniteQueue(EnqueuePredicateIF pred) {
055: this .name = null;
056: this .pred = pred;
057: qlist = new ssLinkedList();
058: queueSize = 0;
059: blocker = new Object();
060: provisionalTbl = new Hashtable(1);
061: }
062:
063: /**
064: * Create a FiniteQueue with no enqueue predicate.
065: */
066: public FiniteQueue() {
067: this ((EnqueuePredicateIF) null);
068: }
069:
070: /**
071: * Create a FiniteQueue with no enqueue and the given name. Used for
072: * debugging.
073: */
074: public FiniteQueue(String name) {
075: this ((EnqueuePredicateIF) null);
076: this .name = name;
077: }
078:
079: /**
080: * Return the size of the queue.
081: */
082: public int size() {
083: synchronized (blocker) {
084: synchronized (qlist) {
085: return queueSize;
086: }
087: }
088: }
089:
090: public void enqueue(QueueElementIF enqueueMe)
091: throws SinkFullException {
092:
093: if (DEBUG)
094: System.err.println("**** ENQUEUE (" + name
095: + ") **** Entered");
096: synchronized (blocker) {
097:
098: synchronized (qlist) {
099: if (DEBUG)
100: System.err.println("**** ENQUEUE (" + name
101: + ") **** Checking pred");
102: if ((pred != null) && (!pred.accept(enqueueMe)))
103: throw new SinkFullException("FiniteQueue is full!");
104: queueSize++;
105: if (DEBUG)
106: System.err.println("**** ENQUEUE (" + name
107: + ") **** Add to tail");
108: qlist.add_to_tail(enqueueMe); // wake up one blocker
109: }
110: // XXX MDW: Trying to track down a bug here ...
111: if (DEBUG)
112: System.err.println("**** ENQUEUE (" + name
113: + ") **** Doing notify");
114: blocker.notify();
115: if (DEBUG)
116: System.err.println("**** ENQUEUE (" + name
117: + ") **** Done with notify");
118: //blocker.notifyAll();
119: }
120: if (DEBUG)
121: System.err.println("**** ENQUEUE (" + name
122: + ") **** Exiting");
123: }
124:
125: public boolean enqueue_lossy(QueueElementIF enqueueMe) {
126: try {
127: this .enqueue(enqueueMe);
128: } catch (Exception e) {
129: return false;
130: }
131: return true;
132: }
133:
134: public void enqueue_many(QueueElementIF[] enqueueMe)
135: throws SinkFullException {
136: synchronized (blocker) {
137: int qlen = enqueueMe.length;
138:
139: synchronized (qlist) {
140: if (pred != null) {
141: int i = 0;
142: while ((i < qlen) && (pred.accept(enqueueMe[i])))
143: i++;
144: if (i != qlen)
145: throw new SinkFullException(
146: "FiniteQueue is full!");
147: }
148:
149: queueSize += qlen;
150: for (int i = 0; i < qlen; i++) {
151: qlist.add_to_tail(enqueueMe[i]);
152: }
153: }
154: blocker.notifyAll(); // wake up all sleepers
155: }
156: }
157:
158: public QueueElementIF dequeue() {
159:
160: QueueElementIF el = null;
161: synchronized (blocker) {
162: synchronized (qlist) {
163: if (qlist.size() == 0)
164: return null;
165:
166: el = (QueueElementIF) qlist.remove_head();
167: queueSize--;
168: return el;
169: }
170: }
171: }
172:
173: public QueueElementIF[] dequeue_all() {
174:
175: synchronized (blocker) {
176: synchronized (qlist) {
177: int qs = qlist.size();
178: if (qs == 0)
179: return null;
180:
181: QueueElementIF[] retIF = new QueueElementIF[qs];
182: for (int i = 0; i < qs; i++)
183: retIF[i] = (QueueElementIF) qlist.remove_head();
184: queueSize -= qs;
185: return retIF;
186: }
187: }
188: }
189:
190: public QueueElementIF[] dequeue(int num) {
191:
192: synchronized (blocker) {
193: synchronized (qlist) {
194: int qs = Math.min(qlist.size(), num);
195:
196: if (qs == 0)
197: return null;
198:
199: QueueElementIF[] retIF = new QueueElementIF[qs];
200: for (int i = 0; i < qs; i++)
201: retIF[i] = (QueueElementIF) qlist.remove_head();
202: queueSize -= qs;
203: return retIF;
204: }
205: }
206: }
207:
208: public QueueElementIF[] dequeue(int num, boolean mustReturnNum) {
209:
210: synchronized (blocker) {
211: synchronized (qlist) {
212: int qs;
213:
214: if (!mustReturnNum) {
215: qs = Math.min(qlist.size(), num);
216: } else {
217: if (qlist.size() < num)
218: return null;
219: qs = num;
220: }
221:
222: if (qs == 0)
223: return null;
224:
225: QueueElementIF[] retIF = new QueueElementIF[qs];
226: for (int i = 0; i < qs; i++)
227: retIF[i] = (QueueElementIF) qlist.remove_head();
228: queueSize -= qs;
229: return retIF;
230: }
231: }
232: }
233:
234: public QueueElementIF[] blocking_dequeue_all(int timeout_millis) {
235: QueueElementIF[] rets = null;
236: long goal_time;
237: int num_spins = 0;
238:
239: if (DEBUG)
240: System.err.println("**** B_DEQUEUE_A (" + name
241: + ") **** Entered");
242:
243: goal_time = System.currentTimeMillis() + timeout_millis;
244: while (true) {
245: synchronized (blocker) {
246:
247: if (DEBUG)
248: System.err.println("**** B_DEQUEUE_A (" + name
249: + ") **** Doing D_A");
250: rets = this .dequeue_all();
251: if (DEBUG)
252: System.err.println("**** B_DEQUEUE_A (" + name
253: + ") **** RETS IS " + rets);
254: if ((rets != null) || (timeout_millis == 0)) {
255: if (DEBUG)
256: System.err.println("**** B_DEQUEUE_A (" + name
257: + ") **** RETURNING (1)");
258: return rets;
259: }
260:
261: if (timeout_millis == -1) {
262: try {
263: blocker.wait();
264: } catch (InterruptedException ie) {
265: }
266: } else {
267: try {
268: if (DEBUG)
269: System.err.println("**** B_DEQUEUE_A ("
270: + name
271: + ") **** WAITING ON BLOCKER");
272: blocker.wait(timeout_millis);
273: } catch (InterruptedException ie) {
274: }
275: }
276:
277: if (DEBUG)
278: System.err.println("**** B_DEQUEUE_A (" + name
279: + ") **** Doing D_A (2)");
280: rets = this .dequeue_all();
281: if (DEBUG)
282: System.err.println("**** B_DEQUEUE_A (" + name
283: + ") **** RETS(2) IS " + rets);
284: if (rets != null) {
285: if (DEBUG)
286: System.err.println("**** B_DEQUEUE_A (" + name
287: + ") **** RETURNING(2)");
288: return rets;
289: }
290:
291: if (timeout_millis != -1) {
292: if (System.currentTimeMillis() >= goal_time) {
293: if (DEBUG)
294: System.err.println("**** B_DEQUEUE_A ("
295: + name + ") **** RETURNING(3)");
296: return null;
297: }
298: }
299: }
300: }
301: }
302:
303: public QueueElementIF[] blocking_dequeue(int timeout_millis,
304: int num, boolean mustReturnNum) {
305:
306: QueueElementIF[] rets = null;
307: long goal_time;
308: int num_spins = 0;
309:
310: goal_time = System.currentTimeMillis() + timeout_millis;
311: while (true) {
312: synchronized (blocker) {
313:
314: rets = this .dequeue(num, mustReturnNum);
315: if ((rets != null) || (timeout_millis == 0)) {
316: return rets;
317: }
318:
319: if (timeout_millis == -1) {
320: try {
321: blocker.wait();
322: } catch (InterruptedException ie) {
323: }
324: } else {
325: try {
326: blocker.wait(timeout_millis);
327: } catch (InterruptedException ie) {
328: }
329: }
330:
331: rets = this .dequeue(num, mustReturnNum);
332: if (rets != null) {
333: return rets;
334: }
335:
336: if (timeout_millis != -1) {
337: if (System.currentTimeMillis() >= goal_time) {
338: // Timeout - take whatever we can get
339: return this .dequeue(num);
340: }
341: }
342: }
343: }
344: }
345:
346: public QueueElementIF[] blocking_dequeue(int timeout_millis, int num) {
347: return blocking_dequeue(timeout_millis, num, false);
348: }
349:
350: public QueueElementIF blocking_dequeue(int timeout_millis) {
351: QueueElementIF rets = null;
352: long goal_time;
353: int num_spins = 0;
354:
355: goal_time = System.currentTimeMillis() + timeout_millis;
356: while (true) {
357: synchronized (blocker) {
358:
359: rets = this .dequeue();
360: if ((rets != null) || (timeout_millis == 0)) {
361: return rets;
362: }
363:
364: if (timeout_millis == -1) {
365: try {
366: blocker.wait();
367: } catch (InterruptedException ie) {
368: }
369: } else {
370: try {
371: blocker.wait(timeout_millis);
372: } catch (InterruptedException ie) {
373: }
374: }
375:
376: rets = this .dequeue();
377: if (rets != null) {
378: return rets;
379: }
380:
381: if (timeout_millis != -1) {
382: if (System.currentTimeMillis() >= goal_time)
383: return null;
384: }
385: }
386: }
387: }
388:
389: /**
390: * Return the profile size of the queue.
391: */
392: public int profileSize() {
393: return size();
394: }
395:
396: /**
397: * Provisionally enqueue the given elements.
398: */
399: public Object enqueue_prepare(QueueElementIF enqueueMe[])
400: throws SinkException {
401: int qlen = enqueueMe.length;
402: synchronized (blocker) {
403: synchronized (qlist) {
404: if (pred != null) {
405: int i = 0;
406: while ((i < qlen) && (pred.accept(enqueueMe[i])))
407: i++;
408: if (i != qlen)
409: throw new SinkFullException(
410: "FiniteQueue is full!");
411: }
412: queueSize += qlen;
413: Object key = new Object();
414: provisionalTbl.put(key, enqueueMe);
415: return key;
416: }
417: }
418: }
419:
420: /**
421: * Commit a provisional enqueue.
422: */
423: public void enqueue_commit(Object key) {
424: synchronized (blocker) {
425: synchronized (qlist) {
426: QueueElementIF elements[] = (QueueElementIF[]) provisionalTbl
427: .remove(key);
428: if (elements == null)
429: throw new IllegalArgumentException(
430: "Unknown enqueue key " + key);
431: for (int i = 0; i < elements.length; i++) {
432: qlist.add_to_tail(elements[i]);
433: }
434: }
435: blocker.notifyAll();
436: }
437: }
438:
439: /**
440: * Abort a provisional enqueue.
441: */
442: public void enqueue_abort(Object key) {
443: synchronized (blocker) {
444: synchronized (qlist) {
445: QueueElementIF elements[] = (QueueElementIF[]) provisionalTbl
446: .remove(key);
447: if (elements == null)
448: throw new IllegalArgumentException(
449: "Unknown enqueue key " + key);
450: queueSize -= elements.length;
451: }
452: }
453: }
454:
455: /**
456: * Set the enqueue predicate for this sink.
457: */
458: public void setEnqueuePredicate(EnqueuePredicateIF pred) {
459: this .pred = pred;
460: }
461:
462: /**
463: * Return the enqueue predicate for this sink.
464: */
465: public EnqueuePredicateIF getEnqueuePredicate() {
466: return pred;
467: }
468:
469: public String toString() {
470: return "FiniteQueue <" + name + ">";
471: }
472:
473: }
|