001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.nio.ByteBuffer;
023: import java.nio.ByteOrder;
024: import java.util.HashMap;
025: import java.util.Map;
026: import java.util.Queue;
027:
028: import org.apache.mina.util.CircularQueue;
029:
030: /**
031: * An {@link IoBufferAllocator} that caches the buffers which are likely to
032: * be reused during auto-expansion of the buffers.
033: * <p>
034: * In {@link SimpleBufferAllocator}, the underlying {@link ByteBuffer} of
035: * the {@link IoBuffer} is reallocated on its capacity change, which means
036: * the newly allocated bigger {@link ByteBuffer} replaces the old small
037: * {@link ByteBuffer}. Consequently, the old {@link ByteBuffer} is marked
038: * for garbage collection.
039: * <p>
040: * It's not a problem in most cases as long as the capacity change doesn't
041: * happen frequently. However, once it happens too often, it burdens the
 * VM and the cost of filling the newly allocated {@link ByteBuffer} with
 * {@code NUL} surpasses the cost of accessing the cache. On a machine with
 * two dual-core Opteron Italy 270 processors, {@link CachedBufferAllocator}
 * outperformed {@link SimpleBufferAllocator} in the following situations:
046: * <ul>
047: * <li>when a 32 bytes buffer is expanded 4 or more times,</li>
048: * <li>when a 64 bytes buffer is expanded 4 or more times,</li>
049: * <li>when a 128 bytes buffer is expanded 2 or more times,</li>
050: * <li>and when a 256 bytes or bigger buffer is expanded 1 or more times.</li>
051: * </ul>
052: * Please note the observation above is subject to change in a different
053: * environment.
054: * <p>
055: * {@link CachedBufferAllocator} uses {@link ThreadLocal} to store the cached
056: * buffer, allocates buffers whose capacity is power of 2 only and provides
057: * performance advantage if {@link IoBuffer#free()} is called properly.
058: *
059: * @author The Apache MINA Project (dev@mina.apache.org)
060: * @version $Rev: 594400 $, $Date: 2007-11-12 19:49:43 -0700 (Mon, 12 Nov 2007) $
061: */
062: public class CachedBufferAllocator implements IoBufferAllocator {
063:
064: private static final int DEFAULT_MAX_POOL_SIZE = 8;
065: private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18; // 256KB
066:
067: private final int maxPoolSize;
068: private final int maxCachedBufferSize;
069:
070: private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
071: private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
072:
073: /**
074: * Creates a new instance with the default parameters
075: * ({@literal #DEFAULT_MAX_POOL_SIZE} and {@literal #DEFAULT_MAX_CACHED_BUFFER_SIZE}).
076: */
077: public CachedBufferAllocator() {
078: this (DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
079: }
080:
081: /**
082: * Creates a new instance.
083: *
084: * @param maxPoolSize the maximum number of buffers with the same capacity per thread.
085: * <tt>0</tt> disables this limitation.
086: * @param maxCachedBufferSize the maximum capacity of a cached buffer.
087: * A buffer whose capacity is bigger than this value is
088: * not pooled. <tt>0</tt> disables this limitation.
089: */
090: public CachedBufferAllocator(int maxPoolSize,
091: int maxCachedBufferSize) {
092: if (maxPoolSize < 0) {
093: throw new IllegalArgumentException("maxPoolSize: "
094: + maxPoolSize);
095: }
096: if (maxCachedBufferSize < 0) {
097: throw new IllegalArgumentException("maxCachedBufferSize: "
098: + maxCachedBufferSize);
099: }
100:
101: this .maxPoolSize = maxPoolSize;
102: this .maxCachedBufferSize = maxCachedBufferSize;
103:
104: this .heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
105: @Override
106: protected Map<Integer, Queue<CachedBuffer>> initialValue() {
107: return newPoolMap();
108: }
109: };
110: this .directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
111: @Override
112: protected Map<Integer, Queue<CachedBuffer>> initialValue() {
113: return newPoolMap();
114: }
115: };
116: }
117:
118: /**
119: * Returns the maximum number of buffers with the same capacity per thread.
120: * <tt>0</tt> means 'no limitation'.
121: */
122: public int getMaxPoolSize() {
123: return maxPoolSize;
124: }
125:
126: /**
127: * Returns the maximum capacity of a cached buffer. A buffer whose
128: * capacity is bigger than this value is not pooled. <tt>0</tt> means
129: * 'no limitation'.
130: */
131: public int getMaxCachedBufferSize() {
132: return maxCachedBufferSize;
133: }
134:
135: private Map<Integer, Queue<CachedBuffer>> newPoolMap() {
136: Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<Integer, Queue<CachedBuffer>>();
137: int poolSize = maxPoolSize == 0 ? DEFAULT_MAX_POOL_SIZE
138: : maxPoolSize;
139: for (int i = 0; i < 31; i++) {
140: poolMap.put(1 << i, new CircularQueue<CachedBuffer>(
141: poolSize));
142: }
143: poolMap.put(0, new CircularQueue<CachedBuffer>(poolSize));
144: poolMap.put(Integer.MAX_VALUE, new CircularQueue<CachedBuffer>(
145: poolSize));
146: return poolMap;
147: }
148:
149: public IoBuffer allocate(int requestedCapacity, boolean direct) {
150: int actualCapacity = IoBuffer
151: .normalizeCapacity(requestedCapacity);
152: IoBuffer buf;
153: if (maxCachedBufferSize != 0
154: && actualCapacity > maxCachedBufferSize) {
155: if (direct) {
156: buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
157: } else {
158: buf = wrap(ByteBuffer.allocate(actualCapacity));
159: }
160: } else {
161: Queue<CachedBuffer> pool;
162: if (direct) {
163: pool = directBuffers.get().get(actualCapacity);
164: } else {
165: pool = heapBuffers.get().get(actualCapacity);
166: }
167:
168: // Recycle if possible.
169: buf = pool.poll();
170: if (buf != null) {
171: buf.clear();
172: buf.setAutoExpand(false);
173: buf.order(ByteOrder.BIG_ENDIAN);
174: } else {
175: if (direct) {
176: buf = wrap(ByteBuffer
177: .allocateDirect(actualCapacity));
178: } else {
179: buf = wrap(ByteBuffer.allocate(actualCapacity));
180: }
181: }
182: }
183:
184: buf.limit(requestedCapacity);
185: return buf;
186: }
187:
188: public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
189: return allocate(capacity, direct).buf();
190: }
191:
192: public IoBuffer wrap(ByteBuffer nioBuffer) {
193: return new CachedBuffer(nioBuffer);
194: }
195:
196: public void dispose() {
197: }
198:
199: private class CachedBuffer extends AbstractIoBuffer {
200: private final Thread ownerThread;
201: private ByteBuffer buf;
202:
203: protected CachedBuffer(ByteBuffer buf) {
204: super (CachedBufferAllocator.this , buf.capacity());
205: this .ownerThread = Thread.currentThread();
206: this .buf = buf;
207: buf.order(ByteOrder.BIG_ENDIAN);
208: }
209:
210: protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
211: super (parent);
212: this .ownerThread = Thread.currentThread();
213: this .buf = buf;
214: }
215:
216: @Override
217: public ByteBuffer buf() {
218: if (buf == null) {
219: throw new IllegalStateException(
220: "Buffer has been freed already.");
221: }
222: return buf;
223: }
224:
225: @Override
226: protected void buf(ByteBuffer buf) {
227: ByteBuffer oldBuf = this .buf;
228: this .buf = buf;
229: free(oldBuf);
230: }
231:
232: @Override
233: protected IoBuffer duplicate0() {
234: return new CachedBuffer(this , buf().duplicate());
235: }
236:
237: @Override
238: protected IoBuffer slice0() {
239: return new CachedBuffer(this , buf().slice());
240: }
241:
242: @Override
243: protected IoBuffer asReadOnlyBuffer0() {
244: return new CachedBuffer(this , buf().asReadOnlyBuffer());
245: }
246:
247: @Override
248: public byte[] array() {
249: return buf().array();
250: }
251:
252: @Override
253: public int arrayOffset() {
254: return buf().arrayOffset();
255: }
256:
257: @Override
258: public boolean hasArray() {
259: return buf().hasArray();
260: }
261:
262: @Override
263: public void free() {
264: free(buf);
265: buf = null;
266: }
267:
268: private void free(ByteBuffer oldBuf) {
269: if (oldBuf == null
270: || oldBuf.capacity() > maxCachedBufferSize
271: || oldBuf.isReadOnly() || isDerived()
272: || Thread.currentThread() != ownerThread) {
273: return;
274: }
275:
276: // Add to the cache.
277: Queue<CachedBuffer> pool;
278: if (oldBuf.isDirect()) {
279: pool = directBuffers.get().get(oldBuf.capacity());
280: } else {
281: pool = heapBuffers.get().get(oldBuf.capacity());
282: }
283:
284: if (pool == null) {
285: return;
286: }
287:
288: // Restrict the size of the pool to prevent OOM.
289: if (maxPoolSize == 0 || pool.size() < maxPoolSize) {
290: pool.offer(new CachedBuffer(oldBuf));
291: }
292: }
293: }
294: }
|