001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.codec;
021
022import org.apache.mina.core.buffer.IoBuffer;
023import org.apache.mina.core.service.TransportMetadata;
024import org.apache.mina.core.session.AttributeKey;
025import org.apache.mina.core.session.IoSession;
026
027/**
028 * A {@link ProtocolDecoder} that cumulates the content of received buffers to a
029 * <em>cumulative buffer</em> to help users implement decoders.
030 * <p>
031 * If the received {@link IoBuffer} is only a part of a message. decoders should
032 * cumulate received buffers to make a message complete or to postpone decoding
033 * until more buffers arrive.
034 * <p>
035 * Here is an example decoder that decodes CRLF terminated lines into
036 * <code>Command</code> objects:
037 * 
038 * <pre>
039 * public class CrLfTerminatedCommandLineDecoder extends CumulativeProtocolDecoder {
040 * 
041 *     private Command parseCommand(IoBuffer in) {
042 *         // Convert the bytes in the specified buffer to a
043 *         // Command object.
044 *         ...
045 *     }
046 * 
047 *     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
048 * 
049 *         // Remember the initial position.
050 *         int start = in.position();
051 * 
052 *         // Now find the first CRLF in the buffer.
053 *         byte previous = 0;
054 *         while (in.hasRemaining()) {
055 *             byte current = in.get();
056 * 
057 *             if (previous == '\r' &amp;&amp; current == '\n') {
058 *                 // Remember the current position and limit.
059 *                 int position = in.position();
060 *                 int limit = in.limit();
061 *                 try {
062 *                     in.position(start);
063 *                     in.limit(position);
064 *                     // The bytes between in.position() and in.limit()
065 *                     // now contain a full CRLF terminated line.
066 *                     out.write(parseCommand(in.slice()));
067 *                 } finally {
068 *                     // Set the position to point right after the
069 *                     // detected line and set the limit to the old
070 *                     // one.
071 *                     in.position(position);
072 *                     in.limit(limit);
073 *                 }
074 *                 // Decoded one line; CumulativeProtocolDecoder will
075 *                 // call me again until I return false. So just
076 *                 // return true until there are no more lines in the
077 *                 // buffer.
078 *                 return true;
079 *             }
080 * 
081 *             previous = current;
082 *         }
083 * 
084 *         // Could not find CRLF in the buffer. Reset the initial
085 *         // position to the one we recorded above.
086 *         in.position(start);
087 * 
088 *         return false;
089 *     }
090 * }
091 * </pre>
092 * <p>
093 * Please note that this decoder simply forward the call to
094 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
095 * underlying transport doesn't have a packet fragmentation. Whether the
096 * transport has fragmentation or not is determined by querying
097 * {@link TransportMetadata}.
098 *
099 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
100 */
101public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
102    /** The buffer used to store the data in the session */
103    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
104    
105    /** A flag set to true if we handle fragmentation accordingly to the TransportMetadata setting. 
106     * It can be set to false if needed (UDP with fragments, for instance). the default value is 'true'
107     */
108    private boolean transportMetadataFragmentation = true;
109
110    /**
111     * Creates a new instance.
112     */
113    protected CumulativeProtocolDecoder() {
114        // Do nothing
115    }
116
117    /**
118     * Cumulates content of <tt>in</tt> into internal buffer and forwards
119     * decoding request to
120     * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
121     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
122     * and the cumulative buffer is compacted after decoding ends.
123     *
124     * @throws IllegalStateException
125     *             if your <tt>doDecode()</tt> returned <tt>true</tt> not
126     *             consuming the cumulative buffer.
127     */
128    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
129        if (transportMetadataFragmentation && !session.getTransportMetadata().hasFragmentation()) {
130            while (in.hasRemaining()) {
131                if (!doDecode(session, in, out)) {
132                    break;
133                }
134            }
135
136            return;
137        }
138
139        boolean usingSessionBuffer = true;
140        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
141        // If we have a session buffer, append data to that; otherwise
142        // use the buffer read from the network directly.
143        if (buf != null) {
144            boolean appended = false;
145            // Make sure that the buffer is auto-expanded.
146            if (buf.isAutoExpand()) {
147                try {
148                    buf.put(in);
149                    appended = true;
150                } catch (IllegalStateException e) {
151                    // A user called derivation method (e.g. slice()),
152                    // which disables auto-expansion of the parent buffer.
153                } catch (IndexOutOfBoundsException e) {
154                    // A user disabled auto-expansion.
155                }
156            }
157
158            if (appended) {
159                buf.flip();
160            } else {
161                // Reallocate the buffer if append operation failed due to
162                // derivation or disabled auto-expansion.
163                buf.flip();
164                IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
165                newBuf.order(buf.order());
166                newBuf.put(buf);
167                newBuf.put(in);
168                newBuf.flip();
169                buf = newBuf;
170
171                // Update the session attribute.
172                session.setAttribute(BUFFER, buf);
173            }
174        } else {
175            buf = in;
176            usingSessionBuffer = false;
177        }
178
179        for (;;) {
180            int oldPos = buf.position();
181            boolean decoded = doDecode(session, buf, out);
182            if (decoded) {
183                if (buf.position() == oldPos) {
184                    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
185                }
186
187                if (!buf.hasRemaining()) {
188                    break;
189                }
190            } else {
191                break;
192            }
193        }
194
195        // if there is any data left that cannot be decoded, we store
196        // it in a buffer in the session and next time this decoder is
197        // invoked the session buffer gets appended to
198        if (buf.hasRemaining()) {
199            if (usingSessionBuffer && buf.isAutoExpand()) {
200                buf.compact();
201            } else {
202                storeRemainingInSession(buf, session);
203            }
204        } else {
205            if (usingSessionBuffer) {
206                removeSessionBuffer(session);
207            }
208        }
209    }
210
211    /**
212     * Implement this method to consume the specified cumulative buffer and
213     * decode its content into message(s).
214     *
215     * @param session The current Session
216     * @param in the cumulative buffer
217     * @param out The {@link ProtocolDecoderOutput} that will receive the decoded message
218     * @return <tt>true</tt> if and only if there's more to decode in the buffer
219     *         and you want to have <tt>doDecode</tt> method invoked again.
220     *         Return <tt>false</tt> if remaining data is not enough to decode,
221     *         then this method will be invoked again when more data is
222     *         cumulated.
223     * @throws Exception if cannot decode <tt>in</tt>.
224     */
225    protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
226
227    /**
228     * Releases the cumulative buffer used by the specified <tt>session</tt>.
229     * Please don't forget to call <tt>super.dispose( session )</tt> when you
230     * override this method.
231     */
232    @Override
233    public void dispose(IoSession session) throws Exception {
234        removeSessionBuffer(session);
235    }
236
237    private void removeSessionBuffer(IoSession session) {
238        session.removeAttribute(BUFFER);
239    }
240
241    private void storeRemainingInSession(IoBuffer buf, IoSession session) {
242        final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
243
244        remainingBuf.order(buf.order());
245        remainingBuf.put(buf);
246
247        session.setAttribute(BUFFER, remainingBuf);
248    }
249    
250    /**
251     * Let the user change the way we handle fragmentation. If set to <tt>false</tt>, the 
252     * decode() method will not check the TransportMetadata fragmentation capability
253     *  
254     * @param transportMetadataFragmentation The flag to set.
255     */
256    public void setTransportMetadataFragmentation(boolean transportMetadataFragmentation) {
257        this.transportMetadataFragmentation = transportMetadataFragmentation;
258    }
259}