001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.executor;
021:
022: import java.util.ArrayList;
023: import java.util.Collection;
024: import java.util.Collections;
025: import java.util.EnumSet;
026: import java.util.concurrent.Executor;
027: import java.util.concurrent.ExecutorService;
028: import java.util.concurrent.Executors;
029: import java.util.concurrent.ThreadFactory;
030: import java.util.concurrent.TimeUnit;
031:
032: import org.apache.mina.common.IdleStatus;
033: import org.apache.mina.common.IoEventType;
034: import org.apache.mina.common.IoFilterAdapter;
035: import org.apache.mina.common.IoFilterChain;
036: import org.apache.mina.common.IoFilterEvent;
037: import org.apache.mina.common.IoSession;
038: import org.apache.mina.common.WriteRequest;
039:
040: /**
041: * A filter that forwards I/O events to {@link Executor} to enforce a certain
042: * thread model while allowing the events per session to be processed
043: * simultaneously. You can apply various thread model by inserting this filter
044: * to a {@link IoFilterChain}.
045: *
046: * <h2>Life Cycle Management</h2>
047: *
048: * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
049: * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
050: * constructor that accepts an {@link Executor} that you've instantiated, you have
051: * full control and responsibility of managing its life cycle (e.g. calling
052: * {@link ExecutorService#shutdown()}.
053: * <p>
054: * If you created this filter using convenience constructors like
055: * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
056: * {@link #destroy()} explicitly.
057: *
058: * <h2>Event Ordering</h2>
059: *
060: * All convenience constructors of this filter creates a new
061: * {@link OrderedThreadPoolExecutor} instance. Therefore, the order of event is
062: * maintained like the following:
063: * <ul>
064: * <li>All event handler methods are called exclusively.
065: * (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
066: * <li>The event order is never mixed up.
067: * (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
068: * </ul>
069: * However, if you specified other {@link Executor} instance in the constructor,
070: * the order of events are not maintained at all. This means more than one event
071: * handler methods can be invoked at the same time with mixed order. For example,
072: * let's assume that messageReceived, messageSent, and sessionClosed events are
073: * fired.
074: * <ul>
075: * <li>All event handler methods can be called simultaneously.
076: * (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
077: * <li>The event order can be mixed up.
078: * (e.g. sessionClosed or messageSent can be invoked before messageReceived
079: * is invoked.)</li>
080: * </ul>
081: * If you need to maintain the order of events per session, please specify an
082: * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
083: *
084: * <h2>Selective Filtering</h2>
085: *
086: * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
087: * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
088: * underlying executor, which is most common setting.
089: * <p>
090: * If you want to submit only a certain set of event types, you can specify them
091: * in the constructor. For example, you could configure a thread pool for
092: * write operation for the maximum performance:
093: * <pre><code>
094: * IoService service = ...;
095: * DefaultIoFilterChainBuilder chain = service.getFilterChain();
096: *
097: * chain.addLast("codec", new ProtocolCodecFilter(...));
098: * // Use one thread pool for most events.
099: * chain.addLast("executor1", new ExecutorFilter());
100: * // and another dedicated thread pool for 'filterWrite' events.
101: * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
102: * </code></pre>
103: *
104: * <h2>Preventing {@link OutOfMemoryError}</h2>
105: *
106: * Please refer to {@link IoEventQueueThrottle}, which is specified as
107: * a parameter of the convenience constructors.
108: *
109: * @author The Apache MINA Project (dev@mina.apache.org)
110: * @version $Rev: 593428 $, $Date: 2007-11-08 23:35:16 -0700 (Thu, 08 Nov 2007) $
111: *
112: * @see OrderedThreadPoolExecutor
113: * @see UnorderedThreadPoolExecutor
114: */
115: public class ExecutorFilter extends IoFilterAdapter {
116:
117: private final EnumSet<IoEventType> eventTypes;
118: private final Executor executor;
119: private final boolean createdExecutor;
120:
121: /**
122: * (Convenience constructor) Creates a new instance with a new
123: * {@link OrderedThreadPoolExecutor}.
124: */
125: public ExecutorFilter() {
126: this (16, (IoEventType[]) null);
127: }
128:
129: /**
130: * (Convenience constructor) Creates a new instance with a new
131: * {@link OrderedThreadPoolExecutor}.
132: */
133: public ExecutorFilter(int maximumPoolSize) {
134: this (0, maximumPoolSize, (IoEventType[]) null);
135: }
136:
137: /**
138: * (Convenience constructor) Creates a new instance with a new
139: * {@link OrderedThreadPoolExecutor}.
140: */
141: public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
142: this (corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS,
143: (IoEventType[]) null);
144: }
145:
146: /**
147: * (Convenience constructor) Creates a new instance with a new
148: * {@link OrderedThreadPoolExecutor}.
149: */
150: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
151: long keepAliveTime, TimeUnit unit) {
152: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
153: (IoEventType[]) null);
154: }
155:
156: /**
157: * (Convenience constructor) Creates a new instance with a new
158: * {@link OrderedThreadPoolExecutor}.
159: */
160: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
161: long keepAliveTime, TimeUnit unit,
162: IoEventQueueHandler queueHandler) {
163: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
164: Executors.defaultThreadFactory(), queueHandler,
165: (IoEventType[]) null);
166: }
167:
168: /**
169: * (Convenience constructor) Creates a new instance with a new
170: * {@link OrderedThreadPoolExecutor}.
171: */
172: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
173: long keepAliveTime, TimeUnit unit,
174: ThreadFactory threadFactory) {
175: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
176: threadFactory, null, (IoEventType[]) null);
177: }
178:
179: /**
180: * (Convenience constructor) Creates a new instance with a new
181: * {@link OrderedThreadPoolExecutor}.
182: */
183: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
184: long keepAliveTime, TimeUnit unit,
185: ThreadFactory threadFactory,
186: IoEventQueueHandler queueHandler) {
187: this (new OrderedThreadPoolExecutor(corePoolSize,
188: maximumPoolSize, keepAliveTime, unit, threadFactory,
189: queueHandler), true, (IoEventType[]) null);
190: }
191:
192: /**
193: * (Convenience constructor) Creates a new instance with a new
194: * {@link OrderedThreadPoolExecutor}.
195: */
196: public ExecutorFilter(IoEventType... eventTypes) {
197: this (16, eventTypes);
198: }
199:
200: /**
201: * (Convenience constructor) Creates a new instance with a new
202: * {@link OrderedThreadPoolExecutor}.
203: */
204: public ExecutorFilter(int maximumPoolSize,
205: IoEventType... eventTypes) {
206: this (0, maximumPoolSize, eventTypes);
207: }
208:
209: /**
210: * (Convenience constructor) Creates a new instance with a new
211: * {@link OrderedThreadPoolExecutor}.
212: */
213: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
214: IoEventType... eventTypes) {
215: this (corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS,
216: eventTypes);
217: }
218:
219: /**
220: * (Convenience constructor) Creates a new instance with a new
221: * {@link OrderedThreadPoolExecutor}.
222: */
223: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
224: long keepAliveTime, TimeUnit unit,
225: IoEventType... eventTypes) {
226: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
227: Executors.defaultThreadFactory(), eventTypes);
228: }
229:
230: /**
231: * (Convenience constructor) Creates a new instance with a new
232: * {@link OrderedThreadPoolExecutor}.
233: */
234: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
235: long keepAliveTime, TimeUnit unit,
236: IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
237: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
238: Executors.defaultThreadFactory(), queueHandler,
239: eventTypes);
240: }
241:
242: /**
243: * (Convenience constructor) Creates a new instance with a new
244: * {@link OrderedThreadPoolExecutor}.
245: */
246: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
247: long keepAliveTime, TimeUnit unit,
248: ThreadFactory threadFactory, IoEventType... eventTypes) {
249: this (corePoolSize, maximumPoolSize, keepAliveTime, unit,
250: threadFactory, null, eventTypes);
251: }
252:
253: /**
254: * (Convenience constructor) Creates a new instance with a new
255: * {@link OrderedThreadPoolExecutor}.
256: */
257: public ExecutorFilter(int corePoolSize, int maximumPoolSize,
258: long keepAliveTime, TimeUnit unit,
259: ThreadFactory threadFactory,
260: IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
261: this (new OrderedThreadPoolExecutor(corePoolSize,
262: maximumPoolSize, keepAliveTime, unit, threadFactory,
263: queueHandler), true, eventTypes);
264: }
265:
266: /**
267: * Creates a new instance with the specified {@link Executor}.
268: */
269: public ExecutorFilter(Executor executor) {
270: this (executor, false, (IoEventType[]) null);
271: }
272:
273: /**
274: * Creates a new instance with the specified {@link Executor}.
275: */
276: public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
277: this (executor, false, eventTypes);
278: }
279:
280: private ExecutorFilter(Executor executor, boolean createdExecutor,
281: IoEventType... eventTypes) {
282: if (executor == null) {
283: throw new NullPointerException("executor");
284: }
285: if (eventTypes == null || eventTypes.length == 0) {
286: eventTypes = new IoEventType[] {
287: IoEventType.EXCEPTION_CAUGHT,
288: IoEventType.MESSAGE_RECEIVED,
289: IoEventType.MESSAGE_SENT,
290: IoEventType.SESSION_CLOSED,
291: IoEventType.SESSION_IDLE,
292: IoEventType.SESSION_OPENED, };
293: }
294:
295: for (IoEventType t : eventTypes) {
296: if (t == IoEventType.SESSION_CREATED) {
297: throw new IllegalArgumentException(
298: IoEventType.SESSION_CREATED
299: + " is not allowed.");
300: }
301: }
302:
303: this .executor = executor;
304: this .createdExecutor = createdExecutor;
305:
306: Collection<IoEventType> eventTypeCollection = new ArrayList<IoEventType>(
307: eventTypes.length);
308: Collections.addAll(eventTypeCollection, eventTypes);
309: this .eventTypes = EnumSet.copyOf(eventTypeCollection);
310: }
311:
312: /**
313: * Shuts down the underlying executor if this filter is creates via
314: * a convenience constructor.
315: */
316: @Override
317: public void destroy() {
318: if (createdExecutor) {
319: ((ExecutorService) executor).shutdown();
320: }
321: }
322:
323: /**
324: * Returns the underlying {@link Executor} instance this filter uses.
325: */
326: public final Executor getExecutor() {
327: return executor;
328: }
329:
330: /**
331: * Fires the specified event through the underlying executor.
332: */
333: protected void fireEvent(IoFilterEvent event) {
334: getExecutor().execute(event);
335: }
336:
337: @Override
338: public void onPreAdd(IoFilterChain parent, String name,
339: NextFilter nextFilter) throws Exception {
340: if (parent.contains(this )) {
341: throw new IllegalArgumentException(
342: "You can't add the same filter instance more than once. Create another instance and add it.");
343: }
344: }
345:
346: @Override
347: public final void sessionCreated(NextFilter nextFilter,
348: IoSession session) {
349: nextFilter.sessionCreated(session);
350: }
351:
352: @Override
353: public final void sessionOpened(NextFilter nextFilter,
354: IoSession session) {
355: if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
356: fireEvent(new IoFilterEvent(nextFilter,
357: IoEventType.SESSION_OPENED, session, null));
358: } else {
359: nextFilter.sessionOpened(session);
360: }
361: }
362:
363: @Override
364: public final void sessionClosed(NextFilter nextFilter,
365: IoSession session) {
366: if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
367: fireEvent(new IoFilterEvent(nextFilter,
368: IoEventType.SESSION_CLOSED, session, null));
369: } else {
370: nextFilter.sessionClosed(session);
371: }
372: }
373:
374: @Override
375: public final void sessionIdle(NextFilter nextFilter,
376: IoSession session, IdleStatus status) {
377: if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
378: fireEvent(new IoFilterEvent(nextFilter,
379: IoEventType.SESSION_IDLE, session, status));
380: } else {
381: nextFilter.sessionIdle(session, status);
382: }
383: }
384:
385: @Override
386: public final void exceptionCaught(NextFilter nextFilter,
387: IoSession session, Throwable cause) {
388: if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
389: fireEvent(new IoFilterEvent(nextFilter,
390: IoEventType.EXCEPTION_CAUGHT, session, cause));
391: } else {
392: nextFilter.exceptionCaught(session, cause);
393: }
394: }
395:
396: @Override
397: public final void messageReceived(NextFilter nextFilter,
398: IoSession session, Object message) {
399: if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
400: fireEvent(new IoFilterEvent(nextFilter,
401: IoEventType.MESSAGE_RECEIVED, session, message));
402: } else {
403: nextFilter.messageReceived(session, message);
404: }
405: }
406:
407: @Override
408: public final void messageSent(NextFilter nextFilter,
409: IoSession session, WriteRequest writeRequest) {
410: if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
411: fireEvent(new IoFilterEvent(nextFilter,
412: IoEventType.MESSAGE_SENT, session, writeRequest));
413: } else {
414: nextFilter.messageSent(session, writeRequest);
415: }
416: }
417:
418: @Override
419: public final void filterWrite(NextFilter nextFilter,
420: IoSession session, WriteRequest writeRequest) {
421: if (eventTypes.contains(IoEventType.WRITE)) {
422: fireEvent(new IoFilterEvent(nextFilter, IoEventType.WRITE,
423: session, writeRequest));
424: } else {
425: nextFilter.filterWrite(session, writeRequest);
426: }
427: }
428:
429: @Override
430: public final void filterClose(NextFilter nextFilter,
431: IoSession session) throws Exception {
432: if (eventTypes.contains(IoEventType.CLOSE)) {
433: fireEvent(new IoFilterEvent(nextFilter, IoEventType.CLOSE,
434: session, null));
435: } else {
436: nextFilter.filterClose(session);
437: }
438: }
439: }
|