001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.codec;
021:
022: import java.net.SocketAddress;
023: import java.util.Queue;
024:
025: import org.apache.mina.common.AttributeKey;
026: import org.apache.mina.common.DefaultWriteFuture;
027: import org.apache.mina.common.DefaultWriteRequest;
028: import org.apache.mina.common.FileRegion;
029: import org.apache.mina.common.IoBuffer;
030: import org.apache.mina.common.IoFilter;
031: import org.apache.mina.common.IoFilterAdapter;
032: import org.apache.mina.common.IoFilterChain;
033: import org.apache.mina.common.IoSession;
034: import org.apache.mina.common.NothingWrittenException;
035: import org.apache.mina.common.WriteFuture;
036: import org.apache.mina.common.WriteRequest;
037: import org.apache.mina.common.WriteRequestWrapper;
038: import org.slf4j.Logger;
039: import org.slf4j.LoggerFactory;
040:
041: /**
042: * An {@link IoFilter} which translates binary or protocol specific data into
043: * message object and vice versa using {@link ProtocolCodecFactory},
044: * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
045: *
046: * @author The Apache MINA Project (dev@mina.apache.org)
047: * @version $Rev: 627811 $, $Date: 2008-02-14 10:29:25 -0700 (Thu, 14 Feb 2008) $
048: */
049: public class ProtocolCodecFilter extends IoFilterAdapter {
050:
051: private static final Class<?>[] EMPTY_PARAMS = new Class[0];
052: private static final IoBuffer EMPTY_BUFFER = IoBuffer
053: .wrap(new byte[0]);
054:
055: private final AttributeKey ENCODER = new AttributeKey(getClass(),
056: "encoder");
057: private final AttributeKey DECODER = new AttributeKey(getClass(),
058: "decoder");
059: private final AttributeKey DECODER_OUT = new AttributeKey(
060: getClass(), "decoderOut");
061: private final ProtocolCodecFactory factory;
062:
063: private final Logger logger = LoggerFactory.getLogger(getClass());
064:
065: public ProtocolCodecFilter(ProtocolCodecFactory factory) {
066: if (factory == null) {
067: throw new NullPointerException("factory");
068: }
069: this .factory = factory;
070: }
071:
072: public ProtocolCodecFilter(final ProtocolEncoder encoder,
073: final ProtocolDecoder decoder) {
074: if (encoder == null) {
075: throw new NullPointerException("encoder");
076: }
077: if (decoder == null) {
078: throw new NullPointerException("decoder");
079: }
080:
081: this .factory = new ProtocolCodecFactory() {
082: public ProtocolEncoder getEncoder(IoSession session) {
083: return encoder;
084: }
085:
086: public ProtocolDecoder getDecoder(IoSession session) {
087: return decoder;
088: }
089: };
090: }
091:
092: public ProtocolCodecFilter(
093: final Class<? extends ProtocolEncoder> encoderClass,
094: final Class<? extends ProtocolDecoder> decoderClass) {
095: if (encoderClass == null) {
096: throw new NullPointerException("encoderClass");
097: }
098: if (decoderClass == null) {
099: throw new NullPointerException("decoderClass");
100: }
101: if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
102: throw new IllegalArgumentException("encoderClass: "
103: + encoderClass.getName());
104: }
105: if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
106: throw new IllegalArgumentException("decoderClass: "
107: + decoderClass.getName());
108: }
109: try {
110: encoderClass.getConstructor(EMPTY_PARAMS);
111: } catch (NoSuchMethodException e) {
112: throw new IllegalArgumentException(
113: "encoderClass doesn't have a public default constructor.");
114: }
115: try {
116: decoderClass.getConstructor(EMPTY_PARAMS);
117: } catch (NoSuchMethodException e) {
118: throw new IllegalArgumentException(
119: "decoderClass doesn't have a public default constructor.");
120: }
121:
122: this .factory = new ProtocolCodecFactory() {
123: public ProtocolEncoder getEncoder(IoSession session)
124: throws Exception {
125: return encoderClass.newInstance();
126: }
127:
128: public ProtocolDecoder getDecoder(IoSession session)
129: throws Exception {
130: return decoderClass.newInstance();
131: }
132: };
133: }
134:
135: public ProtocolEncoder getEncoder(IoSession session) {
136: return (ProtocolEncoder) session.getAttribute(ENCODER);
137: }
138:
139: public ProtocolDecoder getDecoder(IoSession session) {
140: return (ProtocolDecoder) session.getAttribute(DECODER);
141: }
142:
143: @Override
144: public void onPreAdd(IoFilterChain parent, String name,
145: NextFilter nextFilter) throws Exception {
146: if (parent.contains(this )) {
147: throw new IllegalArgumentException(
148: "You can't add the same filter instance more than once. Create another instance and add it.");
149: }
150: }
151:
152: @Override
153: public void onPostRemove(IoFilterChain parent, String name,
154: NextFilter nextFilter) throws Exception {
155: disposeEncoder(parent.getSession());
156: disposeDecoder(parent.getSession());
157: disposeDecoderOut(parent.getSession());
158: }
159:
160: @Override
161: public void messageReceived(NextFilter nextFilter,
162: IoSession session, Object message) throws Exception {
163: if (!(message instanceof IoBuffer)) {
164: nextFilter.messageReceived(session, message);
165: return;
166: }
167:
168: IoBuffer in = (IoBuffer) message;
169: ProtocolDecoder decoder = getDecoder0(session);
170: ProtocolDecoderOutput decoderOut = getDecoderOut(session,
171: nextFilter);
172:
173: while (in.hasRemaining()) {
174: int oldPos = in.position();
175: try {
176: synchronized (decoderOut) {
177: decoder.decode(session, in, decoderOut);
178: }
179: // Finish decoding if no exception was thrown.
180: decoderOut.flush();
181: break;
182: } catch (Throwable t) {
183: ProtocolDecoderException pde;
184: if (t instanceof ProtocolDecoderException) {
185: pde = (ProtocolDecoderException) t;
186: } else {
187: pde = new ProtocolDecoderException(t);
188: }
189:
190: if (pde.getHexdump() == null) {
191: int curPos = in.position();
192: in.position(oldPos);
193: pde.setHexdump(in.getHexDump());
194: in.position(curPos);
195: }
196:
197: // Fire the exceptionCaught event.
198: decoderOut.flush();
199: nextFilter.exceptionCaught(session, pde);
200:
201: // Retry only if the type of the caught exception is
202: // recoverable and the buffer position has changed.
203: // We check buffer position additionally to prevent an
204: // infinite loop.
205: if (!(t instanceof RecoverableProtocolDecoderException)
206: || in.position() == oldPos) {
207: break;
208: }
209: }
210: }
211: }
212:
213: @Override
214: public void messageSent(NextFilter nextFilter, IoSession session,
215: WriteRequest writeRequest) throws Exception {
216: if (writeRequest instanceof EncodedWriteRequest) {
217: return;
218: }
219:
220: if (!(writeRequest instanceof MessageWriteRequest)) {
221: nextFilter.messageSent(session, writeRequest);
222: return;
223: }
224:
225: MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
226: nextFilter.messageSent(session, wrappedRequest
227: .getParentRequest());
228: }
229:
230: @Override
231: public void filterWrite(NextFilter nextFilter, IoSession session,
232: WriteRequest writeRequest) throws Exception {
233: Object message = writeRequest.getMessage();
234: if (message instanceof IoBuffer
235: || message instanceof FileRegion) {
236: nextFilter.filterWrite(session, writeRequest);
237: return;
238: }
239:
240: ProtocolEncoder encoder = getEncoder0(session);
241: ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
242: nextFilter, writeRequest);
243:
244: try {
245: encoder.encode(session, message, encoderOut);
246: encoderOut.flushWithoutFuture();
247: nextFilter.filterWrite(session, new MessageWriteRequest(
248: writeRequest));
249: } catch (Throwable t) {
250: ProtocolEncoderException pee;
251: if (t instanceof ProtocolEncoderException) {
252: pee = (ProtocolEncoderException) t;
253: } else {
254: pee = new ProtocolEncoderException(t);
255: }
256: throw pee;
257: }
258: }
259:
260: @Override
261: public void sessionClosed(NextFilter nextFilter, IoSession session)
262: throws Exception {
263: // Call finishDecode() first when a connection is closed.
264: ProtocolDecoder decoder = getDecoder0(session);
265: ProtocolDecoderOutput decoderOut = getDecoderOut(session,
266: nextFilter);
267: try {
268: decoder.finishDecode(session, decoderOut);
269: } catch (Throwable t) {
270: ProtocolDecoderException pde;
271: if (t instanceof ProtocolDecoderException) {
272: pde = (ProtocolDecoderException) t;
273: } else {
274: pde = new ProtocolDecoderException(t);
275: }
276: throw pde;
277: } finally {
278: // Dispose all.
279: disposeEncoder(session);
280: disposeDecoder(session);
281: disposeDecoderOut(session);
282: decoderOut.flush();
283: }
284:
285: nextFilter.sessionClosed(session);
286: }
287:
288: private ProtocolEncoder getEncoder0(IoSession session)
289: throws Exception {
290: ProtocolEncoder encoder = (ProtocolEncoder) session
291: .getAttribute(ENCODER);
292: if (encoder == null) {
293: encoder = factory.getEncoder(session);
294: session.setAttribute(ENCODER, encoder);
295: }
296: return encoder;
297: }
298:
299: private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
300: NextFilter nextFilter, WriteRequest writeRequest) {
301: return new ProtocolEncoderOutputImpl(session, nextFilter,
302: writeRequest);
303: }
304:
305: private ProtocolDecoder getDecoder0(IoSession session)
306: throws Exception {
307: ProtocolDecoder decoder = (ProtocolDecoder) session
308: .getAttribute(DECODER);
309: if (decoder == null) {
310: decoder = factory.getDecoder(session);
311: session.setAttribute(DECODER, decoder);
312: }
313: return decoder;
314: }
315:
316: private ProtocolDecoderOutput getDecoderOut(IoSession session,
317: NextFilter nextFilter) {
318: ProtocolDecoderOutput out = (ProtocolDecoderOutput) session
319: .getAttribute(DECODER_OUT);
320: if (out == null) {
321: out = new ProtocolDecoderOutputImpl(session, nextFilter);
322: session.setAttribute(DECODER_OUT, out);
323: }
324: return out;
325: }
326:
327: private void disposeEncoder(IoSession session) {
328: ProtocolEncoder encoder = (ProtocolEncoder) session
329: .removeAttribute(ENCODER);
330: if (encoder == null) {
331: return;
332: }
333:
334: try {
335: encoder.dispose(session);
336: } catch (Throwable t) {
337: logger.warn("Failed to dispose: "
338: + encoder.getClass().getName() + " (" + encoder
339: + ')');
340: }
341: }
342:
343: private void disposeDecoder(IoSession session) {
344: ProtocolDecoder decoder = (ProtocolDecoder) session
345: .removeAttribute(DECODER);
346: if (decoder == null) {
347: return;
348: }
349:
350: try {
351: decoder.dispose(session);
352: } catch (Throwable t) {
353: logger.warn("Falied to dispose: "
354: + decoder.getClass().getName() + " (" + decoder
355: + ')');
356: }
357: }
358:
359: private void disposeDecoderOut(IoSession session) {
360: session.removeAttribute(DECODER_OUT);
361: }
362:
363: private static class EncodedWriteRequest extends
364: DefaultWriteRequest {
365: private EncodedWriteRequest(Object encodedMessage,
366: WriteFuture future, SocketAddress destination) {
367: super (encodedMessage, future, destination);
368: }
369: }
370:
371: private static class MessageWriteRequest extends
372: WriteRequestWrapper {
373: private MessageWriteRequest(WriteRequest writeRequest) {
374: super (writeRequest);
375: }
376:
377: @Override
378: public Object getMessage() {
379: return EMPTY_BUFFER;
380: }
381: }
382:
383: private static class ProtocolDecoderOutputImpl extends
384: AbstractProtocolDecoderOutput {
385: private final IoSession session;
386: private final NextFilter nextFilter;
387:
388: public ProtocolDecoderOutputImpl(IoSession session,
389: NextFilter nextFilter) {
390: this .session = session;
391: this .nextFilter = nextFilter;
392: }
393:
394: public void flush() {
395: Queue<Object> messageQueue = getMessageQueue();
396: while (!messageQueue.isEmpty()) {
397: nextFilter
398: .messageReceived(session, messageQueue.poll());
399: }
400: }
401: }
402:
403: private static class ProtocolEncoderOutputImpl extends
404: AbstractProtocolEncoderOutput {
405: private final IoSession session;
406:
407: private final NextFilter nextFilter;
408:
409: private final WriteRequest writeRequest;
410:
411: public ProtocolEncoderOutputImpl(IoSession session,
412: NextFilter nextFilter, WriteRequest writeRequest) {
413: this .session = session;
414: this .nextFilter = nextFilter;
415: this .writeRequest = writeRequest;
416: }
417:
418: public WriteFuture flush() {
419: Queue<Object> bufferQueue = getMessageQueue();
420: WriteFuture future = null;
421: for (;;) {
422: Object encodedMessage = bufferQueue.poll();
423: if (encodedMessage == null) {
424: break;
425: }
426:
427: // Flush only when the buffer has remaining.
428: if (!(encodedMessage instanceof IoBuffer)
429: || ((IoBuffer) encodedMessage).hasRemaining()) {
430: future = new DefaultWriteFuture(session);
431: nextFilter.filterWrite(session,
432: new EncodedWriteRequest(encodedMessage,
433: future, writeRequest
434: .getDestination()));
435: }
436: }
437:
438: if (future == null) {
439: future = DefaultWriteFuture.newNotWrittenFuture(
440: session, new NothingWrittenException(
441: writeRequest));
442: }
443:
444: return future;
445: }
446:
447: public void flushWithoutFuture() {
448: Queue<Object> bufferQueue = getMessageQueue();
449: for (;;) {
450: Object encodedMessage = bufferQueue.poll();
451: if (encodedMessage == null) {
452: break;
453: }
454:
455: // Flush only when the buffer has remaining.
456: if (!(encodedMessage instanceof IoBuffer)
457: || ((IoBuffer) encodedMessage).hasRemaining()) {
458: nextFilter
459: .filterWrite(session,
460: new EncodedWriteRequest(
461: encodedMessage, null,
462: writeRequest
463: .getDestination()));
464: }
465: }
466: }
467: }
468: }
|