001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection.spi;
022:
023: import java.io.IOException;
024: import java.lang.ref.WeakReference;
025: import java.nio.ByteBuffer;
026: import java.nio.channels.ClosedChannelException;
027: import java.util.TimerTask;
028: import java.util.concurrent.atomic.AtomicInteger;
029: import java.util.logging.Level;
030: import java.util.logging.Logger;
031:
032: import org.xsocket.connection.INonBlockingConnection;
033:
034: /**
035: * Delayed read IO handler
036: *
037: * @author grro@xsocket.org
038: */
039: final class IoThrottledReadHandler extends ChainableIoHandler {
040:
041: private static final Logger LOG = Logger
042: .getLogger(IoThrottledReadHandler.class.getName());
043:
044: private static final int CHECK_PERIOD_MILLIS = 500;
045:
046: // event handling
047: private final IOEventHandler ioEventHandler = new IOEventHandler();
048:
049: // timer handling
050: private int readBytesPerSec = INonBlockingConnection.UNLIMITED;
051: private final AtomicInteger currentReceived = new AtomicInteger();
052: private TimerTask readControlTask = null;
053:
054: // suspend flag
055: private boolean isSuspended = false;
056:
057: private int orgReadBufferSize = 0;
058:
059: /**
060: * constructor
061: * @param successor the successor
062: */
063: IoThrottledReadHandler(ChainableIoHandler successor) {
064: super (successor);
065:
066: readControlTask = new ReadControlTask(this );
067: DefaultIoProvider.getTimer().schedule(readControlTask,
068: CHECK_PERIOD_MILLIS, CHECK_PERIOD_MILLIS);
069: }
070:
071: /**
072: * {@inheritDoc}
073: */
074: public void init(IIoHandlerCallback callbackHandler)
075: throws IOException {
076: setPreviousCallback(callbackHandler);
077: getSuccessor().init(ioEventHandler);
078:
079: orgReadBufferSize = (Integer) getSuccessor().getOption(
080: IClientIoProvider.SO_RCVBUF);
081:
082: getSocketHandler().setRetryRead(false);
083: }
084:
085: /**
086: * {@inheritDoc}
087: */
088: public boolean reset() {
089: readBytesPerSec = INonBlockingConnection.UNLIMITED;
090: if (readControlTask != null) {
091: readControlTask.cancel();
092: readControlTask = null;
093: }
094:
095: getSocketHandler().setRetryRead(true);
096:
097: try {
098: getSuccessor().setOption(IClientIoProvider.SO_RCVBUF,
099: orgReadBufferSize);
100: getSuccessor().resumeRead();
101: } catch (IOException ioe) {
102: if (LOG.isLoggable(Level.FINE)) {
103: LOG.fine("Error occured by resuming read "
104: + ioe.toString());
105: }
106: }
107:
108: return super .reset();
109: }
110:
111: /**
112: * set the read rate in sec
113: *
114: * @param readRateSec the read rate
115: */
116: void setReadRateSec(int readRateSec) throws IOException {
117:
118: if (readRateSec < orgReadBufferSize) {
119: getSuccessor().setOption(IClientIoProvider.SO_RCVBUF,
120: readRateSec);
121: } else {
122: getSuccessor().setOption(IClientIoProvider.SO_RCVBUF,
123: orgReadBufferSize);
124: }
125: this .readBytesPerSec = readRateSec;
126: }
127:
128: /**
129: * {@inheritDoc}
130: */
131: public void close(boolean immediate) throws IOException {
132: if (!immediate) {
133: flushOutgoing();
134: }
135:
136: getSuccessor().close(immediate);
137: }
138:
139: public void write(ByteBuffer[] buffers)
140: throws ClosedChannelException, IOException {
141: getSuccessor().write(buffers);
142: }
143:
144: @Override
145: public void flushOutgoing() throws IOException {
146: getSuccessor().flushOutgoing();
147: }
148:
149: /**
150: * {@inheritDoc}
151: */
152: public void setPreviousCallback(IIoHandlerCallback callbackHandler) {
153: super .setPreviousCallback(callbackHandler);
154: getSuccessor().setPreviousCallback(ioEventHandler);
155: }
156:
157: private IoSocketHandler getSocketHandler() {
158: ChainableIoHandler successor = this ;
159: do {
160: successor = getSuccessor();
161: if (successor != null) {
162: if (successor instanceof IoSocketHandler) {
163: return (IoSocketHandler) successor;
164: }
165: }
166:
167: } while (successor != null);
168:
169: return null;
170: }
171:
172: private static final class ReadControlTask extends TimerTask {
173:
174: private WeakReference<IoThrottledReadHandler> ioThrottledReadHandlerRef = null;
175:
176: private int outstanding = 0;
177:
178: public ReadControlTask(
179: IoThrottledReadHandler ioThrottledReadHandler) {
180: ioThrottledReadHandlerRef = new WeakReference<IoThrottledReadHandler>(
181: ioThrottledReadHandler);
182: }
183:
184: @Override
185: public void run() {
186:
187: IoThrottledReadHandler ioThrottledReadHandler = ioThrottledReadHandlerRef
188: .get();
189:
190: if (ioThrottledReadHandler == null) {
191: cancel();
192:
193: } else {
194: outstanding += ioThrottledReadHandler.currentReceived
195: .getAndSet(0);
196:
197: if (outstanding > 0) {
198: double periodSec = ((double) CHECK_PERIOD_MILLIS) / 1000;
199: int delta = (int) (ioThrottledReadHandler.readBytesPerSec * periodSec);
200: outstanding -= delta;
201: }
202:
203: if (outstanding < 0) {
204: outstanding = 0;
205: }
206:
207: if (outstanding > 0) {
208: if (!ioThrottledReadHandler.isSuspended) {
209: try {
210: if (LOG.isLoggable(Level.FINE)) {
211: LOG.fine("suspending read");
212: }
213: ioThrottledReadHandler.getSuccessor()
214: .suspendRead();
215: ioThrottledReadHandler.isSuspended = true;
216:
217: } catch (IOException ioe) {
218: if (LOG.isLoggable(Level.FINE)) {
219: LOG
220: .fine("Error occured by suspendig read "
221: + ioe.toString());
222: }
223: }
224: }
225: } else {
226:
227: if (ioThrottledReadHandler.isSuspended) {
228: try {
229: if (LOG.isLoggable(Level.FINE)) {
230: LOG.fine("resuming read");
231: }
232: ioThrottledReadHandler.getSuccessor()
233: .resumeRead();
234: ioThrottledReadHandler.isSuspended = false;
235:
236: } catch (IOException ioe) {
237: if (LOG.isLoggable(Level.FINE)) {
238: LOG
239: .fine("Error occured by resuming read "
240: + ioe.toString());
241: }
242: }
243: }
244: }
245:
246: }
247: }
248: }
249:
250: private final class IOEventHandler implements IIoHandlerCallback {
251:
252: public void onData(ByteBuffer[] data) {
253:
254: int read = 0;
255:
256: for (ByteBuffer byteBuffer : data) {
257: read += byteBuffer.remaining();
258: }
259:
260: currentReceived.addAndGet(read);
261:
262: getPreviousCallback().onData(data);
263: }
264:
265: public void onConnect() {
266: getPreviousCallback().onConnect();
267: }
268:
269: public void onWriteException(IOException ioException,
270: ByteBuffer data) {
271: getPreviousCallback().onWriteException(ioException, data);
272: }
273:
274: public void onWritten(ByteBuffer data) {
275: getPreviousCallback().onWritten(data);
276: }
277:
278: public void onDisconnect() {
279: getPreviousCallback().onDisconnect();
280: }
281:
282: public void onConnectionAbnormalTerminated() {
283: getPreviousCallback().onConnectionAbnormalTerminated();
284: }
285:
286: public void onConnectionTimeout() {
287: getPreviousCallback().onConnectionTimeout();
288: }
289:
290: public void onIdleTimeout() {
291: getPreviousCallback().onIdleTimeout();
292: }
293: }
294: }
|