View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message objects and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54  
55      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56  
57      private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58  
59      private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60  
61      private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62  
63      private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64  
65      /** The factory responsible for creating the encoder and decoder */
66      private final ProtocolCodecFactory factory;
67  
68      /**
69       * Creates a new instance of ProtocolCodecFilter, associating a factory
70       * for the creation of the encoder and decoder.
71       *
72       * @param factory The associated factory
73       */
74      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
75          if (factory == null) {
76              throw new IllegalArgumentException("factory");
77          }
78  
79          this.factory = factory;
80      }
81  
82      /**
83       * Creates a new instance of ProtocolCodecFilter, without any factory.
84       * The encoder/decoder factory will be created as an inner class, using
85       * the two parameters (encoder and decoder).
86       * 
87       * @param encoder The class responsible for encoding the message
88       * @param decoder The class responsible for decoding the message
89       */
90      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
91          if (encoder == null) {
92              throw new IllegalArgumentException("encoder");
93          }
94          if (decoder == null) {
95              throw new IllegalArgumentException("decoder");
96          }
97  
98          // Create the inner Factory based on the two parameters
99          this.factory = new ProtocolCodecFactory() {
100             public ProtocolEncoder getEncoder(IoSession session) {
101                 return encoder;
102             }
103 
104             public ProtocolDecoder getDecoder(IoSession session) {
105                 return decoder;
106             }
107         };
108     }
109 
110     /**
111      * Creates a new instance of ProtocolCodecFilter, without any factory.
112      * The encoder/decoder factory will be created as an inner class, using
113      * the two parameters (encoder and decoder), which are class names. Instances
114      * for those classes will be created in this constructor.
115      * 
116      * @param encoderClass The class responsible for encoding the message
117      * @param decoderClass The class responsible for decoding the message
118      */
119     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
120             final Class<? extends ProtocolDecoder> decoderClass) {
121         if (encoderClass == null) {
122             throw new IllegalArgumentException("encoderClass");
123         }
124         if (decoderClass == null) {
125             throw new IllegalArgumentException("decoderClass");
126         }
127         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
128             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
129         }
130         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
131             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
132         }
133         try {
134             encoderClass.getConstructor(EMPTY_PARAMS);
135         } catch (NoSuchMethodException e) {
136             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
137         }
138         try {
139             decoderClass.getConstructor(EMPTY_PARAMS);
140         } catch (NoSuchMethodException e) {
141             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
142         }
143 
144         final ProtocolEncoder encoder;
145 
146         try {
147             encoder = encoderClass.newInstance();
148         } catch (Exception e) {
149             throw new IllegalArgumentException("encoderClass cannot be initialized");
150         }
151 
152         final ProtocolDecoder decoder;
153 
154         try {
155             decoder = decoderClass.newInstance();
156         } catch (Exception e) {
157             throw new IllegalArgumentException("decoderClass cannot be initialized");
158         }
159 
160         // Create the inner factory based on the two parameters.
161         this.factory = new ProtocolCodecFactory() {
162             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
163                 return encoder;
164             }
165 
166             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
167                 return decoder;
168             }
169         };
170     }
171 
172     /**
173      * Get the encoder instance from a given session.
174      *
175      * @param session The associated session we will get the encoder from
176      * @return The encoder instance, if any
177      */
178     public ProtocolEncoder getEncoder(IoSession session) {
179         return (ProtocolEncoder) session.getAttribute(ENCODER);
180     }
181 
182     @Override
183     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
184         if (parent.contains(this)) {
185             throw new IllegalArgumentException(
186                     "You can't add the same filter instance more than once.  Create another instance and add it.");
187         }
188     }
189 
190     @Override
191     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
192         // Clean everything
193         disposeCodec(parent.getSession());
194     }
195 
196     /**
197      * Process the incoming message, calling the session decoder. As the incoming
198      * buffer might contains more than one messages, we have to loop until the decoder
199      * throws an exception.
200      * 
201      *  while ( buffer not empty )
202      *    try
203      *      decode ( buffer )
204      *    catch
205      *      break;
206      * 
207      */
208     @Override
209     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
210         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
211 
212         if (!(message instanceof IoBuffer)) {
213             nextFilter.messageReceived(session, message);
214             return;
215         }
216 
217         IoBuffer in = (IoBuffer) message;
218         ProtocolDecoder decoder = factory.getDecoder(session);
219         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
220 
221         // Loop until we don't have anymore byte in the buffer,
222         // or until the decoder throws an unrecoverable exception or
223         // can't decoder a message, because there are not enough
224         // data in the buffer
225         while (in.hasRemaining()) {
226             int oldPos = in.position();
227             try {
228                 synchronized (session) {
229                     // Call the decoder with the read bytes
230                     decoder.decode(session, in, decoderOut);
231                 }
232                 // Finish decoding if no exception was thrown.
233                 decoderOut.flush(nextFilter, session);
234             } catch (Exception e) {
235                 ProtocolDecoderException pde;
236                 if (e instanceof ProtocolDecoderException) {
237                     pde = (ProtocolDecoderException) e;
238                 } else {
239                     pde = new ProtocolDecoderException(e);
240                 }
241                 if (pde.getHexdump() == null) {
242                     // Generate a message hex dump
243                     int curPos = in.position();
244                     in.position(oldPos);
245                     pde.setHexdump(in.getHexDump());
246                     in.position(curPos);
247                 }
248                 // Fire the exceptionCaught event.
249                 decoderOut.flush(nextFilter, session);
250                 nextFilter.exceptionCaught(session, pde);
251                 // Retry only if the type of the caught exception is
252                 // recoverable and the buffer position has changed.
253                 // We check buffer position additionally to prevent an
254                 // infinite loop.
255                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
256                     break;
257                 }
258             }
259         }
260     }
261 
262     @Override
263     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
264         if (writeRequest instanceof EncodedWriteRequest) {
265             return;
266         }
267 
268         if (writeRequest instanceof MessageWriteRequest) {
269             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
270             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
271         } else {
272             nextFilter.messageSent(session, writeRequest);
273         }
274     }
275 
276     @Override
277     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
278         Object message = writeRequest.getMessage();
279 
280         // Bypass the encoding if the message is contained in a IoBuffer,
281         // as it has already been encoded before
282         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
283             nextFilter.filterWrite(session, writeRequest);
284             return;
285         }
286 
287         // Get the encoder in the session
288         ProtocolEncoder encoder = factory.getEncoder(session);
289 
290         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
291 
292         if (encoder == null) {
293             throw new ProtocolEncoderException("The encoder is null for the session " + session);
294         }
295 
296         try {
297             // Now we can try to encode the response
298             encoder.encode(session, message, encoderOut);
299 
300             // Send it directly
301             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
302 
303             // Write all the encoded messages now
304             while (!bufferQueue.isEmpty()) {
305                 Object encodedMessage = bufferQueue.poll();
306 
307                 if (encodedMessage == null) {
308                     break;
309                 }
310 
311                 // Flush only when the buffer has remaining.
312                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
313                     SocketAddress destination = writeRequest.getDestination();
314                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
315 
316                     nextFilter.filterWrite(session, encodedWriteRequest);
317                 }
318             }
319 
320             // Call the next filter
321             nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
322         } catch (Exception e) {
323             ProtocolEncoderException pee;
324 
325             // Generate the correct exception
326             if (e instanceof ProtocolEncoderException) {
327                 pee = (ProtocolEncoderException) e;
328             } else {
329                 pee = new ProtocolEncoderException(e);
330             }
331 
332             throw pee;
333         }
334     }
335 
336     @Override
337     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
338         // Call finishDecode() first when a connection is closed.
339         ProtocolDecoder decoder = factory.getDecoder(session);
340         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
341 
342         try {
343             decoder.finishDecode(session, decoderOut);
344         } catch (Exception e) {
345             ProtocolDecoderException pde;
346             if (e instanceof ProtocolDecoderException) {
347                 pde = (ProtocolDecoderException) e;
348             } else {
349                 pde = new ProtocolDecoderException(e);
350             }
351             throw pde;
352         } finally {
353             // Dispose everything
354             disposeCodec(session);
355             decoderOut.flush(nextFilter, session);
356         }
357 
358         // Call the next filter
359         nextFilter.sessionClosed(session);
360     }
361 
362     private static class EncodedWriteRequest extends DefaultWriteRequest {
363         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
364             super(encodedMessage, future, destination);
365         }
366 
367         public boolean isEncoded() {
368             return true;
369         }
370     }
371 
372     private static class MessageWriteRequest extends WriteRequestWrapper {
373         public MessageWriteRequest(WriteRequest writeRequest) {
374             super(writeRequest);
375         }
376 
377         @Override
378         public Object getMessage() {
379             return EMPTY_BUFFER;
380         }
381 
382         @Override
383         public String toString() {
384             return "MessageWriteRequest, parent : " + super.toString();
385         }
386     }
387 
388     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
389         public ProtocolDecoderOutputImpl() {
390             // Do nothing
391         }
392 
393         public void flush(NextFilter nextFilter, IoSession session) {
394             Queue<Object> messageQueue = getMessageQueue();
395 
396             while (!messageQueue.isEmpty()) {
397                 nextFilter.messageReceived(session, messageQueue.poll());
398             }
399         }
400     }
401 
402     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
403         private final IoSession session;
404 
405         private final NextFilter nextFilter;
406 
407         /** The WriteRequest destination */
408         private final SocketAddress destination;
409 
410         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
411             this.session = session;
412             this.nextFilter = nextFilter;
413 
414             // Only store the destination, not the full WriteRequest.
415             destination = writeRequest.getDestination();
416         }
417 
418         public WriteFuture flush() {
419             Queue<Object> bufferQueue = getMessageQueue();
420             WriteFuture future = null;
421 
422             while (!bufferQueue.isEmpty()) {
423                 Object encodedMessage = bufferQueue.poll();
424 
425                 if (encodedMessage == null) {
426                     break;
427                 }
428 
429                 // Flush only when the buffer has remaining.
430                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
431                     future = new DefaultWriteFuture(session);
432                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
433                 }
434             }
435 
436             if (future == null) {
437                 // Creates an empty writeRequest containing the destination
438                 WriteRequest writeRequest = new DefaultWriteRequest(
439                         DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
440                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
441             }
442 
443             return future;
444         }
445     }
446 
447     //----------- Helper methods ---------------------------------------------
448     /**
449      * Dispose the encoder, decoder, and the callback for the decoded
450      * messages.
451      */
452     private void disposeCodec(IoSession session) {
453         // We just remove the two instances of encoder/decoder to release resources
454         // from the session
455         disposeEncoder(session);
456         disposeDecoder(session);
457 
458         // We also remove the callback
459         disposeDecoderOut(session);
460     }
461 
462     /**
463      * Dispose the encoder, removing its instance from the
464      * session's attributes, and calling the associated
465      * dispose method.
466      */
467     private void disposeEncoder(IoSession session) {
468         ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
469         if (encoder == null) {
470             return;
471         }
472 
473         try {
474             encoder.dispose(session);
475         } catch (Exception e) {
476             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
477         }
478     }
479 
480     /**
481      * Dispose the decoder, removing its instance from the
482      * session's attributes, and calling the associated
483      * dispose method.
484      */
485     private void disposeDecoder(IoSession session) {
486         ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
487         if (decoder == null) {
488             return;
489         }
490 
491         try {
492             decoder.dispose(session);
493         } catch (Exception e) {
494             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
495         }
496     }
497 
498     /**
499      * Return a reference to the decoder callback. If it's not already created
500      * and stored into the session, we create a new instance.
501      */
502     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
503         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
504 
505         if (out == null) {
506             // Create a new instance, and stores it into the session
507             out = new ProtocolDecoderOutputImpl();
508             session.setAttribute(DECODER_OUT, out);
509         }
510 
511         return out;
512     }
513 
514     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
515         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
516 
517         if (out == null) {
518             // Create a new instance, and stores it into the session
519             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
520             session.setAttribute(ENCODER_OUT, out);
521         }
522 
523         return out;
524     }
525 
526     /**
527      * Remove the decoder callback from the session's attributes.
528      */
529     private void disposeDecoderOut(IoSession session) {
530         session.removeAttribute(DECODER_OUT);
531     }
532 }