001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.executor;
021:
022: import java.util.concurrent.ThreadPoolExecutor;
023: import java.util.concurrent.atomic.AtomicInteger;
024:
025: import org.apache.mina.common.IoEvent;
026: import org.slf4j.Logger;
027: import org.slf4j.LoggerFactory;
028:
029: /**
030: * Throttles incoming events into {@link OrderedThreadPoolExecutor} or
031: * {@link UnorderedThreadPoolExecutor}.
032: *
033: * @author The Apache MINA Project (dev@mina.apache.org)
034: * @version $Rev: 598191 $, $Date: 2007-11-26 02:49:49 -0700 (Mon, 26 Nov 2007) $
035: */
036: public class IoEventQueueThrottle implements IoEventQueueHandler {
037:
038: private final Logger logger = LoggerFactory.getLogger(getClass());
039:
040: private final IoEventSizeEstimator eventSizeEstimator;
041: private volatile int threshold;
042:
043: private final Object lock = new Object();
044: private final AtomicInteger counter = new AtomicInteger();
045: private int waiters;
046:
047: public IoEventQueueThrottle() {
048: this (new DefaultIoEventSizeEstimator(), 65536);
049: }
050:
051: public IoEventQueueThrottle(int threshold) {
052: this (new DefaultIoEventSizeEstimator(), threshold);
053: }
054:
055: public IoEventQueueThrottle(
056: IoEventSizeEstimator eventSizeEstimator, int threshold) {
057: if (eventSizeEstimator == null) {
058: throw new NullPointerException("eventSizeEstimator");
059: }
060: this .eventSizeEstimator = eventSizeEstimator;
061:
062: setThreshold(threshold);
063: }
064:
065: public IoEventSizeEstimator getEventSizeEstimator() {
066: return eventSizeEstimator;
067: }
068:
069: public int getThreshold() {
070: return threshold;
071: }
072:
073: public int getCounter() {
074: return counter.get();
075: }
076:
077: public void setThreshold(int threshold) {
078: if (threshold <= 0) {
079: throw new IllegalArgumentException("threshold: "
080: + threshold);
081: }
082: this .threshold = threshold;
083: }
084:
085: public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
086: return true;
087: }
088:
089: public void offered(ThreadPoolExecutor executor, IoEvent event) {
090: int eventSize = estimateSize(event);
091: int currentCounter = counter.addAndGet(eventSize);
092: logState();
093:
094: if (currentCounter >= threshold) {
095: block();
096: }
097: }
098:
099: public void polled(ThreadPoolExecutor executor, IoEvent event) {
100: int eventSize = estimateSize(event);
101: int currentCounter = counter.addAndGet(-eventSize);
102:
103: logState();
104:
105: if (currentCounter < threshold) {
106: unblock();
107: }
108: }
109:
110: private int estimateSize(IoEvent event) {
111: int size = getEventSizeEstimator().estimateSize(event);
112: if (size < 0) {
113: throw new IllegalStateException(IoEventSizeEstimator.class
114: .getSimpleName()
115: + " returned "
116: + "a negative value ("
117: + size
118: + "): " + event);
119: }
120: return size;
121: }
122:
123: private void logState() {
124: if (logger.isDebugEnabled()) {
125: logger.debug(Thread.currentThread().getName() + " state: "
126: + counter.get() + " / " + getThreshold());
127: }
128: }
129:
130: protected void block() {
131: if (logger.isDebugEnabled()) {
132: logger
133: .debug(Thread.currentThread().getName()
134: + " blocked: " + counter.get() + " >= "
135: + threshold);
136: }
137:
138: synchronized (lock) {
139: while (counter.get() >= threshold) {
140: waiters++;
141: try {
142: lock.wait();
143: } catch (InterruptedException e) {
144: // Wait uninterruptably.
145: } finally {
146: waiters--;
147: }
148: }
149: }
150:
151: if (logger.isDebugEnabled()) {
152: logger.debug(Thread.currentThread().getName()
153: + " unblocked: " + counter.get() + " < "
154: + threshold);
155: }
156: }
157:
158: protected void unblock() {
159: synchronized (lock) {
160: if (waiters > 0) {
161: lock.notify();
162: }
163: }
164: }
165: }
|