View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec.demux;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.session.AttributeKey;
24  import org.apache.mina.core.session.IoSession;
25  import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
26  import org.apache.mina.filter.codec.ProtocolDecoder;
27  import org.apache.mina.filter.codec.ProtocolDecoderException;
28  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
29  
30  /**
31   * A composite {@link ProtocolDecoder} that demultiplexes incoming {@link IoBuffer}
32   * decoding requests into an appropriate {@link MessageDecoder}.
33   * 
34   * <h2>Internal mechanism of {@link MessageDecoder} selection</h2>
35   * <ol>
36   *   <li>
37   *     {@link DemuxingProtocolDecoder} iterates the list of candidate
38   *     {@link MessageDecoder}s and calls {@link MessageDecoder#decodable(IoSession, IoBuffer)}.
39   *     Initially, all registered {@link MessageDecoder}s are candidates.
40   *   </li>
41   *   <li>
42   *     If {@link MessageDecoderResult#NOT_OK} is returned, it is removed from the candidate
43   *     list.
44   *   </li>
45   *   <li>
46   *     If {@link MessageDecoderResult#NEED_DATA} is returned, it is retained in the candidate
47   *     list, and its {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be invoked
48   *     again when more data is received.
49   *   </li>
50   *   <li>
51   *     If {@link MessageDecoderResult#OK} is returned, {@link DemuxingProtocolDecoder}
52   *     found the right {@link MessageDecoder}.
53   *   </li>
54   *   <li>
55   *     If there's no candidate left, an exception is raised.  Otherwise, 
56   *     {@link DemuxingProtocolDecoder} will keep iterating the candidate list.
57   *   </li>
58   * </ol>
59   * 
60   * Please note that any change of position and limit of the specified {@link IoBuffer}
61   * in {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be reverted back to its
62   * original value.
63   * <p>
64   * Once a {@link MessageDecoder} is selected, {@link DemuxingProtocolDecoder} calls
65   * {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)} continuously
66   * reading its return value:
67   * <ul>
68   *   <li>
69   *     {@link MessageDecoderResult#NOT_OK} - protocol violation; {@link ProtocolDecoderException}
70   *     is raised automatically.
71   *   </li>
72   *   <li>
73   *     {@link MessageDecoderResult#NEED_DATA} - needs more data to read the whole message;
74   *     {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)}
75   *     will be invoked again when more data is received.
76   *   </li>
77   *   <li>
78   *     {@link MessageDecoderResult#OK} - successfully decoded a message; the candidate list will
79   *     be reset and the selection process will start over.
80   *   </li>
81   * </ul>
82   *
83   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
84   *
85   * @see MessageDecoderFactory
86   * @see MessageDecoder
87   */
88  public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder {
89  
90      private static final AttributeKey STATE = new AttributeKey(DemuxingProtocolDecoder.class, "state");
91  
92      private MessageDecoderFactory[] decoderFactories = new MessageDecoderFactory[0];
93  
94      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
95  
96      /**
97       * Adds a new message decoder class
98       * 
99       * @param decoderClass The decoder class
100      */
101     public void addMessageDecoder(Class<? extends MessageDecoder> decoderClass) {
102         if (decoderClass == null) {
103             throw new IllegalArgumentException("decoderClass");
104         }
105 
106         try {
107             decoderClass.getConstructor(EMPTY_PARAMS);
108         } catch (NoSuchMethodException e) {
109             throw new IllegalArgumentException("The specified class doesn't have a public default constructor.");
110         }
111 
112         boolean registered = false;
113         if (MessageDecoder.class.isAssignableFrom(decoderClass)) {
114             addMessageDecoder(new DefaultConstructorMessageDecoderFactory(decoderClass));
115             registered = true;
116         }
117 
118         if (!registered) {
119             throw new IllegalArgumentException("Unregisterable type: " + decoderClass);
120         }
121     }
122 
123     /**
124      * Adds a new message decoder instance
125      * 
126      * @param decoder The decoder instance
127      */
128     public void addMessageDecoder(MessageDecoder decoder) {
129         addMessageDecoder(new SingletonMessageDecoderFactory(decoder));
130     }
131 
132     /**
133      * Adds a new message decoder factory
134      * 
135      * @param factory The decoder factory
136      */
137     public void addMessageDecoder(MessageDecoderFactory factory) {
138         if (factory == null) {
139             throw new IllegalArgumentException("factory");
140         }
141 
142         MessageDecoderFactory[] newDecoderFactories = new MessageDecoderFactory[decoderFactories.length + 1];
143         System.arraycopy(decoderFactories, 0, newDecoderFactories, 0, decoderFactories.length);
144         newDecoderFactories[decoderFactories.length] = factory;
145         this.decoderFactories = newDecoderFactories;
146     }
147 
148     /**
149      * {@inheritDoc}
150      */
151     @Override
152     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
153         State state = getState(session);
154 
155         if (state.currentDecoder == null) {
156             MessageDecoder[] decoders = state.decoders;
157             int undecodables = 0;
158 
159             for (int i = decoders.length - 1; i >= 0; i--) {
160                 MessageDecoder decoder = decoders[i];
161                 int limit = in.limit();
162                 int pos = in.position();
163 
164                 MessageDecoderResult result;
165 
166                 try {
167                     result = decoder.decodable(session, in);
168                 } finally {
169                     in.position(pos);
170                     in.limit(limit);
171                 }
172 
173                 if (result == MessageDecoder.OK) {
174                     state.currentDecoder = decoder;
175                     break;
176                 } else if (result == MessageDecoder.NOT_OK) {
177                     undecodables++;
178                 } else if (result != MessageDecoder.NEED_DATA) {
179                     throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
180                 }
181             }
182 
183             if (undecodables == decoders.length) {
184                 // Throw an exception if all decoders cannot decode data.
185                 String dump = in.getHexDump();
186                 in.position(in.limit()); // Skip data
187                 ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
188                 e.setHexdump(dump);
189                 throw e;
190             }
191 
192             if (state.currentDecoder == null) {
193                 // Decoder is not determined yet (i.e. we need more data)
194                 return false;
195             }
196         }
197 
198         try {
199             MessageDecoderResult result = state.currentDecoder.decode(session, in, out);
200             if (result == MessageDecoder.OK) {
201                 state.currentDecoder = null;
202                 return true;
203             } else if (result == MessageDecoder.NEED_DATA) {
204                 return false;
205             } else if (result == MessageDecoder.NOT_OK) {
206                 state.currentDecoder = null;
207                 throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
208             } else {
209                 state.currentDecoder = null;
210                 throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
211             }
212         } catch (Exception e) {
213             state.currentDecoder = null;
214             throw e;
215         }
216     }
217 
218     /**
219      * {@inheritDoc}
220      */
221     @Override
222     public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
223         super.finishDecode(session, out);
224         State state = getState(session);
225         MessageDecoder currentDecoder = state.currentDecoder;
226         if (currentDecoder == null) {
227             return;
228         }
229 
230         currentDecoder.finishDecode(session, out);
231     }
232 
233     /**
234      * {@inheritDoc}
235      */
236     @Override
237     public void dispose(IoSession session) throws Exception {
238         super.dispose(session);
239         session.removeAttribute(STATE);
240     }
241 
242     private State getState(IoSession session) throws Exception {
243         State state = (State) session.getAttribute(STATE);
244 
245         if (state == null) {
246             state = new State();
247             State oldState = (State) session.setAttributeIfAbsent(STATE, state);
248 
249             if (oldState != null) {
250                 state = oldState;
251             }
252         }
253 
254         return state;
255     }
256 
257     private class State {
258         private final MessageDecoder[] decoders;
259 
260         private MessageDecoder currentDecoder;
261 
262         private State() throws Exception {
263             MessageDecoderFactory[] factories = DemuxingProtocolDecoder.this.decoderFactories;
264             decoders = new MessageDecoder[factories.length];
265             
266             for (int i = factories.length - 1; i >= 0; i--) {
267                 decoders[i] = factories[i].getDecoder();
268             }
269         }
270     }
271 
272     private static class SingletonMessageDecoderFactory implements MessageDecoderFactory {
273         private final MessageDecoder decoder;
274 
275         private SingletonMessageDecoderFactory(MessageDecoder decoder) {
276             if (decoder == null) {
277                 throw new IllegalArgumentException("decoder");
278             }
279             this.decoder = decoder;
280         }
281 
282         /**
283          * {@inheritDoc}
284          */
285         @Override
286         public MessageDecoder getDecoder() {
287             return decoder;
288         }
289     }
290 
291     private static class DefaultConstructorMessageDecoderFactory implements MessageDecoderFactory {
292         private final Class<?> decoderClass;
293 
294         private DefaultConstructorMessageDecoderFactory(Class<?> decoderClass) {
295             if (decoderClass == null) {
296                 throw new IllegalArgumentException("decoderClass");
297             }
298 
299             if (!MessageDecoder.class.isAssignableFrom(decoderClass)) {
300                 throw new IllegalArgumentException("decoderClass is not assignable to MessageDecoder");
301             }
302             this.decoderClass = decoderClass;
303         }
304 
305         /**
306          * {@inheritDoc}
307          */
308         @Override
309         public MessageDecoder getDecoder() throws Exception {
310             return (MessageDecoder) decoderClass.newInstance();
311         }
312     }
313 }