001: package org.jacorb.poa;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.util.*;
024:
025: import org.apache.avalon.framework.logger.Logger;
026: import org.apache.avalon.framework.configuration.*;
027:
028: import org.jacorb.orb.dsi.ServerRequest;
029: import org.jacorb.poa.except.ResourceLimitReachedException;
030: import org.jacorb.poa.util.StringPair;
031: import org.omg.CORBA.BAD_INV_ORDER;
032:
033: /**
034: * This class manages a queue of ServerRequest objects.
035: *
036: * @author Reimo Tiedemann, FU Berlin
037: * @version $Id: RequestQueue.java,v 1.20 2006/07/19 15:26:39 alphonse.bendt Exp $
038: */
039: public class RequestQueue implements Configurable {
040: private RequestQueueListener queueListener;
041: private final RequestController controller;
042:
043: /** the configuration object for this queue */
044: private org.jacorb.config.Configuration configuration = null;
045: private Logger logger;
046: private int queueMin;
047: private int queueMax;
048: private boolean queueWait;
049: private List queueListeners;
050:
051: private boolean configured = false;
052:
053: private final LinkedList queue = new LinkedList();
054:
055: protected RequestQueue(RequestController controller) {
056: this .controller = controller;
057: }
058:
059: public synchronized void configure(Configuration myConfiguration)
060: throws ConfigurationException {
061: if (configured) {
062: return;
063: }
064:
065: this .configuration = (org.jacorb.config.Configuration) myConfiguration;
066: logger = configuration.getNamedLogger("jacorb.poa.queue");
067: queueMax = configuration.getAttributeAsInteger(
068: "jacorb.poa.queue_max", 100);
069: queueMin = configuration.getAttributeAsInteger(
070: "jacorb.poa.queue_min", 10);
071: queueWait = configuration.getAttributeAsBoolean(
072: "jacorb.poa.queue_wait", false);
073: queueListeners = configuration
074: .getAttributeList("jacorb.poa.queue_listeners");
075: configured = true;
076:
077: for (Iterator i = queueListeners.iterator(); i.hasNext();) {
078: String className = (String) i.next();
079: try {
080: RequestQueueListener rql = (RequestQueueListener) org.jacorb.util.ObjectUtil
081: .classForName(className).newInstance();
082: addRequestQueueListener(rql);
083: } catch (Exception ex) {
084: throw new ConfigurationException(
085: "could not instantiate queue listener", ex);
086: }
087: }
088: }
089:
090: /**
091: * Adds a request to this queue. The properties
092: * <code>jacorb.poa.queue_{min,max,wait}</code> specify what happens
093: * when the queue is full, i.e. when it already contains
094: * <code>queue_max</code> requests. If <code>queue_wait</code> is
095: * <i>off</i>, then this method does not add the request and throws a
096: * <code>ResourceLimitReachedException</code>. If <code>queue_wait</code>
097: * is <i>on</i>, then this method blocks until no more than
098: * <code>queue_min</code> requests are in the queue; it then adds the
099: * request, and returns.
100: */
101:
102: protected synchronized void add(ServerRequest request)
103: throws ResourceLimitReachedException {
104: checkIsConfigured();
105:
106: if (queue.size() >= queueMax) {
107: if (logger.isWarnEnabled()) {
108: logger
109: .warn("Request queue is full, consider increasing "
110: + "jacorb.poa.queue_max (currently: "
111: + queueMax + ")");
112: }
113:
114: if (queueWait) {
115: while (queue.size() > queueMin) {
116: try {
117: this .wait();
118: } catch (InterruptedException ex) {
119: // ignore
120: }
121: }
122: } else {
123: throw new ResourceLimitReachedException();
124: }
125: }
126: queue.add(request);
127:
128: if (queue.size() == 1) {
129: controller.continueToWork();
130: }
131:
132: if (logger.isDebugEnabled()) {
133: logger.debug("rid: " + request.requestId() + " opname: "
134: + request.operation() + " is queued (queue size: "
135: + queue.size() + ")");
136: }
137:
138: // notify a queue listener
139: if (queueListener != null) {
140: queueListener.requestAddedToQueue(request, queue.size());
141: }
142: }
143:
144: protected synchronized void addRequestQueueListener(
145: RequestQueueListener listener) {
146: checkIsConfigured();
147:
148: queueListener = EventMulticaster.add(queueListener, listener);
149: }
150:
151: protected synchronized StringPair[] deliverContent() {
152: checkIsConfigured();
153:
154: StringPair[] result = new StringPair[queue.size()];
155: Iterator en = queue.iterator();
156: ServerRequest sr;
157: for (int i = 0; i < result.length; i++) {
158: sr = (ServerRequest) en.next();
159: result[i] = new StringPair(
160: Integer.toString(sr.requestId()), new String(sr
161: .objectId()));
162: }
163: return result;
164: }
165:
166: protected synchronized ServerRequest getElementAndRemove(int rid) {
167: checkIsConfigured();
168:
169: if (!queue.isEmpty()) {
170: Iterator en = queue.iterator();
171: ServerRequest result;
172: while (en.hasNext()) {
173: result = (ServerRequest) en.next();
174: if (result.requestId() == rid) {
175: en.remove();
176: this .notifyAll();
177: // notify a queue listener
178: if (queueListener != null) {
179: queueListener.requestRemovedFromQueue(result,
180: queue.size());
181: }
182: return result;
183: }
184: }
185: }
186: return null;
187: }
188:
189: protected synchronized ServerRequest getFirst() {
190: checkIsConfigured();
191:
192: if (!queue.isEmpty()) {
193: return (ServerRequest) queue.getFirst();
194: }
195: return null;
196: }
197:
198: protected boolean isEmpty() {
199: checkIsConfigured();
200:
201: return queue.isEmpty();
202: }
203:
204: protected synchronized ServerRequest removeFirst() {
205: checkIsConfigured();
206:
207: if (!queue.isEmpty()) {
208: ServerRequest result = (ServerRequest) queue.removeFirst();
209: this .notifyAll();
210: // notify a queue listener
211:
212: if (queueListener != null) {
213: queueListener.requestRemovedFromQueue(result, queue
214: .size());
215: }
216: return result;
217: }
218: return null;
219: }
220:
221: protected synchronized ServerRequest removeLast() {
222: checkIsConfigured();
223:
224: if (!queue.isEmpty()) {
225: ServerRequest result = (ServerRequest) queue.removeLast();
226: this .notifyAll();
227: // notify a queue listener
228: if (queueListener != null) {
229: queueListener.requestRemovedFromQueue(result, queue
230: .size());
231: }
232: return result;
233: }
234: return null;
235: }
236:
237: protected synchronized void removeRequestQueueListener(
238: RequestQueueListener listener) {
239: checkIsConfigured();
240: queueListener = EventMulticaster
241: .remove(queueListener, listener);
242: }
243:
244: protected int size() {
245: return queue.size();
246: }
247:
248: private void checkIsConfigured() {
249: if (!configured) {
250: throw new BAD_INV_ORDER(
251: "RequestQueue is not configured yet.");
252: }
253: }
254: }
|