View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.service.TransportMetadata;
24  import org.apache.mina.core.session.AttributeKey;
25  import org.apache.mina.core.session.IoSession;
26  
27  /**
28   * A {@link ProtocolDecoder} that cumulates the content of received buffers to a
29   * <em>cumulative buffer</em> to help users implement decoders.
30   * <p>
31   * If the received {@link IoBuffer} is only a part of a message. decoders should
32   * cumulate received buffers to make a message complete or to postpone decoding
33   * until more buffers arrive.
34   * <p>
35   * Here is an example decoder that decodes CRLF terminated lines into
36   * <code>Command</code> objects:
37   * 
38   * <pre>
39   * public class CrLfTerminatedCommandLineDecoder extends CumulativeProtocolDecoder {
40   * 
41   *     private Command parseCommand(IoBuffer in) {
42   *         // Convert the bytes in the specified buffer to a
43   *         // Command object.
44   *         ...
45   *     }
46   * 
47   *     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
48   * 
49   *         // Remember the initial position.
50   *         int start = in.position();
51   * 
52   *         // Now find the first CRLF in the buffer.
53   *         byte previous = 0;
54   *         while (in.hasRemaining()) {
55   *             byte current = in.get();
56   * 
57   *             if (previous == '\r' &amp;&amp; current == '\n') {
58   *                 // Remember the current position and limit.
59   *                 int position = in.position();
60   *                 int limit = in.limit();
61   *                 try {
62   *                     in.position(start);
63   *                     in.limit(position);
64   *                     // The bytes between in.position() and in.limit()
65   *                     // now contain a full CRLF terminated line.
66   *                     out.write(parseCommand(in.slice()));
67   *                 } finally {
68   *                     // Set the position to point right after the
69   *                     // detected line and set the limit to the old
70   *                     // one.
71   *                     in.position(position);
72   *                     in.limit(limit);
73   *                 }
74   *                 // Decoded one line; CumulativeProtocolDecoder will
75   *                 // call me again until I return false. So just
76   *                 // return true until there are no more lines in the
77   *                 // buffer.
78   *                 return true;
79   *             }
80   * 
81   *             previous = current;
82   *         }
83   * 
84   *         // Could not find CRLF in the buffer. Reset the initial
85   *         // position to the one we recorded above.
86   *         in.position(start);
87   * 
88   *         return false;
89   *     }
90   * }
91   * </pre>
92   * <p>
93   * Please note that this decoder simply forward the call to
94   * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
95   * underlying transport doesn't have a packet fragmentation. Whether the
96   * transport has fragmentation or not is determined by querying
97   * {@link TransportMetadata}.
98   *
99   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
100  */
101 public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
102     /** The buffer used to store the data in the session */
103     private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
104     
105     /** A flag set to true if we handle fragmentation accordingly to the TransportMetadata setting. 
106      * It can be set to false if needed (UDP with fragments, for instance). the default value is 'true'
107      */
108     private boolean transportMetadataFragmentation = true;
109 
110     /**
111      * Creates a new instance.
112      */
113     protected CumulativeProtocolDecoder() {
114         // Do nothing
115     }
116 
117     /**
118      * Cumulates content of <tt>in</tt> into internal buffer and forwards
119      * decoding request to
120      * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
121      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
122      * and the cumulative buffer is compacted after decoding ends.
123      *
124      * @throws IllegalStateException
125      *             if your <tt>doDecode()</tt> returned <tt>true</tt> not
126      *             consuming the cumulative buffer.
127      */
128     public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
129         if (transportMetadataFragmentation && !session.getTransportMetadata().hasFragmentation()) {
130             while (in.hasRemaining()) {
131                 if (!doDecode(session, in, out)) {
132                     break;
133                 }
134             }
135 
136             return;
137         }
138 
139         boolean usingSessionBuffer = true;
140         IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
141         // If we have a session buffer, append data to that; otherwise
142         // use the buffer read from the network directly.
143         if (buf != null) {
144             boolean appended = false;
145             // Make sure that the buffer is auto-expanded.
146             if (buf.isAutoExpand()) {
147                 try {
148                     buf.put(in);
149                     appended = true;
150                 } catch (IllegalStateException e) {
151                     // A user called derivation method (e.g. slice()),
152                     // which disables auto-expansion of the parent buffer.
153                 } catch (IndexOutOfBoundsException e) {
154                     // A user disabled auto-expansion.
155                 }
156             }
157 
158             if (appended) {
159                 buf.flip();
160             } else {
161                 // Reallocate the buffer if append operation failed due to
162                 // derivation or disabled auto-expansion.
163                 buf.flip();
164                 IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
165                 newBuf.order(buf.order());
166                 newBuf.put(buf);
167                 newBuf.put(in);
168                 newBuf.flip();
169                 buf = newBuf;
170 
171                 // Update the session attribute.
172                 session.setAttribute(BUFFER, buf);
173             }
174         } else {
175             buf = in;
176             usingSessionBuffer = false;
177         }
178 
179         for (;;) {
180             int oldPos = buf.position();
181             boolean decoded = doDecode(session, buf, out);
182             if (decoded) {
183                 if (buf.position() == oldPos) {
184                     throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
185                 }
186 
187                 if (!buf.hasRemaining()) {
188                     break;
189                 }
190             } else {
191                 break;
192             }
193         }
194 
195         // if there is any data left that cannot be decoded, we store
196         // it in a buffer in the session and next time this decoder is
197         // invoked the session buffer gets appended to
198         if (buf.hasRemaining()) {
199             if (usingSessionBuffer && buf.isAutoExpand()) {
200                 buf.compact();
201             } else {
202                 storeRemainingInSession(buf, session);
203             }
204         } else {
205             if (usingSessionBuffer) {
206                 removeSessionBuffer(session);
207             }
208         }
209     }
210 
211     /**
212      * Implement this method to consume the specified cumulative buffer and
213      * decode its content into message(s).
214      *
215      * @param session The current Session
216      * @param in the cumulative buffer
217      * @param out The {@link ProtocolDecoderOutput} that will receive the decoded message
218      * @return <tt>true</tt> if and only if there's more to decode in the buffer
219      *         and you want to have <tt>doDecode</tt> method invoked again.
220      *         Return <tt>false</tt> if remaining data is not enough to decode,
221      *         then this method will be invoked again when more data is
222      *         cumulated.
223      * @throws Exception if cannot decode <tt>in</tt>.
224      */
225     protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
226 
227     /**
228      * Releases the cumulative buffer used by the specified <tt>session</tt>.
229      * Please don't forget to call <tt>super.dispose( session )</tt> when you
230      * override this method.
231      */
232     @Override
233     public void dispose(IoSession session) throws Exception {
234         removeSessionBuffer(session);
235     }
236 
237     private void removeSessionBuffer(IoSession session) {
238         session.removeAttribute(BUFFER);
239     }
240 
241     private void storeRemainingInSession(IoBuffer buf, IoSession session) {
242         final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
243 
244         remainingBuf.order(buf.order());
245         remainingBuf.put(buf);
246 
247         session.setAttribute(BUFFER, remainingBuf);
248     }
249     
250     /**
251      * Let the user change the way we handle fragmentation. If set to <tt>false</tt>, the 
252      * decode() method will not check the TransportMetadata fragmentation capability
253      *  
254      * @param transportMetadataFragmentation The flag to set.
255      */
256     public void setTransportMetadataFragmentation(boolean transportMetadataFragmentation) {
257         this.transportMetadataFragmentation = transportMetadataFragmentation;
258     }
259 }