001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.codec.demux;
021:
022: import org.apache.mina.common.AttributeKey;
023: import org.apache.mina.common.IoBuffer;
024: import org.apache.mina.common.IoSession;
025: import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
026: import org.apache.mina.filter.codec.ProtocolDecoder;
027: import org.apache.mina.filter.codec.ProtocolDecoderException;
028: import org.apache.mina.filter.codec.ProtocolDecoderOutput;
029:
030: /**
031: * A composite {@link ProtocolDecoder} that demultiplexes incoming {@link IoBuffer}
032: * decoding requests into an appropriate {@link MessageDecoder}.
033: *
034: * <h2>Internal mechanism of {@link MessageDecoder} selection</h2>
035: * <p>
036: * <ol>
037: * <li>{@link DemuxingProtocolDecoder} iterates the list of candidate
038: * {@link MessageDecoder}s and calls {@link MessageDecoder#decodable(IoSession, IoBuffer)}.
039: * Initially, all registered {@link MessageDecoder}s are candidates.</li>
040: * <li>If {@link MessageDecoderResult#NOT_OK} is returned, it is removed from the candidate
041: * list.</li>
042: * <li>If {@link MessageDecoderResult#NEED_DATA} is returned, it is retained in the candidate
043: * list, and its {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be invoked
044: * again when more data is received.</li>
045: * <li>If {@link MessageDecoderResult#OK} is returned, {@link DemuxingProtocolDecoder}
046: * found the right {@link MessageDecoder}.</li>
047: * <li>If there's no candidate left, an exception is raised. Otherwise,
048: * {@link DemuxingProtocolDecoder} will keep iterating the candidate list.
049: * </ol>
050: *
051: * Please note that any change of position and limit of the specified {@link IoBuffer}
052: * in {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be reverted back to its
053: * original value.
054: * <p>
055: * Once a {@link MessageDecoder} is selected, {@link DemuxingProtocolDecoder} calls
056: * {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)} continuously
057: * reading its return value:
058: * <ul>
059: * <li>{@link MessageDecoderResult#NOT_OK} - protocol violation; {@link ProtocolDecoderException}
060: * is raised automatically.</li>
061: * <li>{@link MessageDecoderResult#NEED_DATA} - needs more data to read the whole message;
062: * {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)}
063: * will be invoked again when more data is received.</li>
064: * <li>{@link MessageDecoderResult#OK} - successfully decoded a message; the candidate list will
065: * be reset and the selection process will start over.</li>
066: * </ul>
067: *
068: * @author The Apache MINA Project (dev@mina.apache.org)
069: * @version $Rev: 612026 $, $Date: 2008-01-14 23:16:14 -0700 (Mon, 14 Jan 2008) $
070: *
071: * @see MessageDecoderFactory
072: * @see MessageDecoder
073: */
074: public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder {
075:
076: private final AttributeKey STATE = new AttributeKey(getClass(),
077: "state");
078:
079: private MessageDecoderFactory[] decoderFactories = new MessageDecoderFactory[0];
080: private static final Class<?>[] EMPTY_PARAMS = new Class[0];
081:
082: public DemuxingProtocolDecoder() {
083: }
084:
085: public void addMessageDecoder(
086: Class<? extends MessageDecoder> decoderClass) {
087: if (decoderClass == null) {
088: throw new NullPointerException("decoderClass");
089: }
090:
091: try {
092: decoderClass.getConstructor(EMPTY_PARAMS);
093: } catch (NoSuchMethodException e) {
094: throw new IllegalArgumentException(
095: "The specified class doesn't have a public default constructor.");
096: }
097:
098: boolean registered = false;
099: if (MessageDecoder.class.isAssignableFrom(decoderClass)) {
100: addMessageDecoder(new DefaultConstructorMessageDecoderFactory(
101: decoderClass));
102: registered = true;
103: }
104:
105: if (!registered) {
106: throw new IllegalArgumentException("Unregisterable type: "
107: + decoderClass);
108: }
109: }
110:
111: public void addMessageDecoder(MessageDecoder decoder) {
112: addMessageDecoder(new SingletonMessageDecoderFactory(decoder));
113: }
114:
115: public void addMessageDecoder(MessageDecoderFactory factory) {
116: if (factory == null) {
117: throw new NullPointerException("factory");
118: }
119: MessageDecoderFactory[] decoderFactories = this .decoderFactories;
120: MessageDecoderFactory[] newDecoderFactories = new MessageDecoderFactory[decoderFactories.length + 1];
121: System.arraycopy(decoderFactories, 0, newDecoderFactories, 0,
122: decoderFactories.length);
123: newDecoderFactories[decoderFactories.length] = factory;
124: this .decoderFactories = newDecoderFactories;
125: }
126:
127: @Override
128: protected boolean doDecode(IoSession session, IoBuffer in,
129: ProtocolDecoderOutput out) throws Exception {
130: State state = getState(session);
131: if (state.currentDecoder == null) {
132: MessageDecoder[] decoders = state.decoders;
133: int undecodables = 0;
134: for (int i = decoders.length - 1; i >= 0; i--) {
135: MessageDecoder decoder = decoders[i];
136: int limit = in.limit();
137: int pos = in.position();
138:
139: MessageDecoderResult result;
140: try {
141: result = decoder.decodable(session, in);
142: } finally {
143: in.position(pos);
144: in.limit(limit);
145: }
146:
147: if (result == MessageDecoder.OK) {
148: state.currentDecoder = decoder;
149: break;
150: } else if (result == MessageDecoder.NOT_OK) {
151: undecodables++;
152: } else if (result != MessageDecoder.NEED_DATA) {
153: throw new IllegalStateException(
154: "Unexpected decode result (see your decodable()): "
155: + result);
156: }
157: }
158:
159: if (undecodables == decoders.length) {
160: // Throw an exception if all decoders cannot decode data.
161: String dump = in.getHexDump();
162: in.position(in.limit()); // Skip data
163: ProtocolDecoderException e = new ProtocolDecoderException(
164: "No appropriate message decoder: " + dump);
165: e.setHexdump(dump);
166: throw e;
167: }
168:
169: if (state.currentDecoder == null) {
170: // Decoder is not determined yet (i.e. we need more data)
171: return false;
172: }
173: }
174:
175: MessageDecoderResult result = state.currentDecoder.decode(
176: session, in, out);
177: if (result == MessageDecoder.OK) {
178: state.currentDecoder = null;
179: return true;
180: } else if (result == MessageDecoder.NEED_DATA) {
181: return false;
182: } else if (result == MessageDecoder.NOT_OK) {
183: state.currentDecoder = null;
184: throw new ProtocolDecoderException(
185: "Message decoder returned NOT_OK.");
186: } else {
187: state.currentDecoder = null;
188: throw new IllegalStateException(
189: "Unexpected decode result (see your decode()): "
190: + result);
191: }
192: }
193:
194: @Override
195: public void finishDecode(IoSession session,
196: ProtocolDecoderOutput out) throws Exception {
197: super .finishDecode(session, out);
198: State state = getState(session);
199: MessageDecoder currentDecoder = state.currentDecoder;
200: if (currentDecoder == null) {
201: return;
202: }
203:
204: currentDecoder.finishDecode(session, out);
205: }
206:
207: @Override
208: public void dispose(IoSession session) throws Exception {
209: super .dispose(session);
210: session.removeAttribute(STATE);
211: }
212:
213: private State getState(IoSession session) throws Exception {
214: State state = (State) session.getAttribute(STATE);
215: if (state == null) {
216: state = new State();
217: State oldState = (State) session.setAttributeIfAbsent(
218: STATE, state);
219: if (oldState != null) {
220: state = oldState;
221: }
222: }
223: return state;
224: }
225:
226: private class State {
227: private final MessageDecoder[] decoders;
228: private MessageDecoder currentDecoder;
229:
230: private State() throws Exception {
231: MessageDecoderFactory[] decoderFactories = DemuxingProtocolDecoder.this .decoderFactories;
232: decoders = new MessageDecoder[decoderFactories.length];
233: for (int i = decoderFactories.length - 1; i >= 0; i--) {
234: decoders[i] = decoderFactories[i].getDecoder();
235: }
236: }
237: }
238:
239: private static class SingletonMessageDecoderFactory implements
240: MessageDecoderFactory {
241: private final MessageDecoder decoder;
242:
243: private SingletonMessageDecoderFactory(MessageDecoder decoder) {
244: if (decoder == null) {
245: throw new NullPointerException("decoder");
246: }
247: this .decoder = decoder;
248: }
249:
250: public MessageDecoder getDecoder() {
251: return decoder;
252: }
253: }
254:
255: private static class DefaultConstructorMessageDecoderFactory
256: implements MessageDecoderFactory {
257: private final Class<?> decoderClass;
258:
259: private DefaultConstructorMessageDecoderFactory(
260: Class<?> decoderClass) {
261: if (decoderClass == null) {
262: throw new NullPointerException("decoderClass");
263: }
264:
265: if (!MessageDecoder.class.isAssignableFrom(decoderClass)) {
266: throw new IllegalArgumentException(
267: "decoderClass is not assignable to MessageDecoder");
268: }
269: this .decoderClass = decoderClass;
270: }
271:
272: public MessageDecoder getDecoder() throws Exception {
273: return (MessageDecoder) decoderClass.newInstance();
274: }
275: }
276: }
|