001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.codec;
021
022import java.net.SocketAddress;
023import java.util.Queue;
024
025import org.apache.mina.core.buffer.IoBuffer;
026import org.apache.mina.core.file.FileRegion;
027import org.apache.mina.core.filterchain.IoFilter;
028import org.apache.mina.core.filterchain.IoFilterAdapter;
029import org.apache.mina.core.filterchain.IoFilterChain;
030import org.apache.mina.core.future.DefaultWriteFuture;
031import org.apache.mina.core.future.WriteFuture;
032import org.apache.mina.core.session.AbstractIoSession;
033import org.apache.mina.core.session.AttributeKey;
034import org.apache.mina.core.session.IoSession;
035import org.apache.mina.core.write.DefaultWriteRequest;
036import org.apache.mina.core.write.NothingWrittenException;
037import org.apache.mina.core.write.WriteRequest;
038import org.apache.mina.core.write.WriteRequestWrapper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * An {@link IoFilter} which translates binary or protocol specific data into
044 * message objects and vice versa using {@link ProtocolCodecFactory},
045 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
046 *
047 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
048 * @org.apache.xbean.XBean
049 */
050public class ProtocolCodecFilter extends IoFilterAdapter {
051    /** A logger for this class */
052    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
053
054    private static final Class<?>[] EMPTY_PARAMS = new Class[0];
055
056    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
057
058    private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
059
060    private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
061
062    private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
063
064    private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
065
066    /** The factory responsible for creating the encoder and decoder */
067    private final ProtocolCodecFactory factory;
068
069    /**
070     * Creates a new instance of ProtocolCodecFilter, associating a factory
071     * for the creation of the encoder and decoder.
072     *
073     * @param factory The associated factory
074     */
075    public ProtocolCodecFilter(ProtocolCodecFactory factory) {
076        if (factory == null) {
077            throw new IllegalArgumentException("factory");
078        }
079
080        this.factory = factory;
081    }
082
083    /**
084     * Creates a new instance of ProtocolCodecFilter, without any factory.
085     * The encoder/decoder factory will be created as an inner class, using
086     * the two parameters (encoder and decoder).
087     * 
088     * @param encoder The class responsible for encoding the message
089     * @param decoder The class responsible for decoding the message
090     */
091    public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
092        if (encoder == null) {
093            throw new IllegalArgumentException("encoder");
094        }
095        if (decoder == null) {
096            throw new IllegalArgumentException("decoder");
097        }
098
099        // Create the inner Factory based on the two parameters
100        this.factory = new ProtocolCodecFactory() {
101            public ProtocolEncoder getEncoder(IoSession session) {
102                return encoder;
103            }
104
105            public ProtocolDecoder getDecoder(IoSession session) {
106                return decoder;
107            }
108        };
109    }
110
111    /**
112     * Creates a new instance of ProtocolCodecFilter, without any factory.
113     * The encoder/decoder factory will be created as an inner class, using
114     * the two parameters (encoder and decoder), which are class names. Instances
115     * for those classes will be created in this constructor.
116     * 
117     * @param encoderClass The class responsible for encoding the message
118     * @param decoderClass The class responsible for decoding the message
119     */
120    public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
121            final Class<? extends ProtocolDecoder> decoderClass) {
122        if (encoderClass == null) {
123            throw new IllegalArgumentException("encoderClass");
124        }
125        if (decoderClass == null) {
126            throw new IllegalArgumentException("decoderClass");
127        }
128        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
129            throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
130        }
131        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
132            throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
133        }
134        try {
135            encoderClass.getConstructor(EMPTY_PARAMS);
136        } catch (NoSuchMethodException e) {
137            throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
138        }
139        try {
140            decoderClass.getConstructor(EMPTY_PARAMS);
141        } catch (NoSuchMethodException e) {
142            throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
143        }
144
145        final ProtocolEncoder encoder;
146
147        try {
148            encoder = encoderClass.newInstance();
149        } catch (Exception e) {
150            throw new IllegalArgumentException("encoderClass cannot be initialized");
151        }
152
153        final ProtocolDecoder decoder;
154
155        try {
156            decoder = decoderClass.newInstance();
157        } catch (Exception e) {
158            throw new IllegalArgumentException("decoderClass cannot be initialized");
159        }
160
161        // Create the inner factory based on the two parameters.
162        this.factory = new ProtocolCodecFactory() {
163            public ProtocolEncoder getEncoder(IoSession session) throws Exception {
164                return encoder;
165            }
166
167            public ProtocolDecoder getDecoder(IoSession session) throws Exception {
168                return decoder;
169            }
170        };
171    }
172
173    /**
174     * Get the encoder instance from a given session.
175     *
176     * @param session The associated session we will get the encoder from
177     * @return The encoder instance, if any
178     */
179    public ProtocolEncoder getEncoder(IoSession session) {
180        return (ProtocolEncoder) session.getAttribute(ENCODER);
181    }
182
183    @Override
184    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
185        if (parent.contains(this)) {
186            throw new IllegalArgumentException(
187                    "You can't add the same filter instance more than once.  Create another instance and add it.");
188        }
189    }
190
191    @Override
192    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
193        // Clean everything
194        disposeCodec(parent.getSession());
195    }
196
197    /**
198     * Process the incoming message, calling the session decoder. As the incoming
199     * buffer might contains more than one messages, we have to loop until the decoder
200     * throws an exception.
201     * 
202     *  while ( buffer not empty )
203     *    try
204     *      decode ( buffer )
205     *    catch
206     *      break;
207     * 
208     */
209    @Override
210    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
211        LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
212
213        if (!(message instanceof IoBuffer)) {
214            nextFilter.messageReceived(session, message);
215            return;
216        }
217
218        IoBuffer in = (IoBuffer) message;
219        ProtocolDecoder decoder = factory.getDecoder(session);
220        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
221
222        // Loop until we don't have anymore byte in the buffer,
223        // or until the decoder throws an unrecoverable exception or
224        // can't decoder a message, because there are not enough
225        // data in the buffer
226        while (in.hasRemaining()) {
227            int oldPos = in.position();
228            try {
229                synchronized (session) {
230                    // Call the decoder with the read bytes
231                    decoder.decode(session, in, decoderOut);
232                }
233                // Finish decoding if no exception was thrown.
234                decoderOut.flush(nextFilter, session);
235            } catch (Exception e) {
236                ProtocolDecoderException pde;
237                if (e instanceof ProtocolDecoderException) {
238                    pde = (ProtocolDecoderException) e;
239                } else {
240                    pde = new ProtocolDecoderException(e);
241                }
242                if (pde.getHexdump() == null) {
243                    // Generate a message hex dump
244                    int curPos = in.position();
245                    in.position(oldPos);
246                    pde.setHexdump(in.getHexDump());
247                    in.position(curPos);
248                }
249                // Fire the exceptionCaught event.
250                decoderOut.flush(nextFilter, session);
251                nextFilter.exceptionCaught(session, pde);
252                // Retry only if the type of the caught exception is
253                // recoverable and the buffer position has changed.
254                // We check buffer position additionally to prevent an
255                // infinite loop.
256                if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
257                    break;
258                }
259            }
260        }
261    }
262
263    @Override
264    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
265        if (writeRequest instanceof EncodedWriteRequest) {
266            return;
267        }
268
269        if (writeRequest instanceof MessageWriteRequest) {
270            MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
271            nextFilter.messageSent(session, wrappedRequest.getParentRequest());
272        } else {
273            nextFilter.messageSent(session, writeRequest);
274        }
275    }
276
277    @Override
278    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
279        Object message = writeRequest.getMessage();
280
281        // Bypass the encoding if the message is contained in a IoBuffer,
282        // as it has already been encoded before
283        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
284            nextFilter.filterWrite(session, writeRequest);
285            return;
286        }
287
288        // Get the encoder in the session
289        ProtocolEncoder encoder = factory.getEncoder(session);
290
291        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
292
293        if (encoder == null) {
294            throw new ProtocolEncoderException("The encoder is null for the session " + session);
295        }
296
297        try {
298            // Now we can try to encode the response
299            encoder.encode(session, message, encoderOut);
300
301            // Send it directly
302            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
303
304            // Write all the encoded messages now
305            while (!bufferQueue.isEmpty()) {
306                Object encodedMessage = bufferQueue.poll();
307
308                if (encodedMessage == null) {
309                    break;
310                }
311
312                // Flush only when the buffer has remaining.
313                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
314                    SocketAddress destination = writeRequest.getDestination();
315                    WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
316
317                    nextFilter.filterWrite(session, encodedWriteRequest);
318                }
319            }
320
321            // Call the next filter
322            nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
323        } catch (Exception e) {
324            ProtocolEncoderException pee;
325
326            // Generate the correct exception
327            if (e instanceof ProtocolEncoderException) {
328                pee = (ProtocolEncoderException) e;
329            } else {
330                pee = new ProtocolEncoderException(e);
331            }
332
333            throw pee;
334        }
335    }
336
337    @Override
338    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
339        // Call finishDecode() first when a connection is closed.
340        ProtocolDecoder decoder = factory.getDecoder(session);
341        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
342
343        try {
344            decoder.finishDecode(session, decoderOut);
345        } catch (Exception e) {
346            ProtocolDecoderException pde;
347            if (e instanceof ProtocolDecoderException) {
348                pde = (ProtocolDecoderException) e;
349            } else {
350                pde = new ProtocolDecoderException(e);
351            }
352            throw pde;
353        } finally {
354            // Dispose everything
355            disposeCodec(session);
356            decoderOut.flush(nextFilter, session);
357        }
358
359        // Call the next filter
360        nextFilter.sessionClosed(session);
361    }
362
363    private static class EncodedWriteRequest extends DefaultWriteRequest {
364        public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
365            super(encodedMessage, future, destination);
366        }
367
368        public boolean isEncoded() {
369            return true;
370        }
371    }
372
373    private static class MessageWriteRequest extends WriteRequestWrapper {
374        public MessageWriteRequest(WriteRequest writeRequest) {
375            super(writeRequest);
376        }
377
378        @Override
379        public Object getMessage() {
380            return EMPTY_BUFFER;
381        }
382
383        @Override
384        public String toString() {
385            return "MessageWriteRequest, parent : " + super.toString();
386        }
387    }
388
389    private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
390        public ProtocolDecoderOutputImpl() {
391            // Do nothing
392        }
393
394        public void flush(NextFilter nextFilter, IoSession session) {
395            Queue<Object> messageQueue = getMessageQueue();
396
397            while (!messageQueue.isEmpty()) {
398                nextFilter.messageReceived(session, messageQueue.poll());
399            }
400        }
401    }
402
403    private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
404        private final IoSession session;
405
406        private final NextFilter nextFilter;
407
408        /** The WriteRequest destination */
409        private final SocketAddress destination;
410
411        public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
412            this.session = session;
413            this.nextFilter = nextFilter;
414
415            // Only store the destination, not the full WriteRequest.
416            destination = writeRequest.getDestination();
417        }
418
419        public WriteFuture flush() {
420            Queue<Object> bufferQueue = getMessageQueue();
421            WriteFuture future = null;
422
423            while (!bufferQueue.isEmpty()) {
424                Object encodedMessage = bufferQueue.poll();
425
426                if (encodedMessage == null) {
427                    break;
428                }
429
430                // Flush only when the buffer has remaining.
431                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
432                    future = new DefaultWriteFuture(session);
433                    nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
434                }
435            }
436
437            if (future == null) {
438                // Creates an empty writeRequest containing the destination
439                future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
440            }
441
442            return future;
443        }
444    }
445
446    //----------- Helper methods ---------------------------------------------
447    /**
448     * Dispose the encoder, decoder, and the callback for the decoded
449     * messages.
450     */
451    private void disposeCodec(IoSession session) {
452        // We just remove the two instances of encoder/decoder to release resources
453        // from the session
454        disposeEncoder(session);
455        disposeDecoder(session);
456
457        // We also remove the callback
458        disposeDecoderOut(session);
459    }
460
461    /**
462     * Dispose the encoder, removing its instance from the
463     * session's attributes, and calling the associated
464     * dispose method.
465     */
466    private void disposeEncoder(IoSession session) {
467        ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
468        if (encoder == null) {
469            return;
470        }
471
472        try {
473            encoder.dispose(session);
474        } catch (Exception e) {
475            LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
476        }
477    }
478
479    /**
480     * Dispose the decoder, removing its instance from the
481     * session's attributes, and calling the associated
482     * dispose method.
483     */
484    private void disposeDecoder(IoSession session) {
485        ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
486        if (decoder == null) {
487            return;
488        }
489
490        try {
491            decoder.dispose(session);
492        } catch (Exception e) {
493            LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
494        }
495    }
496
497    /**
498     * Return a reference to the decoder callback. If it's not already created
499     * and stored into the session, we create a new instance.
500     */
501    private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
502        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
503
504        if (out == null) {
505            // Create a new instance, and stores it into the session
506            out = new ProtocolDecoderOutputImpl();
507            session.setAttribute(DECODER_OUT, out);
508        }
509
510        return out;
511    }
512
513    private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
514        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
515
516        if (out == null) {
517            // Create a new instance, and stores it into the session
518            out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
519            session.setAttribute(ENCODER_OUT, out);
520        }
521
522        return out;
523    }
524
525    /**
526     * Remove the decoder callback from the session's attributes.
527     */
528    private void disposeDecoderOut(IoSession session) {
529        session.removeAttribute(DECODER_OUT);
530    }
531}