1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.codec;
21
22 import org.apache.mina.core.buffer.IoBuffer;
23 import org.apache.mina.core.service.TransportMetadata;
24 import org.apache.mina.core.session.AttributeKey;
25 import org.apache.mina.core.session.IoSession;
26
27 /**
28 * A {@link ProtocolDecoder} that cumulates the content of received
29 * buffers to a <em>cumulative buffer</em> to help users implement decoders.
30 * <p>
31 * If the received {@link IoBuffer} is only a part of a message.
32 * decoders should cumulate received buffers to make a message complete or
33 * to postpone decoding until more buffers arrive.
34 * <p>
35 * Here is an example decoder that decodes CRLF terminated lines into
36 * <code>Command</code> objects:
37 * <pre>
38 * public class CrLfTerminatedCommandLineDecoder
39 * extends CumulativeProtocolDecoder {
40 *
41 * private Command parseCommand(IoBuffer in) {
42 * // Convert the bytes in the specified buffer to a
43 * // Command object.
44 * ...
45 * }
46 *
47 * protected boolean doDecode(
48 * IoSession session, IoBuffer in, ProtocolDecoderOutput out)
49 * throws Exception {
50 *
51 * // Remember the initial position.
52 * int start = in.position();
53 *
54 * // Now find the first CRLF in the buffer.
55 * byte previous = 0;
56 * while (in.hasRemaining()) {
57 * byte current = in.get();
58 *
59 * if (previous == '\r' && current == '\n') {
60 * // Remember the current position and limit.
61 * int position = in.position();
62 * int limit = in.limit();
63 * try {
64 * in.position(start);
65 * in.limit(position);
66 * // The bytes between in.position() and in.limit()
67 * // now contain a full CRLF terminated line.
68 * out.write(parseCommand(in.slice()));
69 * } finally {
70 * // Set the position to point right after the
71 * // detected line and set the limit to the old
72 * // one.
73 * in.position(position);
74 * in.limit(limit);
75 * }
76 * // Decoded one line; CumulativeProtocolDecoder will
77 * // call me again until I return false. So just
78 * // return true until there are no more lines in the
79 * // buffer.
80 * return true;
81 * }
82 *
83 * previous = current;
84 * }
85 *
86 * // Could not find CRLF in the buffer. Reset the initial
87 * // position to the one we recorded above.
88 * in.position(start);
89 *
90 * return false;
91 * }
92 * }
93 * </pre>
94 * <p>
95 * Please note that this decoder simply forward the call to
96 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
97 * underlying transport doesn't have a packet fragmentation. Whether the
98 * transport has fragmentation or not is determined by querying
99 * {@link TransportMetadata}.
100 *
101 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
102 */
103 public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
104
105 private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
106
107 /**
108 * Creates a new instance.
109 */
110 protected CumulativeProtocolDecoder() {
111 // Do nothing
112 }
113
114 /**
115 * Cumulates content of <tt>in</tt> into internal buffer and forwards
116 * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
117 * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
118 * and the cumulative buffer is compacted after decoding ends.
119 *
120 * @throws IllegalStateException if your <tt>doDecode()</tt> returned
121 * <tt>true</tt> not consuming the cumulative buffer.
122 */
123 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
124 if (!session.getTransportMetadata().hasFragmentation()) {
125 while (in.hasRemaining()) {
126 if (!doDecode(session, in, out)) {
127 break;
128 }
129 }
130
131 return;
132 }
133
134 boolean usingSessionBuffer = true;
135 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
136 // If we have a session buffer, append data to that; otherwise
137 // use the buffer read from the network directly.
138 if (buf != null) {
139 boolean appended = false;
140 // Make sure that the buffer is auto-expanded.
141 if (buf.isAutoExpand()) {
142 try {
143 buf.put(in);
144 appended = true;
145 } catch (IllegalStateException e) {
146 // A user called derivation method (e.g. slice()),
147 // which disables auto-expansion of the parent buffer.
148 } catch (IndexOutOfBoundsException e) {
149 // A user disabled auto-expansion.
150 }
151 }
152
153 if (appended) {
154 buf.flip();
155 } else {
156 // Reallocate the buffer if append operation failed due to
157 // derivation or disabled auto-expansion.
158 buf.flip();
159 IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
160 newBuf.order(buf.order());
161 newBuf.put(buf);
162 newBuf.put(in);
163 newBuf.flip();
164 buf = newBuf;
165
166 // Update the session attribute.
167 session.setAttribute(BUFFER, buf);
168 }
169 } else {
170 buf = in;
171 usingSessionBuffer = false;
172 }
173
174 for (;;) {
175 int oldPos = buf.position();
176 boolean decoded = doDecode(session, buf, out);
177 if (decoded) {
178 if (buf.position() == oldPos) {
179 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
180 }
181
182 if (!buf.hasRemaining()) {
183 break;
184 }
185 } else {
186 break;
187 }
188 }
189
190 // if there is any data left that cannot be decoded, we store
191 // it in a buffer in the session and next time this decoder is
192 // invoked the session buffer gets appended to
193 if (buf.hasRemaining()) {
194 if (usingSessionBuffer && buf.isAutoExpand()) {
195 buf.compact();
196 } else {
197 storeRemainingInSession(buf, session);
198 }
199 } else {
200 if (usingSessionBuffer) {
201 removeSessionBuffer(session);
202 }
203 }
204 }
205
206 /**
207 * Implement this method to consume the specified cumulative buffer and
208 * decode its content into message(s).
209 *
210 * @param in the cumulative buffer
211 * @return <tt>true</tt> if and only if there's more to decode in the buffer
212 * and you want to have <tt>doDecode</tt> method invoked again.
213 * Return <tt>false</tt> if remaining data is not enough to decode,
214 * then this method will be invoked again when more data is cumulated.
215 * @throws Exception if cannot decode <tt>in</tt>.
216 */
217 protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
218
219 /**
220 * Releases the cumulative buffer used by the specified <tt>session</tt>.
221 * Please don't forget to call <tt>super.dispose( session )</tt> when
222 * you override this method.
223 */
224 @Override
225 public void dispose(IoSession session) throws Exception {
226 removeSessionBuffer(session);
227 }
228
229 private void removeSessionBuffer(IoSession session) {
230 session.removeAttribute(BUFFER);
231 }
232
233 private void storeRemainingInSession(IoBuffer buf, IoSession session) {
234 final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
235
236 remainingBuf.order(buf.order());
237 remainingBuf.put(buf);
238
239 session.setAttribute(BUFFER, remainingBuf);
240 }
241 }