View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AbstractIoSession;
33  import org.apache.mina.core.session.AttributeKey;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.DefaultWriteRequest;
36  import org.apache.mina.core.write.NothingWrittenException;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.apache.mina.core.write.WriteRequestWrapper;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * An {@link IoFilter} which translates binary or protocol specific data into
44   * message objects and vice versa using {@link ProtocolCodecFactory},
45   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
46   *
47   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
48   * @org.apache.xbean.XBean
49   */
50  public class ProtocolCodecFilter extends IoFilterAdapter {
51      /** A logger for this class */
52      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
53  
54      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
55  
56      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
57  
58      private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
59  
60      private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
61  
62      private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
63  
64      private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
65  
66      /** The factory responsible for creating the encoder and decoder */
67      private final ProtocolCodecFactory factory;
68  
69      /**
70       * Creates a new instance of ProtocolCodecFilter, associating a factory
71       * for the creation of the encoder and decoder.
72       *
73       * @param factory The associated factory
74       */
75      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
76          if (factory == null) {
77              throw new IllegalArgumentException("factory");
78          }
79  
80          this.factory = factory;
81      }
82  
83      /**
84       * Creates a new instance of ProtocolCodecFilter, without any factory.
85       * The encoder/decoder factory will be created as an inner class, using
86       * the two parameters (encoder and decoder).
87       * 
88       * @param encoder The class responsible for encoding the message
89       * @param decoder The class responsible for decoding the message
90       */
91      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
92          if (encoder == null) {
93              throw new IllegalArgumentException("encoder");
94          }
95          if (decoder == null) {
96              throw new IllegalArgumentException("decoder");
97          }
98  
99          // Create the inner Factory based on the two parameters
100         this.factory = new ProtocolCodecFactory() {
101             public ProtocolEncoder getEncoder(IoSession session) {
102                 return encoder;
103             }
104 
105             public ProtocolDecoder getDecoder(IoSession session) {
106                 return decoder;
107             }
108         };
109     }
110 
111     /**
112      * Creates a new instance of ProtocolCodecFilter, without any factory.
113      * The encoder/decoder factory will be created as an inner class, using
114      * the two parameters (encoder and decoder), which are class names. Instances
115      * for those classes will be created in this constructor.
116      * 
117      * @param encoderClass The class responsible for encoding the message
118      * @param decoderClass The class responsible for decoding the message
119      */
120     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
121             final Class<? extends ProtocolDecoder> decoderClass) {
122         if (encoderClass == null) {
123             throw new IllegalArgumentException("encoderClass");
124         }
125         if (decoderClass == null) {
126             throw new IllegalArgumentException("decoderClass");
127         }
128         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
129             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
130         }
131         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
132             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
133         }
134         try {
135             encoderClass.getConstructor(EMPTY_PARAMS);
136         } catch (NoSuchMethodException e) {
137             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
138         }
139         try {
140             decoderClass.getConstructor(EMPTY_PARAMS);
141         } catch (NoSuchMethodException e) {
142             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
143         }
144 
145         final ProtocolEncoder encoder;
146 
147         try {
148             encoder = encoderClass.newInstance();
149         } catch (Exception e) {
150             throw new IllegalArgumentException("encoderClass cannot be initialized");
151         }
152 
153         final ProtocolDecoder decoder;
154 
155         try {
156             decoder = decoderClass.newInstance();
157         } catch (Exception e) {
158             throw new IllegalArgumentException("decoderClass cannot be initialized");
159         }
160 
161         // Create the inner factory based on the two parameters.
162         this.factory = new ProtocolCodecFactory() {
163             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
164                 return encoder;
165             }
166 
167             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
168                 return decoder;
169             }
170         };
171     }
172 
173     /**
174      * Get the encoder instance from a given session.
175      *
176      * @param session The associated session we will get the encoder from
177      * @return The encoder instance, if any
178      */
179     public ProtocolEncoder getEncoder(IoSession session) {
180         return (ProtocolEncoder) session.getAttribute(ENCODER);
181     }
182 
183     @Override
184     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
185         if (parent.contains(this)) {
186             throw new IllegalArgumentException(
187                     "You can't add the same filter instance more than once.  Create another instance and add it.");
188         }
189     }
190 
191     @Override
192     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
193         // Clean everything
194         disposeCodec(parent.getSession());
195     }
196 
197     /**
198      * Process the incoming message, calling the session decoder. As the incoming
199      * buffer might contains more than one messages, we have to loop until the decoder
200      * throws an exception.
201      * 
202      *  while ( buffer not empty )
203      *    try
204      *      decode ( buffer )
205      *    catch
206      *      break;
207      * 
208      */
209     @Override
210     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
211         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
212 
213         if (!(message instanceof IoBuffer)) {
214             nextFilter.messageReceived(session, message);
215             return;
216         }
217 
218         IoBuffer in = (IoBuffer) message;
219         ProtocolDecoder decoder = factory.getDecoder(session);
220         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
221 
222         // Loop until we don't have anymore byte in the buffer,
223         // or until the decoder throws an unrecoverable exception or
224         // can't decoder a message, because there are not enough
225         // data in the buffer
226         while (in.hasRemaining()) {
227             int oldPos = in.position();
228             try {
229                 synchronized (session) {
230                     // Call the decoder with the read bytes
231                     decoder.decode(session, in, decoderOut);
232                 }
233                 // Finish decoding if no exception was thrown.
234                 decoderOut.flush(nextFilter, session);
235             } catch (Exception e) {
236                 ProtocolDecoderException pde;
237                 if (e instanceof ProtocolDecoderException) {
238                     pde = (ProtocolDecoderException) e;
239                 } else {
240                     pde = new ProtocolDecoderException(e);
241                 }
242                 if (pde.getHexdump() == null) {
243                     // Generate a message hex dump
244                     int curPos = in.position();
245                     in.position(oldPos);
246                     pde.setHexdump(in.getHexDump());
247                     in.position(curPos);
248                 }
249                 // Fire the exceptionCaught event.
250                 decoderOut.flush(nextFilter, session);
251                 nextFilter.exceptionCaught(session, pde);
252                 // Retry only if the type of the caught exception is
253                 // recoverable and the buffer position has changed.
254                 // We check buffer position additionally to prevent an
255                 // infinite loop.
256                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
257                     break;
258                 }
259             }
260         }
261     }
262 
263     @Override
264     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
265         if (writeRequest instanceof EncodedWriteRequest) {
266             return;
267         }
268 
269         if (writeRequest instanceof MessageWriteRequest) {
270             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
271             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
272         } else {
273             nextFilter.messageSent(session, writeRequest);
274         }
275     }
276 
277     @Override
278     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
279         Object message = writeRequest.getMessage();
280 
281         // Bypass the encoding if the message is contained in a IoBuffer,
282         // as it has already been encoded before
283         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
284             nextFilter.filterWrite(session, writeRequest);
285             return;
286         }
287 
288         // Get the encoder in the session
289         ProtocolEncoder encoder = factory.getEncoder(session);
290 
291         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
292 
293         if (encoder == null) {
294             throw new ProtocolEncoderException("The encoder is null for the session " + session);
295         }
296 
297         try {
298             // Now we can try to encode the response
299             encoder.encode(session, message, encoderOut);
300 
301             // Send it directly
302             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
303 
304             // Write all the encoded messages now
305             while (!bufferQueue.isEmpty()) {
306                 Object encodedMessage = bufferQueue.poll();
307 
308                 if (encodedMessage == null) {
309                     break;
310                 }
311 
312                 // Flush only when the buffer has remaining.
313                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
314                     SocketAddress destination = writeRequest.getDestination();
315                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
316 
317                     nextFilter.filterWrite(session, encodedWriteRequest);
318                 }
319             }
320 
321             // Call the next filter
322             nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
323         } catch (Exception e) {
324             ProtocolEncoderException pee;
325 
326             // Generate the correct exception
327             if (e instanceof ProtocolEncoderException) {
328                 pee = (ProtocolEncoderException) e;
329             } else {
330                 pee = new ProtocolEncoderException(e);
331             }
332 
333             throw pee;
334         }
335     }
336 
337     @Override
338     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
339         // Call finishDecode() first when a connection is closed.
340         ProtocolDecoder decoder = factory.getDecoder(session);
341         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
342 
343         try {
344             decoder.finishDecode(session, decoderOut);
345         } catch (Exception e) {
346             ProtocolDecoderException pde;
347             if (e instanceof ProtocolDecoderException) {
348                 pde = (ProtocolDecoderException) e;
349             } else {
350                 pde = new ProtocolDecoderException(e);
351             }
352             throw pde;
353         } finally {
354             // Dispose everything
355             disposeCodec(session);
356             decoderOut.flush(nextFilter, session);
357         }
358 
359         // Call the next filter
360         nextFilter.sessionClosed(session);
361     }
362 
363     private static class EncodedWriteRequest extends DefaultWriteRequest {
364         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
365             super(encodedMessage, future, destination);
366         }
367 
368         public boolean isEncoded() {
369             return true;
370         }
371     }
372 
373     private static class MessageWriteRequest extends WriteRequestWrapper {
374         public MessageWriteRequest(WriteRequest writeRequest) {
375             super(writeRequest);
376         }
377 
378         @Override
379         public Object getMessage() {
380             return EMPTY_BUFFER;
381         }
382 
383         @Override
384         public String toString() {
385             return "MessageWriteRequest, parent : " + super.toString();
386         }
387     }
388 
389     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
390         public ProtocolDecoderOutputImpl() {
391             // Do nothing
392         }
393 
394         public void flush(NextFilter nextFilter, IoSession session) {
395             Queue<Object> messageQueue = getMessageQueue();
396 
397             while (!messageQueue.isEmpty()) {
398                 nextFilter.messageReceived(session, messageQueue.poll());
399             }
400         }
401     }
402 
403     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
404         private final IoSession session;
405 
406         private final NextFilter nextFilter;
407 
408         /** The WriteRequest destination */
409         private final SocketAddress destination;
410 
411         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
412             this.session = session;
413             this.nextFilter = nextFilter;
414 
415             // Only store the destination, not the full WriteRequest.
416             destination = writeRequest.getDestination();
417         }
418 
419         public WriteFuture flush() {
420             Queue<Object> bufferQueue = getMessageQueue();
421             WriteFuture future = null;
422 
423             while (!bufferQueue.isEmpty()) {
424                 Object encodedMessage = bufferQueue.poll();
425 
426                 if (encodedMessage == null) {
427                     break;
428                 }
429 
430                 // Flush only when the buffer has remaining.
431                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
432                     future = new DefaultWriteFuture(session);
433                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
434                 }
435             }
436 
437             if (future == null) {
438                 // Creates an empty writeRequest containing the destination
439                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
440             }
441 
442             return future;
443         }
444     }
445 
446     //----------- Helper methods ---------------------------------------------
447     /**
448      * Dispose the encoder, decoder, and the callback for the decoded
449      * messages.
450      */
451     private void disposeCodec(IoSession session) {
452         // We just remove the two instances of encoder/decoder to release resources
453         // from the session
454         disposeEncoder(session);
455         disposeDecoder(session);
456 
457         // We also remove the callback
458         disposeDecoderOut(session);
459     }
460 
461     /**
462      * Dispose the encoder, removing its instance from the
463      * session's attributes, and calling the associated
464      * dispose method.
465      */
466     private void disposeEncoder(IoSession session) {
467         ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
468         if (encoder == null) {
469             return;
470         }
471 
472         try {
473             encoder.dispose(session);
474         } catch (Exception e) {
475             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
476         }
477     }
478 
479     /**
480      * Dispose the decoder, removing its instance from the
481      * session's attributes, and calling the associated
482      * dispose method.
483      */
484     private void disposeDecoder(IoSession session) {
485         ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
486         if (decoder == null) {
487             return;
488         }
489 
490         try {
491             decoder.dispose(session);
492         } catch (Exception e) {
493             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
494         }
495     }
496 
497     /**
498      * Return a reference to the decoder callback. If it's not already created
499      * and stored into the session, we create a new instance.
500      */
501     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
502         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
503 
504         if (out == null) {
505             // Create a new instance, and stores it into the session
506             out = new ProtocolDecoderOutputImpl();
507             session.setAttribute(DECODER_OUT, out);
508         }
509 
510         return out;
511     }
512 
513     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
514         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
515 
516         if (out == null) {
517             // Create a new instance, and stores it into the session
518             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
519             session.setAttribute(ENCODER_OUT, out);
520         }
521 
522         return out;
523     }
524 
525     /**
526      * Remove the decoder callback from the session's attributes.
527      */
528     private void disposeDecoderOut(IoSession session) {
529         session.removeAttribute(DECODER_OUT);
530     }
531 }