View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  import java.util.concurrent.Semaphore;
25  
26  import org.apache.mina.core.buffer.IoBuffer;
27  import org.apache.mina.core.file.FileRegion;
28  import org.apache.mina.core.filterchain.IoFilter;
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.future.DefaultWriteFuture;
32  import org.apache.mina.core.future.WriteFuture;
33  import org.apache.mina.core.session.AttributeKey;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.DefaultWriteRequest;
36  import org.apache.mina.core.write.NothingWrittenException;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.apache.mina.core.write.WriteRequestWrapper;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * An {@link IoFilter} which translates binary or protocol specific data into
44   * message objects and vice versa using {@link ProtocolCodecFactory},
45   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
46   *
47   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
48   * @org.apache.xbean.XBean
49   */
50  public class ProtocolCodecFilter extends IoFilterAdapter {
51      /** A logger for this class */
52      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
53  
54      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
55  
56      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
57  
58      private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
59  
60      private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
61  
62      private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
63  
64      private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
65  
66      /** The factory responsible for creating the encoder and decoder */
67      private final ProtocolCodecFactory factory;
68  
69      private final Semaphore lock = new Semaphore(1, true);
70  
71      /**
72       * Creates a new instance of ProtocolCodecFilter, associating a factory
73       * for the creation of the encoder and decoder.
74       *
75       * @param factory The associated factory
76       */
77      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
78          if (factory == null) {
79              throw new IllegalArgumentException("factory");
80          }
81  
82          this.factory = factory;
83      }
84  
85      /**
86       * Creates a new instance of ProtocolCodecFilter, without any factory.
87       * The encoder/decoder factory will be created as an inner class, using
88       * the two parameters (encoder and decoder).
89       * 
90       * @param encoder The class responsible for encoding the message
91       * @param decoder The class responsible for decoding the message
92       */
93      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
94          if (encoder == null) {
95              throw new IllegalArgumentException("encoder");
96          }
97          if (decoder == null) {
98              throw new IllegalArgumentException("decoder");
99          }
100 
101         // Create the inner Factory based on the two parameters
102         this.factory = new ProtocolCodecFactory() {
103             public ProtocolEncoder getEncoder(IoSession session) {
104                 return encoder;
105             }
106 
107             public ProtocolDecoder getDecoder(IoSession session) {
108                 return decoder;
109             }
110         };
111     }
112 
113     /**
114      * Creates a new instance of ProtocolCodecFilter, without any factory.
115      * The encoder/decoder factory will be created as an inner class, using
116      * the two parameters (encoder and decoder), which are class names. Instances
117      * for those classes will be created in this constructor.
118      * 
119      * @param encoderClass The class responsible for encoding the message
120      * @param decoderClass The class responsible for decoding the message
121      */
122     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
123             final Class<? extends ProtocolDecoder> decoderClass) {
124         if (encoderClass == null) {
125             throw new IllegalArgumentException("encoderClass");
126         }
127         if (decoderClass == null) {
128             throw new IllegalArgumentException("decoderClass");
129         }
130         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
131             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
132         }
133         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
134             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
135         }
136         try {
137             encoderClass.getConstructor(EMPTY_PARAMS);
138         } catch (NoSuchMethodException e) {
139             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
140         }
141         try {
142             decoderClass.getConstructor(EMPTY_PARAMS);
143         } catch (NoSuchMethodException e) {
144             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
145         }
146 
147         final ProtocolEncoder encoder;
148 
149         try {
150             encoder = encoderClass.newInstance();
151         } catch (Exception e) {
152             throw new IllegalArgumentException("encoderClass cannot be initialized");
153         }
154 
155         final ProtocolDecoder decoder;
156 
157         try {
158             decoder = decoderClass.newInstance();
159         } catch (Exception e) {
160             throw new IllegalArgumentException("decoderClass cannot be initialized");
161         }
162 
163         // Create the inner factory based on the two parameters.
164         this.factory = new ProtocolCodecFactory() {
165             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
166                 return encoder;
167             }
168 
169             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
170                 return decoder;
171             }
172         };
173     }
174 
175     /**
176      * Get the encoder instance from a given session.
177      *
178      * @param session The associated session we will get the encoder from
179      * @return The encoder instance, if any
180      */
181     public ProtocolEncoder getEncoder(IoSession session) {
182         return (ProtocolEncoder) session.getAttribute(ENCODER);
183     }
184 
185     @Override
186     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
187         if (parent.contains(this)) {
188             throw new IllegalArgumentException(
189                     "You can't add the same filter instance more than once.  Create another instance and add it.");
190         }
191     }
192 
193     @Override
194     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
195         // Clean everything
196         disposeCodec(parent.getSession());
197     }
198 
199     /**
200      * Process the incoming message, calling the session decoder. As the incoming
201      * buffer might contains more than one messages, we have to loop until the decoder
202      * throws an exception.
203      * 
204      *  while ( buffer not empty )
205      *    try
206      *      decode ( buffer )
207      *    catch
208      *      break;
209      * 
210      */
211     @Override
212     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
213         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
214 
215         if (!(message instanceof IoBuffer)) {
216             nextFilter.messageReceived(session, message);
217             return;
218         }
219 
220         IoBuffer in = (IoBuffer) message;
221         ProtocolDecoder decoder = factory.getDecoder(session);
222         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
223 
224         // Loop until we don't have anymore byte in the buffer,
225         // or until the decoder throws an unrecoverable exception or
226         // can't decoder a message, because there are not enough
227         // data in the buffer
228         while (in.hasRemaining()) {
229             int oldPos = in.position();
230             try {
231                 lock.acquire();
232                 // Call the decoder with the read bytes
233                 decoder.decode(session, in, decoderOut);
234                 // Finish decoding if no exception was thrown.
235                 decoderOut.flush(nextFilter, session);
236             } catch (Exception e) {
237                 ProtocolDecoderException pde;
238                 if (e instanceof ProtocolDecoderException) {
239                     pde = (ProtocolDecoderException) e;
240                 } else {
241                     pde = new ProtocolDecoderException(e);
242                 }
243                 if (pde.getHexdump() == null) {
244                     // Generate a message hex dump
245                     int curPos = in.position();
246                     in.position(oldPos);
247                     pde.setHexdump(in.getHexDump());
248                     in.position(curPos);
249                 }
250                 // Fire the exceptionCaught event.
251                 decoderOut.flush(nextFilter, session);
252                 nextFilter.exceptionCaught(session, pde);
253                 // Retry only if the type of the caught exception is
254                 // recoverable and the buffer position has changed.
255                 // We check buffer position additionally to prevent an
256                 // infinite loop.
257                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
258                     break;
259                 }
260             } finally {
261                 lock.release();
262             }
263         }
264     }
265 
266     @Override
267     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
268         if (writeRequest instanceof EncodedWriteRequest) {
269             return;
270         }
271 
272         if (writeRequest instanceof MessageWriteRequest) {
273             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
274             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
275         } else {
276             nextFilter.messageSent(session, writeRequest);
277         }
278     }
279 
280     @Override
281     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
282         Object message = writeRequest.getMessage();
283 
284         // Bypass the encoding if the message is contained in a IoBuffer,
285         // as it has already been encoded before
286         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
287             nextFilter.filterWrite(session, writeRequest);
288             return;
289         }
290 
291         // Get the encoder in the session
292         ProtocolEncoder encoder = factory.getEncoder(session);
293 
294         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
295 
296         if (encoder == null) {
297             throw new ProtocolEncoderException("The encoder is null for the session " + session);
298         }
299 
300         if (encoderOut == null) {
301             throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
302         }
303 
304         try {
305             // Now we can try to encode the response
306             encoder.encode(session, message, encoderOut);
307 
308             // Send it directly
309             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
310 
311             // Write all the encoded messages now
312             while (!bufferQueue.isEmpty()) {
313                 Object encodedMessage = bufferQueue.poll();
314 
315                 if (encodedMessage == null) {
316                     break;
317                 }
318 
319                 // Flush only when the buffer has remaining.
320                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
321                     SocketAddress destination = writeRequest.getDestination();
322                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
323 
324                     nextFilter.filterWrite(session, encodedWriteRequest);
325                 }
326             }
327 
328             // Call the next filter
329             nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
330         } catch (Exception e) {
331             ProtocolEncoderException pee;
332 
333             // Generate the correct exception
334             if (e instanceof ProtocolEncoderException) {
335                 pee = (ProtocolEncoderException) e;
336             } else {
337                 pee = new ProtocolEncoderException(e);
338             }
339 
340             throw pee;
341         }
342     }
343 
344     @Override
345     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
346         // Call finishDecode() first when a connection is closed.
347         ProtocolDecoder decoder = factory.getDecoder(session);
348         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
349 
350         try {
351             decoder.finishDecode(session, decoderOut);
352         } catch (Exception e) {
353             ProtocolDecoderException pde;
354             if (e instanceof ProtocolDecoderException) {
355                 pde = (ProtocolDecoderException) e;
356             } else {
357                 pde = new ProtocolDecoderException(e);
358             }
359             throw pde;
360         } finally {
361             // Dispose everything
362             disposeCodec(session);
363             decoderOut.flush(nextFilter, session);
364         }
365 
366         // Call the next filter
367         nextFilter.sessionClosed(session);
368     }
369 
370     private static class EncodedWriteRequest extends DefaultWriteRequest {
371         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
372             super(encodedMessage, future, destination);
373         }
374 
375         public boolean isEncoded() {
376             return true;
377         }
378     }
379 
380     private static class MessageWriteRequest extends WriteRequestWrapper {
381         public MessageWriteRequest(WriteRequest writeRequest) {
382             super(writeRequest);
383         }
384 
385         @Override
386         public Object getMessage() {
387             return EMPTY_BUFFER;
388         }
389 
390         @Override
391         public String toString() {
392             return "MessageWriteRequest, parent : " + super.toString();
393         }
394     }
395 
396     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
397         public ProtocolDecoderOutputImpl() {
398             // Do nothing
399         }
400 
401         public void flush(NextFilter nextFilter, IoSession session) {
402             Queue<Object> messageQueue = getMessageQueue();
403 
404             while (!messageQueue.isEmpty()) {
405                 nextFilter.messageReceived(session, messageQueue.poll());
406             }
407         }
408     }
409 
410     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
411         private final IoSession session;
412 
413         private final NextFilter nextFilter;
414 
415         /** The WriteRequest destination */
416         private final SocketAddress destination;
417 
418         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
419             this.session = session;
420             this.nextFilter = nextFilter;
421 
422             // Only store the destination, not the full WriteRequest.
423             destination = writeRequest.getDestination();
424         }
425 
426         public WriteFuture flush() {
427             Queue<Object> bufferQueue = getMessageQueue();
428             WriteFuture future = null;
429 
430             while (!bufferQueue.isEmpty()) {
431                 Object encodedMessage = bufferQueue.poll();
432 
433                 if (encodedMessage == null) {
434                     break;
435                 }
436 
437                 // Flush only when the buffer has remaining.
438                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
439                     future = new DefaultWriteFuture(session);
440                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
441                 }
442             }
443 
444             if (future == null) {
445                 // Creates an empty writeRequest containing the destination
446                 WriteRequest writeRequest = new DefaultWriteRequest(
447                         DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
448                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
449             }
450 
451             return future;
452         }
453     }
454 
455     //----------- Helper methods ---------------------------------------------
456     /**
457      * Dispose the encoder, decoder, and the callback for the decoded
458      * messages.
459      */
460     private void disposeCodec(IoSession session) {
461         // We just remove the two instances of encoder/decoder to release resources
462         // from the session
463         disposeEncoder(session);
464         disposeDecoder(session);
465 
466         // We also remove the callback
467         disposeDecoderOut(session);
468     }
469 
470     /**
471      * Dispose the encoder, removing its instance from the
472      * session's attributes, and calling the associated
473      * dispose method.
474      */
475     private void disposeEncoder(IoSession session) {
476         ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
477         if (encoder == null) {
478             return;
479         }
480 
481         try {
482             encoder.dispose(session);
483         } catch (Exception e) {
484             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
485         }
486     }
487 
488     /**
489      * Dispose the decoder, removing its instance from the
490      * session's attributes, and calling the associated
491      * dispose method.
492      */
493     private void disposeDecoder(IoSession session) {
494         ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
495         if (decoder == null) {
496             return;
497         }
498 
499         try {
500             decoder.dispose(session);
501         } catch (Exception e) {
502             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
503         }
504     }
505 
506     /**
507      * Return a reference to the decoder callback. If it's not already created
508      * and stored into the session, we create a new instance.
509      */
510     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
511         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
512 
513         if (out == null) {
514             // Create a new instance, and stores it into the session
515             out = new ProtocolDecoderOutputImpl();
516             session.setAttribute(DECODER_OUT, out);
517         }
518 
519         return out;
520     }
521 
522     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
523         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
524 
525         if (out == null) {
526             // Create a new instance, and stores it into the session
527             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
528             session.setAttribute(ENCODER_OUT, out);
529         }
530 
531         return out;
532     }
533 
534     /**
535      * Remove the decoder callback from the session's attributes.
536      */
537     private void disposeDecoderOut(IoSession session) {
538         session.removeAttribute(DECODER_OUT);
539     }
540 }