View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message objects and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54  
55      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56  
57      private final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58  
59      private final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60  
61      private final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62  
63      private final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64  
65      /** The factory responsible for creating the encoder and decoder */
66      private final ProtocolCodecFactory factory;
67  
68      /**
69       * 
70       * Creates a new instance of ProtocolCodecFilter, associating a factory
71       * for the creation of the encoder and decoder.
72       *
73       * @param factory The associated factory
74       */
75      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
76          if (factory == null) {
77              throw new IllegalArgumentException("factory");
78          }
79  
80          this.factory = factory;
81      }
82  
83      /**
84       * Creates a new instance of ProtocolCodecFilter, without any factory.
85       * The encoder/decoder factory will be created as an inner class, using
86       * the two parameters (encoder and decoder).
87       * 
88       * @param encoder The class responsible for encoding the message
89       * @param decoder The class responsible for decoding the message
90       */
91      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
92          if (encoder == null) {
93              throw new IllegalArgumentException("encoder");
94          }
95          if (decoder == null) {
96              throw new IllegalArgumentException("decoder");
97          }
98  
99          // Create the inner Factory based on the two parameters
100         this.factory = new ProtocolCodecFactory() {
101             public ProtocolEncoder getEncoder(IoSession session) {
102                 return encoder;
103             }
104 
105             public ProtocolDecoder getDecoder(IoSession session) {
106                 return decoder;
107             }
108         };
109     }
110 
111     /**
112      * Creates a new instance of ProtocolCodecFilter, without any factory.
113      * The encoder/decoder factory will be created as an inner class, using
114      * the two parameters (encoder and decoder), which are class names. Instances
115      * for those classes will be created in this constructor.
116      * 
117      * @param encoder The class responsible for encoding the message
118      * @param decoder The class responsible for decoding the message
119      */
120     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
121             final Class<? extends ProtocolDecoder> decoderClass) {
122         if (encoderClass == null) {
123             throw new IllegalArgumentException("encoderClass");
124         }
125         if (decoderClass == null) {
126             throw new IllegalArgumentException("decoderClass");
127         }
128         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
129             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
130         }
131         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
132             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
133         }
134         try {
135             encoderClass.getConstructor(EMPTY_PARAMS);
136         } catch (NoSuchMethodException e) {
137             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
138         }
139         try {
140             decoderClass.getConstructor(EMPTY_PARAMS);
141         } catch (NoSuchMethodException e) {
142             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
143         }
144 
145         final ProtocolEncoder encoder;
146 
147         try {
148             encoder = encoderClass.newInstance();
149         } catch (Exception e) {
150             throw new IllegalArgumentException("encoderClass cannot be initialized");
151         }
152 
153         final ProtocolDecoder decoder;
154 
155         try {
156             decoder = decoderClass.newInstance();
157         } catch (Exception e) {
158             throw new IllegalArgumentException("decoderClass cannot be initialized");
159         }
160 
161         // Create the inner factory based on the two parameters.
162         this.factory = new ProtocolCodecFactory() {
163             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
164                 return encoder;
165             }
166 
167             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
168                 return decoder;
169             }
170         };
171     }
172 
173     /**
174      * Get the encoder instance from a given session.
175      *
176      * @param session The associated session we will get the encoder from
177      * @return The encoder instance, if any
178      */
179     public ProtocolEncoder getEncoder(IoSession session) {
180         return (ProtocolEncoder) session.getAttribute(ENCODER);
181     }
182 
183     @Override
184     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
185         if (parent.contains(this)) {
186             throw new IllegalArgumentException(
187                     "You can't add the same filter instance more than once.  Create another instance and add it.");
188         }
189     }
190 
191     @Override
192     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
193         // Clean everything
194         disposeCodec(parent.getSession());
195     }
196 
197     /**
198      * Process the incoming message, calling the session decoder. As the incoming
199      * buffer might contains more than one messages, we have to loop until the decoder
200      * throws an exception.
201      * 
202      *  while ( buffer not empty )
203      *    try
204      *      decode ( buffer )
205      *    catch
206      *      break;
207      * 
208      */
209     @Override
210     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
211         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
212 
213         if (!(message instanceof IoBuffer)) {
214             nextFilter.messageReceived(session, message);
215             return;
216         }
217 
218         IoBuffer in = (IoBuffer) message;
219         ProtocolDecoder decoder = factory.getDecoder(session);
220         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
221 
222         // Loop until we don't have anymore byte in the buffer,
223         // or until the decoder throws an unrecoverable exception or
224         // can't decoder a message, because there are not enough
225         // data in the buffer
226         while (in.hasRemaining()) {
227             int oldPos = in.position();
228 
229             try {
230                 synchronized (decoderOut) {
231                     // Call the decoder with the read bytes
232                     decoder.decode(session, in, decoderOut);
233                 }
234 
235                 // Finish decoding if no exception was thrown.
236                 decoderOut.flush(nextFilter, session);
237             } catch (Throwable t) {
238                 ProtocolDecoderException pde;
239                 if (t instanceof ProtocolDecoderException) {
240                     pde = (ProtocolDecoderException) t;
241                 } else {
242                     pde = new ProtocolDecoderException(t);
243                 }
244 
245                 if (pde.getHexdump() == null) {
246                     // Generate a message hex dump
247                     int curPos = in.position();
248                     in.position(oldPos);
249                     pde.setHexdump(in.getHexDump());
250                     in.position(curPos);
251                 }
252 
253                 // Fire the exceptionCaught event.
254                 decoderOut.flush(nextFilter, session);
255                 nextFilter.exceptionCaught(session, pde);
256 
257                 // Retry only if the type of the caught exception is
258                 // recoverable and the buffer position has changed.
259                 // We check buffer position additionally to prevent an
260                 // infinite loop.
261                 if (!(t instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
262                     break;
263                 }
264             }
265         }
266     }
267 
268     @Override
269     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
270         if (writeRequest instanceof EncodedWriteRequest) {
271             return;
272         }
273 
274         if (writeRequest instanceof MessageWriteRequest) {
275             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
276             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
277         } else {
278             nextFilter.messageSent(session, writeRequest);
279         }
280     }
281 
282     @Override
283     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
284         Object message = writeRequest.getMessage();
285 
286         // Bypass the encoding if the message is contained in a IoBuffer,
287         // as it has already been encoded before
288         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
289             nextFilter.filterWrite(session, writeRequest);
290             return;
291         }
292 
293         // Get the encoder in the session
294         ProtocolEncoder encoder = factory.getEncoder(session);
295 
296         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
297 
298         if (encoder == null) {
299             throw new ProtocolEncoderException("The encoder is null for the session " + session);
300         }
301 
302         if (encoderOut == null) {
303             throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
304         }
305 
306         try {
307             // Now we can try to encode the response
308             encoder.encode(session, message, encoderOut);
309 
310             // Send it directly
311             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
312 
313             // Write all the encoded messages now
314             while (!bufferQueue.isEmpty()) {
315                 Object encodedMessage = bufferQueue.poll();
316 
317                 if (encodedMessage == null) {
318                     break;
319                 }
320 
321                 // Flush only when the buffer has remaining.
322                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
323                     SocketAddress destination = writeRequest.getDestination();
324                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
325 
326                     nextFilter.filterWrite(session, encodedWriteRequest);
327                 }
328             }
329 
330             // Call the next filter
331             nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
332         } catch (Throwable t) {
333             ProtocolEncoderException pee;
334 
335             // Generate the correct exception
336             if (t instanceof ProtocolEncoderException) {
337                 pee = (ProtocolEncoderException) t;
338             } else {
339                 pee = new ProtocolEncoderException(t);
340             }
341 
342             throw pee;
343         }
344     }
345 
346     @Override
347     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
348         // Call finishDecode() first when a connection is closed.
349         ProtocolDecoder decoder = factory.getDecoder(session);
350         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
351 
352         try {
353             decoder.finishDecode(session, decoderOut);
354         } catch (Throwable t) {
355             ProtocolDecoderException pde;
356             if (t instanceof ProtocolDecoderException) {
357                 pde = (ProtocolDecoderException) t;
358             } else {
359                 pde = new ProtocolDecoderException(t);
360             }
361             throw pde;
362         } finally {
363             // Dispose everything
364             disposeCodec(session);
365             decoderOut.flush(nextFilter, session);
366         }
367 
368         // Call the next filter
369         nextFilter.sessionClosed(session);
370     }
371 
372     private static class EncodedWriteRequest extends DefaultWriteRequest {
373         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
374             super(encodedMessage, future, destination);
375         }
376 
377         public boolean isEncoded() {
378             return true;
379         }
380     }
381 
382     private static class MessageWriteRequest extends WriteRequestWrapper {
383         public MessageWriteRequest(WriteRequest writeRequest) {
384             super(writeRequest);
385         }
386 
387         @Override
388         public Object getMessage() {
389             return EMPTY_BUFFER;
390         }
391 
392         @Override
393         public String toString() {
394             return "MessageWriteRequest, parent : " + super.toString();
395         }
396     }
397 
398     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
399         public ProtocolDecoderOutputImpl() {
400             // Do nothing
401         }
402 
403         public void flush(NextFilter nextFilter, IoSession session) {
404             Queue<Object> messageQueue = getMessageQueue();
405 
406             while (!messageQueue.isEmpty()) {
407                 nextFilter.messageReceived(session, messageQueue.poll());
408             }
409         }
410     }
411 
412     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
413         private final IoSession session;
414 
415         private final NextFilter nextFilter;
416 
417         /** The WriteRequest destination */
418         private final SocketAddress destination;
419 
420         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
421             this.session = session;
422             this.nextFilter = nextFilter;
423 
424             // Only store the destination, not the full WriteRequest.
425             destination = writeRequest.getDestination();
426         }
427 
428         public WriteFuture flush() {
429             Queue<Object> bufferQueue = getMessageQueue();
430             WriteFuture future = null;
431 
432             while (!bufferQueue.isEmpty()) {
433                 Object encodedMessage = bufferQueue.poll();
434 
435                 if (encodedMessage == null) {
436                     break;
437                 }
438 
439                 // Flush only when the buffer has remaining.
440                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
441                     future = new DefaultWriteFuture(session);
442                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
443                 }
444             }
445 
446             if (future == null) {
447                 // Creates an empty writeRequest containing the destination
448                 WriteRequest writeRequest = new DefaultWriteRequest(null, null, destination);
449                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
450             }
451 
452             return future;
453         }
454     }
455 
456     //----------- Helper methods ---------------------------------------------
457     /**
458      * Dispose the encoder, decoder, and the callback for the decoded
459      * messages.
460      */
461     private void disposeCodec(IoSession session) {
462         // We just remove the two instances of encoder/decoder to release resources
463         // from the session
464         disposeEncoder(session);
465         disposeDecoder(session);
466 
467         // We also remove the callback
468         disposeDecoderOut(session);
469     }
470 
471     /**
472      * Dispose the encoder, removing its instance from the
473      * session's attributes, and calling the associated
474      * dispose method.
475      */
476     private void disposeEncoder(IoSession session) {
477         ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
478         if (encoder == null) {
479             return;
480         }
481 
482         try {
483             encoder.dispose(session);
484         } catch (Throwable t) {
485             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
486         }
487     }
488 
489     /**
490      * Dispose the decoder, removing its instance from the
491      * session's attributes, and calling the associated
492      * dispose method.
493      */
494     private void disposeDecoder(IoSession session) {
495         ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
496         if (decoder == null) {
497             return;
498         }
499 
500         try {
501             decoder.dispose(session);
502         } catch (Throwable t) {
503             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
504         }
505     }
506 
507     /**
508      * Return a reference to the decoder callback. If it's not already created
509      * and stored into the session, we create a new instance.
510      */
511     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
512         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
513 
514         if (out == null) {
515             // Create a new instance, and stores it into the session
516             out = new ProtocolDecoderOutputImpl();
517             session.setAttribute(DECODER_OUT, out);
518         }
519 
520         return out;
521     }
522 
523     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
524         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
525 
526         if (out == null) {
527             // Create a new instance, and stores it into the session
528             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
529             session.setAttribute(ENCODER_OUT, out);
530         }
531 
532         return out;
533     }
534 
535     /**
536      * Remove the decoder callback from the session's attributes.
537      */
538     private void disposeDecoderOut(IoSession session) {
539         session.removeAttribute(DECODER_OUT);
540     }
541 }