001: package org.jacorb.notification.queue;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import java.util.List;
027:
028: import org.jacorb.notification.interfaces.Message;
029:
030: /**
031: * @author Alphonse Bendt
032: * @version $Id: AbstractBoundedEventQueue.java,v 1.11 2006/02/25 15:28:40 alphonse.bendt Exp $
033: */
034:
035: public abstract class AbstractBoundedEventQueue implements MessageQueue {
036: private final Object lock_;
037: private final int capacity_;
038: private final List listeners_ = new ArrayList();
039: private final EventQueueOverflowStrategy overflowStrategy_;
040:
041: protected AbstractBoundedEventQueue(int capacity,
042: EventQueueOverflowStrategy overflowStrategy, Object lock) {
043: lock_ = lock;
044: capacity_ = capacity;
045: overflowStrategy_ = overflowStrategy;
046: }
047:
048: public final String getDiscardPolicyName() {
049: return overflowStrategy_.getDiscardPolicyName();
050: }
051:
052: protected abstract Message getEarliestTimeout();
053:
054: protected abstract Message getLeastPriority();
055:
056: protected abstract Message getNextElement();
057:
058: protected abstract Message getOldestElement();
059:
060: protected abstract Message getYoungestElement();
061:
062: protected abstract Message[] getElements(int max);
063:
064: protected abstract void addElement(Message message);
065:
066: protected abstract Message[] getAllElements();
067:
068: public abstract String getOrderPolicyName();
069:
070: public Message[] getAllMessages(boolean wait)
071: throws InterruptedException {
072: synchronized (lock_) {
073: if (wait) {
074: return getAllBlocking();
075: }
076:
077: return getAllElements();
078: }
079: }
080:
081: /**
082: * @pre current thread own monitor lock_
083: */
084: private Message[] getAllBlocking() throws InterruptedException {
085: while (isEmpty()) {
086: lock_.wait();
087: }
088:
089: return getAllElements();
090: }
091:
092: public Message getMessage(boolean wait) throws InterruptedException {
093: synchronized (lock_) {
094: if (wait) {
095: return getEventBlocking();
096: }
097:
098: if (isEmpty()) {
099: return null;
100: }
101:
102: return getNextElement();
103: }
104: }
105:
106: public Message[] getMessages(int max, boolean wait)
107: throws InterruptedException {
108: synchronized (lock_) {
109: if (wait) {
110: return getEventsBlocking(max);
111: }
112:
113: return getElements(max);
114: }
115: }
116:
117: /**
118: * @pre current thread owns monitor lock_
119: */
120: private Message[] getEventsBlocking(int max)
121: throws InterruptedException {
122: while (isEmpty()) {
123: lock_.wait();
124: }
125:
126: return getElements(max);
127: }
128:
129: /**
130: * @pre current thread owns monitor lock_
131: */
132: private Message getEventBlocking() throws InterruptedException {
133: while (isEmpty()) {
134: lock_.wait();
135: }
136:
137: return getNextElement();
138: }
139:
140: public void put(Message event) {
141: synchronized (lock_) {
142: while (getSize() >= capacity_) {
143: overflowStrategy_.removeElementFromQueue(this );
144:
145: fireMessageDiscarded();
146: }
147:
148: addElement(event);
149:
150: lock_.notifyAll();
151: }
152: }
153:
154: private void fireMessageDiscarded() {
155: final Iterator i = listeners_.iterator();
156:
157: while (i.hasNext()) {
158: ((DiscardListener) i.next()).messageDiscarded(capacity_);
159: }
160: }
161:
162: public void addDiscardListener(DiscardListener listener) {
163: listeners_.add(listener);
164: }
165:
166: public void removeDiscardListener(DiscardListener listener) {
167: listeners_.remove(listener);
168: }
169: }
|