001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection.spi;
022:
023: import java.io.IOException;
024: import java.lang.ref.WeakReference;
025: import java.nio.ByteBuffer;
026: import java.util.ArrayList;
027: import java.util.TimerTask;
028: import java.util.logging.Level;
029: import java.util.logging.Logger;
030:
031: import org.xsocket.DataConverter;
032: import org.xsocket.connection.INonBlockingConnection;
033:
034: /**
035: * Delayed write IO handler
036: *
037: * @author grro@xsocket.org
038: */
039: final class IoThrottledWriteHandler extends ChainableIoHandler {
040:
041: private static final Logger LOG = Logger
042: .getLogger(IoThrottledWriteHandler.class.getName());
043:
044: // write queue
045: private final ArrayList<DelayQueueEntry> sendQueue = new ArrayList<DelayQueueEntry>(
046: 1);
047:
048: // timer handling
049: private int sendBytesPerSec = INonBlockingConnection.UNLIMITED;
050: private TimerTask delayedDelivererTask = null;
051:
052: /**
053: * constructor
054: * @param successor the successor
055: */
056: IoThrottledWriteHandler(ChainableIoHandler successor) {
057: super (successor);
058: }
059:
060: /**
061: * {@inheritDoc}
062: */
063: public void init(IIoHandlerCallback callbackHandler)
064: throws IOException {
065: setPreviousCallback(callbackHandler);
066: getSuccessor().init(callbackHandler);
067: }
068:
069: /**
070: * {@inheritDoc}
071: */
072: public boolean reset() {
073: sendQueue.clear();
074:
075: sendBytesPerSec = INonBlockingConnection.UNLIMITED;
076: if (delayedDelivererTask != null) {
077: delayedDelivererTask.cancel();
078: delayedDelivererTask = null;
079: }
080:
081: return super .reset();
082: }
083:
084: /**
085: * set the write rate in sec
086: *
087: * @param writeRateSec the write rate
088: */
089: void setWriteRateSec(int writeRateSec) {
090: this .sendBytesPerSec = writeRateSec;
091: }
092:
093: /**
094: * {@inheritDoc}
095: */
096: @Override
097: public int getPendingWriteDataSize() {
098: return getSendQueueSize() + super .getPendingWriteDataSize();
099: }
100:
101: @Override
102: public boolean hasDataToSend() {
103: return ((getSendQueueSize() > 0) || super .hasDataToSend());
104: }
105:
106: @SuppressWarnings("unchecked")
107: private int getSendQueueSize() {
108: int size = 0;
109:
110: ArrayList<DelayQueueEntry> copy = null;
111: synchronized (sendQueue) {
112: copy = (ArrayList<DelayQueueEntry>) sendQueue.clone();
113: }
114:
115: for (DelayQueueEntry entry : copy) {
116: size += entry.buffer.remaining();
117: }
118:
119: return size;
120: }
121:
122: /**
123: * {@inheritDoc}
124: */
125: public void close(boolean immediate) throws IOException {
126: if (!immediate) {
127: flushOutgoing();
128: }
129:
130: getSuccessor().close(immediate);
131: }
132:
133: /**
134: * {@inheritDoc}
135: */
136: public void write(ByteBuffer[] buffers) {
137: for (ByteBuffer buffer : buffers) {
138: writeOutgoing(buffer);
139: }
140: }
141:
142: private void writeOutgoing(ByteBuffer buffer) {
143:
144: // append to delay queue
145: int size = buffer.remaining();
146: if (size > 0) {
147:
148: DelayQueueEntry delayQueueEntry = new DelayQueueEntry(
149: buffer.duplicate(), sendBytesPerSec);
150:
151: if (LOG.isLoggable(Level.FINE)) {
152: LOG.fine("[" + getId() + "] add " + delayQueueEntry
153: + " to delay queue");
154: }
155: synchronized (sendQueue) {
156: sendQueue.add(delayQueueEntry);
157: }
158: }
159:
160: // create delivery task if not exists
161: if (delayedDelivererTask == null) {
162: int period = 500;
163:
164: if (LOG.isLoggable(Level.FINE)) {
165: LOG
166: .fine("["
167: + getId()
168: + "] delay delivery task is null. Starting task (period="
169: + DataConverter
170: .toFormatedDuration(period)
171: + ")");
172: }
173:
174: delayedDelivererTask = new DeliveryTask(this );
175: DefaultIoProvider.getTimer().schedule(delayedDelivererTask,
176: 0, 500);
177: }
178: }
179:
180: /**
181: * {@inheritDoc}
182: */
183: public void flushOutgoing() throws IOException {
184: if (LOG.isLoggable(Level.FINE)) {
185: LOG.fine("flush remaning data");
186: }
187:
188: synchronized (sendQueue) {
189: if (!sendQueue.isEmpty()) {
190: DelayQueueEntry[] entries = sendQueue
191: .toArray(new DelayQueueEntry[sendQueue.size()]);
192: sendQueue.clear();
193:
194: ByteBuffer[] buffers = new ByteBuffer[entries.length];
195: for (int i = 0; i < buffers.length; i++) {
196: buffers[i] = entries[i].getBuffer();
197: }
198:
199: if (LOG.isLoggable(Level.FINE)) {
200: LOG.fine("[" + getId() + "] flushing "
201: + buffers.length
202: + " buffers of delay queue");
203: }
204:
205: try {
206: IoThrottledWriteHandler.this .getSuccessor().write(
207: buffers);
208: } catch (Exception e) {
209: if (LOG.isLoggable(Level.FINE)) {
210: LOG
211: .fine("["
212: + getId()
213: + "] error occured while writing. Reason: "
214: + e.toString());
215: }
216: }
217: }
218: }
219:
220: getSuccessor().flushOutgoing();
221: }
222:
223: private static final class DeliveryTask extends TimerTask {
224:
225: private WeakReference<IoThrottledWriteHandler> ioThrottledWriteHandlerRef = null;
226:
227: public DeliveryTask(
228: IoThrottledWriteHandler ioThrottledWriteHandler) {
229: ioThrottledWriteHandlerRef = new WeakReference<IoThrottledWriteHandler>(
230: ioThrottledWriteHandler);
231: }
232:
233: @Override
234: public void run() {
235:
236: IoThrottledWriteHandler ioThrottledWriteHandler = ioThrottledWriteHandlerRef
237: .get();
238:
239: if (ioThrottledWriteHandler == null) {
240: cancel();
241:
242: } else {
243: synchronized (ioThrottledWriteHandler.sendQueue) {
244:
245: long currentTime = System.currentTimeMillis();
246: while (!ioThrottledWriteHandler.sendQueue.isEmpty()) {
247: try {
248:
249: // get the oldest entry (index 0) and write based on rate
250: DelayQueueEntry qe = ioThrottledWriteHandler.sendQueue
251: .get(0);
252: int remaingSize = qe.write(currentTime);
253:
254: // if all data of this entry is written remove entry and stay in loop
255: if (remaingSize == 0) {
256: ioThrottledWriteHandler.sendQueue
257: .remove(qe);
258:
259: if (LOG.isLoggable(Level.FINE)) {
260: LOG
261: .fine("throttling write queue is emtpy");
262: }
263:
264: // ... else break loop and wait for next time event
265: } else {
266: break;
267: }
268:
269: } catch (Exception e) {
270: if (LOG.isLoggable(Level.FINE)) {
271: LOG
272: .fine("["
273: + ioThrottledWriteHandler
274: .getId()
275: + "] Error occured while write delayed. Reason: "
276: + e.toString());
277: }
278: }
279: }
280: }
281: }
282: }
283: }
284:
285: private final class DelayQueueEntry {
286: private ByteBuffer buffer = null;
287: private int bytesPerSec = 0;
288: private long lastWriteTime = 0;
289:
290: DelayQueueEntry(ByteBuffer buffer, int bytesPerSec) {
291: this .buffer = buffer;
292: this .bytesPerSec = bytesPerSec;
293: this .lastWriteTime = System.currentTimeMillis();
294: }
295:
296: ByteBuffer getBuffer() {
297: return buffer;
298: }
299:
300: int write(long currentTime) throws IOException {
301: int remaingSize = buffer.remaining();
302:
303: long elapsedTimeMillis = currentTime - lastWriteTime;
304:
305: if (elapsedTimeMillis > 0) {
306: int elapsedTimeSec = ((int) (elapsedTimeMillis)) / 1000;
307:
308: if (elapsedTimeSec > 0) {
309: int sizeToWrite = bytesPerSec * elapsedTimeSec;
310:
311: if (sizeToWrite > 0) {
312: ByteBuffer bytesToWrite = null;
313: if (buffer.remaining() <= sizeToWrite) {
314: bytesToWrite = buffer;
315: remaingSize = 0;
316:
317: } else {
318: int saveLimit = buffer.limit();
319: buffer.limit(sizeToWrite);
320: bytesToWrite = buffer.slice();
321: buffer.position(buffer.limit());
322: buffer.limit(saveLimit);
323: buffer = buffer.slice();
324: remaingSize = buffer.remaining();
325: }
326:
327: lastWriteTime = currentTime;
328: if (LOG.isLoggable(Level.FINE)) {
329: LOG.fine("[" + getId() + "] release "
330: + sizeToWrite
331: + " bytes from delay queue");
332: }
333:
334: ByteBuffer[] buffers = new ByteBuffer[1];
335: buffers[0] = bytesToWrite;
336: getSuccessor().write(buffers);
337: }
338: }
339: }
340:
341: return remaingSize;
342: }
343:
344: @Override
345: public String toString() {
346: return "buffer "
347: + DataConverter.toFormatedBytesSize(buffer
348: .remaining()) + " (write rate "
349: + bytesPerSec + " bytes/sec)";
350: }
351: }
352:
353: /**
354: * {@inheritDoc}
355: */
356: @Override
357: public String toString() {
358: try {
359: return this .getClass().getSimpleName()
360: + "(pending delayQueueSize="
361: + DataConverter
362: .toFormatedBytesSize(getPendingWriteDataSize())
363: + ") ->" + "\r\n" + getSuccessor().toString();
364: } catch (Exception e) {
365: return super.toString();
366: }
367: }
368:
369: }
|