View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec.statemachine;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
27  import org.apache.mina.core.session.IoSession;
28  import org.apache.mina.filter.codec.ProtocolCodecFilter;
29  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  /**
34   * Abstract base class for decoder state machines. Calls {@link #init()} to
35   * get the start {@link DecodingState} of the state machine. Calls 
36   * {@link #destroy()} when the state machine has reached its end state or when
37   * the session is closed.
38   * <p>
39   * NOTE: The {@link ProtocolDecoderOutput} used by this class when calling 
40   * {@link DecodingState#decode(IoBuffer, ProtocolDecoderOutput)} buffers decoded
41   * messages in a {@link List}. Once the state machine has reached its end state
42   * this class will call {@link #finishDecode(List, ProtocolDecoderOutput)}. The 
43   * implementation will have to take care of writing the decoded messages to the 
44   * real {@link ProtocolDecoderOutput} used by the configured 
45   * {@link ProtocolCodecFilter}.
46   * </p>
47   * 
48   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
49   */
50  public abstract class DecodingStateMachine implements DecodingState {
51      private final Logger log = LoggerFactory.getLogger(DecodingStateMachine.class);
52  
53      private final List<Object> childProducts = new ArrayList<Object>();
54  
55      private final ProtocolDecoderOutput childOutput = new ProtocolDecoderOutput() {
56          public void flush(NextFilter nextFilter, IoSession session) {
57              // Do nothing
58          }
59  
60          public void write(Object message) {
61              childProducts.add(message);
62          }
63      };
64  
65      private DecodingState currentState;
66  
67      private boolean initialized;
68  
69      /**
70       * Invoked to initialize this state machine.
71       * 
72       * @return the start {@link DecodingState}.
73       */
74      protected abstract DecodingState init() throws Exception;
75  
76      /**
77       * Called once the state machine has reached its end.
78       * 
79       * @param childProducts contains the messages generated by each of the 
80       *        {@link DecodingState}s which were exposed to the received data 
81       *        during the life time of this state machine.
82       * @param out the real {@link ProtocolDecoderOutput} used by the 
83       *        {@link ProtocolCodecFilter}.
84       * @return the next state if the state machine should resume.
85       */
86      protected abstract DecodingState finishDecode(List<Object> childProducts, ProtocolDecoderOutput out)
87              throws Exception;
88  
89      /**
90       * Invoked to destroy this state machine once the end state has been reached
91       * or the session has been closed.
92       */
93      protected abstract void destroy() throws Exception;
94  
95      /**
96       * {@inheritDoc}
97       */
98      public DecodingState decode(IoBuffer in, ProtocolDecoderOutput out) throws Exception {
99          DecodingState state = getCurrentState();
100 
101         final int limit = in.limit();
102         int pos = in.position();
103 
104         try {
105             for (;;) {
106                 // Wait for more data if all data is consumed.
107                 if (pos == limit) {
108                     break;
109                 }
110 
111                 DecodingState oldState = state;
112                 state = state.decode(in, childOutput);
113 
114                 // If finished, call finishDecode
115                 if (state == null) {
116                     return finishDecode(childProducts, out);
117                 }
118 
119                 int newPos = in.position();
120 
121                 // Wait for more data if nothing is consumed and state didn't change.
122                 if (newPos == pos && oldState == state) {
123                     break;
124                 }
125                 pos = newPos;
126             }
127 
128             return this;
129         } catch (Exception e) {
130             state = null;
131             throw e;
132         } finally {
133             this.currentState = state;
134 
135             // Destroy if decoding is finished or failed.
136             if (state == null) {
137                 cleanup();
138             }
139         }
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     public DecodingState finishDecode(ProtocolDecoderOutput out) throws Exception {
146         DecodingState nextState;
147         DecodingState state = getCurrentState();
148         try {
149             for (;;) {
150                 DecodingState oldState = state;
151                 state = state.finishDecode(childOutput);
152                 if (state == null) {
153                     // Finished
154                     break;
155                 }
156 
157                 // Exit if state didn't change.
158                 if (oldState == state) {
159                     break;
160                 }
161             }
162         } catch (Exception e) {
163             state = null;
164             log.debug("Ignoring the exception caused by a closed session.", e);
165         } finally {
166             this.currentState = state;
167             nextState = finishDecode(childProducts, out);
168             if (state == null) {
169                 cleanup();
170             }
171         }
172         return nextState;
173     }
174 
175     private void cleanup() {
176         if (!initialized) {
177             throw new IllegalStateException();
178         }
179 
180         initialized = false;
181         childProducts.clear();
182         try {
183             destroy();
184         } catch (Exception e2) {
185             log.warn("Failed to destroy a decoding state machine.", e2);
186         }
187     }
188 
189     private DecodingState getCurrentState() throws Exception {
190         DecodingState state = this.currentState;
191         if (state == null) {
192             state = init();
193             initialized = true;
194         }
195         return state;
196     }
197 }