View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec.demux;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.session.AttributeKey;
24  import org.apache.mina.core.session.IoSession;
25  import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
26  import org.apache.mina.filter.codec.ProtocolDecoder;
27  import org.apache.mina.filter.codec.ProtocolDecoderException;
28  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
29  
30  /**
31   * A composite {@link ProtocolDecoder} that demultiplexes incoming {@link IoBuffer}
32   * decoding requests into an appropriate {@link MessageDecoder}.
33   * 
34   * <h2>Internal mechanism of {@link MessageDecoder} selection</h2>
35   * <ol>
36   *   <li>
37   *     {@link DemuxingProtocolDecoder} iterates the list of candidate
38   *     {@link MessageDecoder}s and calls {@link MessageDecoder#decodable(IoSession, IoBuffer)}.
39   *     Initially, all registered {@link MessageDecoder}s are candidates.
40   *   </li>
41   *   <li>
42   *     If {@link MessageDecoderResult#NOT_OK} is returned, it is removed from the candidate
43   *     list.
44   *   </li>
45   *   <li>
46   *     If {@link MessageDecoderResult#NEED_DATA} is returned, it is retained in the candidate
47   *     list, and its {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be invoked
48   *     again when more data is received.
49   *   </li>
50   *   <li>
51   *     If {@link MessageDecoderResult#OK} is returned, {@link DemuxingProtocolDecoder}
52   *     found the right {@link MessageDecoder}.
53   *   </li>
54   *   <li>
55   *     If there's no candidate left, an exception is raised.  Otherwise, 
56   *     {@link DemuxingProtocolDecoder} will keep iterating the candidate list.
57   *   </li>
58   * </ol>
59   * 
60   * Please note that any change of position and limit of the specified {@link IoBuffer}
61   * in {@link MessageDecoder#decodable(IoSession, IoBuffer)} will be reverted back to its
62   * original value.
63   * <p>
64   * Once a {@link MessageDecoder} is selected, {@link DemuxingProtocolDecoder} calls
65   * {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)} continuously
66   * reading its return value:
67   * <ul>
68   *   <li>
69   *     {@link MessageDecoderResult#NOT_OK} - protocol violation; {@link ProtocolDecoderException}
70   *     is raised automatically.
71   *   </li>
72   *   <li>
73   *     {@link MessageDecoderResult#NEED_DATA} - needs more data to read the whole message;
74   *     {@link MessageDecoder#decode(IoSession, IoBuffer, ProtocolDecoderOutput)}
75   *     will be invoked again when more data is received.
76   *   </li>
77   *   <li>
78   *     {@link MessageDecoderResult#OK} - successfully decoded a message; the candidate list will
79   *     be reset and the selection process will start over.
80   *   </li>
81   * </ul>
82   *
83   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
84   *
85   * @see MessageDecoderFactory
86   * @see MessageDecoder
87   */
88  public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder {
89  
90      private final AttributeKey STATE = new AttributeKey(getClass(), "state");
91  
92      private MessageDecoderFactory[] decoderFactories = new MessageDecoderFactory[0];
93  
94      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
95  
96      public DemuxingProtocolDecoder() {
97          // Do nothing
98      }
99  
100     public void addMessageDecoder(Class<? extends MessageDecoder> decoderClass) {
101         if (decoderClass == null) {
102             throw new IllegalArgumentException("decoderClass");
103         }
104 
105         try {
106             decoderClass.getConstructor(EMPTY_PARAMS);
107         } catch (NoSuchMethodException e) {
108             throw new IllegalArgumentException("The specified class doesn't have a public default constructor.");
109         }
110 
111         boolean registered = false;
112         if (MessageDecoder.class.isAssignableFrom(decoderClass)) {
113             addMessageDecoder(new DefaultConstructorMessageDecoderFactory(decoderClass));
114             registered = true;
115         }
116 
117         if (!registered) {
118             throw new IllegalArgumentException("Unregisterable type: " + decoderClass);
119         }
120     }
121 
122     public void addMessageDecoder(MessageDecoder decoder) {
123         addMessageDecoder(new SingletonMessageDecoderFactory(decoder));
124     }
125 
126     public void addMessageDecoder(MessageDecoderFactory factory) {
127         if (factory == null) {
128             throw new IllegalArgumentException("factory");
129         }
130         MessageDecoderFactory[] decoderFactories = this.decoderFactories;
131         MessageDecoderFactory[] newDecoderFactories = new MessageDecoderFactory[decoderFactories.length + 1];
132         System.arraycopy(decoderFactories, 0, newDecoderFactories, 0, decoderFactories.length);
133         newDecoderFactories[decoderFactories.length] = factory;
134         this.decoderFactories = newDecoderFactories;
135     }
136 
137     /**
138      * {@inheritDoc}
139      */
140     @Override
141     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
142         State state = getState(session);
143 
144         if (state.currentDecoder == null) {
145             MessageDecoder[] decoders = state.decoders;
146             int undecodables = 0;
147 
148             for (int i = decoders.length - 1; i >= 0; i--) {
149                 MessageDecoder decoder = decoders[i];
150                 int limit = in.limit();
151                 int pos = in.position();
152 
153                 MessageDecoderResult result;
154 
155                 try {
156                     result = decoder.decodable(session, in);
157                 } finally {
158                     in.position(pos);
159                     in.limit(limit);
160                 }
161 
162                 if (result == MessageDecoder.OK) {
163                     state.currentDecoder = decoder;
164                     break;
165                 } else if (result == MessageDecoder.NOT_OK) {
166                     undecodables++;
167                 } else if (result != MessageDecoder.NEED_DATA) {
168                     throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
169                 }
170             }
171 
172             if (undecodables == decoders.length) {
173                 // Throw an exception if all decoders cannot decode data.
174                 String dump = in.getHexDump();
175                 in.position(in.limit()); // Skip data
176                 ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
177                 e.setHexdump(dump);
178                 throw e;
179             }
180 
181             if (state.currentDecoder == null) {
182                 // Decoder is not determined yet (i.e. we need more data)
183                 return false;
184             }
185         }
186 
187         try {
188             MessageDecoderResult result = state.currentDecoder.decode(session, in, out);
189             if (result == MessageDecoder.OK) {
190                 state.currentDecoder = null;
191                 return true;
192             } else if (result == MessageDecoder.NEED_DATA) {
193                 return false;
194             } else if (result == MessageDecoder.NOT_OK) {
195                 state.currentDecoder = null;
196                 throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
197             } else {
198                 state.currentDecoder = null;
199                 throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
200             }
201         } catch (Exception e) {
202             state.currentDecoder = null;
203             throw e;
204         }
205     }
206 
207     /**
208      * {@inheritDoc}
209      */
210     @Override
211     public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
212         super.finishDecode(session, out);
213         State state = getState(session);
214         MessageDecoder currentDecoder = state.currentDecoder;
215         if (currentDecoder == null) {
216             return;
217         }
218 
219         currentDecoder.finishDecode(session, out);
220     }
221 
222     /**
223      * {@inheritDoc}
224      */
225     @Override
226     public void dispose(IoSession session) throws Exception {
227         super.dispose(session);
228         session.removeAttribute(STATE);
229     }
230 
231     private State getState(IoSession session) throws Exception {
232         State state = (State) session.getAttribute(STATE);
233 
234         if (state == null) {
235             state = new State();
236             State oldState = (State) session.setAttributeIfAbsent(STATE, state);
237 
238             if (oldState != null) {
239                 state = oldState;
240             }
241         }
242 
243         return state;
244     }
245 
246     private class State {
247         private final MessageDecoder[] decoders;
248 
249         private MessageDecoder currentDecoder;
250 
251         private State() throws Exception {
252             MessageDecoderFactory[] decoderFactories = DemuxingProtocolDecoder.this.decoderFactories;
253             decoders = new MessageDecoder[decoderFactories.length];
254             for (int i = decoderFactories.length - 1; i >= 0; i--) {
255                 decoders[i] = decoderFactories[i].getDecoder();
256             }
257         }
258     }
259 
260     private static class SingletonMessageDecoderFactory implements MessageDecoderFactory {
261         private final MessageDecoder decoder;
262 
263         private SingletonMessageDecoderFactory(MessageDecoder decoder) {
264             if (decoder == null) {
265                 throw new IllegalArgumentException("decoder");
266             }
267             this.decoder = decoder;
268         }
269 
270         public MessageDecoder getDecoder() {
271             return decoder;
272         }
273     }
274 
275     private static class DefaultConstructorMessageDecoderFactory implements MessageDecoderFactory {
276         private final Class<?> decoderClass;
277 
278         private DefaultConstructorMessageDecoderFactory(Class<?> decoderClass) {
279             if (decoderClass == null) {
280                 throw new IllegalArgumentException("decoderClass");
281             }
282 
283             if (!MessageDecoder.class.isAssignableFrom(decoderClass)) {
284                 throw new IllegalArgumentException("decoderClass is not assignable to MessageDecoder");
285             }
286             this.decoderClass = decoderClass;
287         }
288 
289         public MessageDecoder getDecoder() throws Exception {
290             return (MessageDecoder) decoderClass.newInstance();
291         }
292     }
293 }