01: // ***************************************************************
02: // * *
03: // * File: ConsumerProducer.java *
04: // * *
05: // * Copyright (c) 2001 Sun Microsystems, Inc. *
06: // * All rights reserved. *
07: // * *
08: // * *
09: // * Date - Dec/11/2001 *
10: // * Author - alejandro.abdelnur@sun.com *
11: // * *
12: // ***************************************************************
13:
14: package com.sun.portal.common.concurrent;
15:
16: import java.util.List;
17: import java.util.ArrayList;
18:
19: public class ConsumerProducer {
20:
21: public static interface Threshold {
22:
23: public void reachedStopLimit();
24:
25: public void reachedResumeLimit();
26:
27: };
28:
29: private List _queue;
30: private int _stopLimit = 0;
31: private int _resumeLimit = 0;
32: private Threshold _threshold = null;
33: private boolean _reachedLimit = false;
34:
35: public ConsumerProducer() {
36: this (0, 0, null);
37: }
38:
39: public ConsumerProducer(int stopLimit, int resumeLimit,
40: Threshold threshold) {
41: _stopLimit = stopLimit;
42: _resumeLimit = resumeLimit;
43: _threshold = threshold;
44: _queue = new ArrayList(100);
45: }
46:
47: public synchronized void put(Object o) {
48: _queue.add(o);
49: int size = _queue.size();
50: if (size == 1) {
51: notify();
52: }
53: if (_threshold != null && _stopLimit > 0 && size > _stopLimit) {
54: _threshold.reachedStopLimit();
55: _reachedLimit = true;
56: }
57:
58: }
59:
60: public synchronized Object get() {
61: int size = _queue.size();
62: if (size == 0)
63: try {
64: wait();
65: } catch (Exception e) {
66: }
67:
68: if (_reachedLimit && size < _resumeLimit) {
69: _threshold.reachedResumeLimit();
70: _reachedLimit = false;
71: }
72:
73: Object o = _queue.get(0);
74: _queue.remove(0);
75: return o;
76: }
77:
78: public boolean isEmpty() {
79: return _queue.size() == 0;
80: }
81:
82: public synchronized void empty() {
83: _queue.clear();
84: _reachedLimit = false;
85: }
86:
87: }
|