View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AbstractIoSession;
33  import org.apache.mina.core.session.AttributeKey;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.DefaultWriteRequest;
36  import org.apache.mina.core.write.NothingWrittenException;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message objects and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54  
55      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56  
57      private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58  
59      private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60  
61      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62  
63      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64  
65      /** The factory responsible for creating the encoder and decoder */
66      private final ProtocolCodecFactory factory;
67  
68      /**
69       * Creates a new instance of ProtocolCodecFilter, associating a factory
70       * for the creation of the encoder and decoder.
71       *
72       * @param factory The associated factory
73       */
74      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
75          if (factory == null) {
76              throw new IllegalArgumentException("factory");
77          }
78  
79          this.factory = factory;
80      }
81  
82      /**
83       * Creates a new instance of ProtocolCodecFilter, without any factory.
84       * The encoder/decoder factory will be created as an inner class, using
85       * the two parameters (encoder and decoder).
86       * 
87       * @param encoder The class responsible for encoding the message
88       * @param decoder The class responsible for decoding the message
89       */
90      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
91          if (encoder == null) {
92              throw new IllegalArgumentException("encoder");
93          }
94          if (decoder == null) {
95              throw new IllegalArgumentException("decoder");
96          }
97  
98          // Create the inner Factory based on the two parameters
99          this.factory = new ProtocolCodecFactory() {
100             /**
101              * {@inheritDoc}
102              */
103             @Override
104             public ProtocolEncoder getEncoder(IoSession session) {
105                 return encoder;
106             }
107 
108             /**
109              * {@inheritDoc}
110              */
111             @Override
112             public ProtocolDecoder getDecoder(IoSession session) {
113                 return decoder;
114             }
115         };
116     }
117 
118     /**
119      * Creates a new instance of ProtocolCodecFilter, without any factory.
120      * The encoder/decoder factory will be created as an inner class, using
121      * the two parameters (encoder and decoder), which are class names. Instances
122      * for those classes will be created in this constructor.
123      * 
124      * @param encoderClass The class responsible for encoding the message
125      * @param decoderClass The class responsible for decoding the message
126      */
127     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
128             final Class<? extends ProtocolDecoder> decoderClass) {
129         if (encoderClass == null) {
130             throw new IllegalArgumentException("encoderClass");
131         }
132         if (decoderClass == null) {
133             throw new IllegalArgumentException("decoderClass");
134         }
135         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
136             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
137         }
138         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
139             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
140         }
141         try {
142             encoderClass.getConstructor(EMPTY_PARAMS);
143         } catch (NoSuchMethodException e) {
144             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
145         }
146         try {
147             decoderClass.getConstructor(EMPTY_PARAMS);
148         } catch (NoSuchMethodException e) {
149             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
150         }
151 
152         final ProtocolEncoder encoder;
153 
154         try {
155             encoder = encoderClass.newInstance();
156         } catch (Exception e) {
157             throw new IllegalArgumentException("encoderClass cannot be initialized");
158         }
159 
160         final ProtocolDecoder decoder;
161 
162         try {
163             decoder = decoderClass.newInstance();
164         } catch (Exception e) {
165             throw new IllegalArgumentException("decoderClass cannot be initialized");
166         }
167 
168         // Create the inner factory based on the two parameters.
169         this.factory = new ProtocolCodecFactory() {
170             /**
171              * {@inheritDoc}
172              */
173             @Override
174             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
175                 return encoder;
176             }
177 
178             /**
179              * {@inheritDoc}
180              */
181             @Override
182             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
183                 return decoder;
184             }
185         };
186     }
187 
188     /**
189      * Get the encoder instance from a given session.
190      *
191      * @param session The associated session we will get the encoder from
192      * @return The encoder instance, if any
193      */
194     public ProtocolEncoder getEncoder(IoSession session) {
195         return (ProtocolEncoder) session.getAttribute(ENCODER);
196     }
197 
198     /**
199      * {@inheritDoc}
200      */
201     @Override
202     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
203         if (parent.contains(this)) {
204             throw new IllegalArgumentException(
205                     "You can't add the same filter instance more than once.  Create another instance and add it.");
206         }
207     }
208 
209     /**
210      * {@inheritDoc}
211      */
212     @Override
213     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
214         // Clean everything
215         disposeCodec(parent.getSession());
216     }
217 
218     /**
219      * Process the incoming message, calling the session decoder. As the incoming
220      * buffer might contains more than one messages, we have to loop until the decoder
221      * throws an exception.
222      * 
223      *  while ( buffer not empty )
224      *    try
225      *      decode ( buffer )
226      *    catch
227      *      break;
228      * 
229      */
230     @Override
231     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
232         if (LOGGER.isDebugEnabled()) {
233             LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
234         }
235 
236         if (!(message instanceof IoBuffer)) {
237             nextFilter.messageReceived(session, message);
238             return;
239         }
240 
241         IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer in = (IoBuffer) message;
242         ProtocolDecoder decoder = factory.getDecoder(session);
243         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
244 
245         // Loop until we don't have anymore byte in the buffer,
246         // or until the decoder throws an unrecoverable exception or
247         // can't decoder a message, because there are not enough
248         // data in the buffer
249         while (in.hasRemaining()) {
250             int oldPos = in.position();
251             try {
252                 synchronized (session) {
253                     // Call the decoder with the read bytes
254                     decoder.decode(session, in, decoderOut);
255                 }
256                 // Finish decoding if no exception was thrown.
257                 decoderOut.flush(nextFilter, session);
258             } catch (Exception e) {
259                 ProtocolDecoderException pde;
260                 if (e instanceof ProtocolDecoderException) {
261                     pde = (ProtocolDecoderException) e;
262                 } else {
263                     pde = new ProtocolDecoderException(e);
264                 }
265                 if (pde.getHexdump() == null) {
266                     // Generate a message hex dump
267                     int curPos = in.position();
268                     in.position(oldPos);
269                     pde.setHexdump(in.getHexDump());
270                     in.position(curPos);
271                 }
272                 // Fire the exceptionCaught event.
273                 decoderOut.flush(nextFilter, session);
274                 nextFilter.exceptionCaught(session, pde);
275                 // Retry only if the type of the caught exception is
276                 // recoverable and the buffer position has changed.
277                 // We check buffer position additionally to prevent an
278                 // infinite loop.
279                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
280                     break;
281                 }
282             }
283         }
284     }
285 
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
291         if (writeRequest instanceof EncodedWriteRequest) {
292             return;
293         }
294 
295         nextFilter.messageSent(session, writeRequest);
296     }
297 
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
303         Object message = writeRequest.getMessage();
304 
305         // Bypass the encoding if the message is contained in a IoBuffer,
306         // as it has already been encoded before
307         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
308             nextFilter.filterWrite(session, writeRequest);
309             return;
310         }
311 
312         // Get the encoder in the session
313         ProtocolEncoder encoder = factory.getEncoder(session);
314 
315         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
316 
317         if (encoder == null) {
318             throw new ProtocolEncoderException("The encoder is null for the session " + session);
319         }
320 
321         try {
322             // Now we can try to encode the response
323             encoder.encode(session, message, encoderOut);
324 
325             // Send it directly
326             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
327 
328             // Write all the encoded messages now
329             while (!bufferQueue.isEmpty()) {
330                 Object encodedMessage = bufferQueue.poll();
331 
332                 if (encodedMessage == null) {
333                     break;
334                 }
335 
336                 // Flush only when the buffer has remaining.
337                 if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
338                     writeRequest.setMessage(encodedMessage);
339 
340                     nextFilter.filterWrite(session, writeRequest);
341                 }
342             }
343         } catch (Exception e) {
344             ProtocolEncoderException pee;
345 
346             // Generate the correct exception
347             if (e instanceof ProtocolEncoderException) {
348                 pee = (ProtocolEncoderException) e;
349             } else {
350                 pee = new ProtocolEncoderException(e);
351             }
352 
353             throw pee;
354         }
355     }
356 
357     /**
358      * {@inheritDoc}
359      */
360     @Override
361     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
362         // Call finishDecode() first when a connection is closed.
363         ProtocolDecoder decoder = factory.getDecoder(session);
364         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
365 
366         try {
367             decoder.finishDecode(session, decoderOut);
368         } catch (Exception e) {
369             ProtocolDecoderException pde;
370             if (e instanceof ProtocolDecoderException) {
371                 pde = (ProtocolDecoderException) e;
372             } else {
373                 pde = new ProtocolDecoderException(e);
374             }
375             throw pde;
376         } finally {
377             // Dispose everything
378             disposeCodec(session);
379             decoderOut.flush(nextFilter, session);
380         }
381 
382         // Call the next filter
383         nextFilter.sessionClosed(session);
384     }
385 
386     private static class EncodedWriteRequest extends DefaultWriteRequest {
387         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
388             super(encodedMessage, future, destination);
389         }
390 
391         /**
392          * {@inheritDoc}
393          */
394         @Override
395         public boolean isEncoded() {
396             return true;
397         }
398     }
399 
400     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
401         public ProtocolDecoderOutputImpl() {
402             // Do nothing
403         }
404 
405         /**
406          * {@inheritDoc}
407          */
408         @Override
409         public void flush(NextFilter nextFilter, IoSession session) {
410             Queue<Object> messageQueue = getMessageQueue();
411 
412             while (!messageQueue.isEmpty()) {
413                 nextFilter.messageReceived(session, messageQueue.poll());
414             }
415         }
416     }
417 
418     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
419         private final IoSession session;
420 
421         private final NextFilter nextFilter;
422 
423         /** The WriteRequest destination */
424         private final SocketAddress destination;
425 
426         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
427             this.session = session;
428             this.nextFilter = nextFilter;
429 
430             // Only store the destination, not the full WriteRequest.
431             destination = writeRequest.getDestination();
432         }
433 
434         /**
435          * {@inheritDoc}
436          */
437         @Override
438         public WriteFuture flush() {
439             Queue<Object> bufferQueue = getMessageQueue();
440             WriteFuture future = null;
441 
442             while (!bufferQueue.isEmpty()) {
443                 Object encodedMessage = bufferQueue.poll();
444 
445                 if (encodedMessage == null) {
446                     break;
447                 }
448 
449                 // Flush only when the buffer has remaining.
450                 if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
451                     future = new DefaultWriteFuture(session);
452                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
453                 }
454             }
455 
456             if (future == null) {
457                 // Creates an empty writeRequest containing the destination
458                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
459             }
460 
461             return future;
462         }
463     }
464 
465     //----------- Helper methods ---------------------------------------------
466     /**
467      * Dispose the encoder, decoder, and the callback for the decoded
468      * messages.
469      */
470     private void disposeCodec(IoSession session) {
471         // We just remove the two instances of encoder/decoder to release resources
472         // from the session
473         disposeEncoder(session);
474         disposeDecoder(session);
475 
476         // We also remove the callback
477         disposeDecoderOut(session);
478     }
479 
480     /**
481      * Dispose the encoder, removing its instance from the
482      * session's attributes, and calling the associated
483      * dispose method.
484      */
485     private void disposeEncoder(IoSession session) {
486         ProtocolEncoder./../org/apache/mina/filter/codec/ProtocolEncoder.html#ProtocolEncoder">ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
487         if (encoder == null) {
488             return;
489         }
490 
491         try {
492             encoder.dispose(session);
493         } catch (Exception e) {
494             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
495         }
496     }
497 
498     /**
499      * Dispose the decoder, removing its instance from the
500      * session's attributes, and calling the associated
501      * dispose method.
502      */
503     private void disposeDecoder(IoSession session) {
504         ProtocolDecoder./../org/apache/mina/filter/codec/ProtocolDecoder.html#ProtocolDecoder">ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
505         if (decoder == null) {
506             return;
507         }
508 
509         try {
510             decoder.dispose(session);
511         } catch (Exception e) {
512             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
513         }
514     }
515 
516     /**
517      * Return a reference to the decoder callback. If it's not already created
518      * and stored into the session, we create a new instance.
519      */
520     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
521         ProtocolDecoderOutput../org/apache/mina/filter/codec/ProtocolDecoderOutput.html#ProtocolDecoderOutput">ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
522 
523         if (out == null) {
524             // Create a new instance, and stores it into the session
525             out = new ProtocolDecoderOutputImpl();
526             session.setAttribute(DECODER_OUT, out);
527         }
528 
529         return out;
530     }
531 
532     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
533         ProtocolEncoderOutput../org/apache/mina/filter/codec/ProtocolEncoderOutput.html#ProtocolEncoderOutput">ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
534 
535         if (out == null) {
536             // Create a new instance, and stores it into the session
537             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
538             session.setAttribute(ENCODER_OUT, out);
539         }
540 
541         return out;
542     }
543 
544     /**
545      * Remove the decoder callback from the session's attributes.
546      */
547     private void disposeDecoderOut(IoSession session) {
548         session.removeAttribute(DECODER_OUT);
549     }
550 }