View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}.
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter creates a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
59   * maintained like the following:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specified other {@link Executor} instance in the constructor,
67   * the order of events are not maintained at all.  This means more than one event
68   * handler methods can be invoked at the same time with mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107  * 
108  * @see OrderedThreadPoolExecutor
109  * @see UnorderedThreadPoolExecutor
110  * @org.apache.xbean.XBean
111  */
112 public class ExecutorFilter extends IoFilterAdapter {
113     /** The list of handled events */
114     private EnumSet<IoEventType> eventTypes;
115 
116     /** The associated executor */
117     private Executor executor;
118 
119     /** A flag set if the executor can be managed */
120     private boolean manageableExecutor;
121 
122     /** The default pool size */
123     private static final int DEFAULT_MAX_POOL_SIZE = 16;
124 
125     /** The number of thread to create at startup */
126     private static final int BASE_THREAD_NUMBER = 0;
127 
128     /** The default KeepAlive time, in seconds */
129     private static final long DEFAULT_KEEPALIVE_TIME = 30;
130 
131     /** 
132      * A set of flags used to tell if the Executor has been created 
133      * in the constructor or passed as an argument. In the second case, 
134      * the executor state can be managed.
135      **/
136     private static final boolean MANAGEABLE_EXECUTOR = true;
137 
138     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139 
140     /** A list of default EventTypes to be handled by the executor */
141     private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
142             IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
143             IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
144 
145     /**
146      * (Convenience constructor) Creates a new instance with a new
147      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
148      * maximum of 16 threads in the pool. All the event will be handled 
149      * by this default executor.
150      */
151     public ExecutorFilter() {
152         // Create a new default Executor
153         Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
154                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
155 
156         // Initialize the filter
157         init(executor, MANAGEABLE_EXECUTOR);
158     }
159 
160     /**
161      * (Convenience constructor) Creates a new instance with a new
162      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
163      * a maximum of threads in the pool is given. All the event will be handled 
164      * by this default executor.
165      * 
166      * @param maximumPoolSize The maximum pool size
167      */
168     public ExecutorFilter(int maximumPoolSize) {
169         // Create a new default Executor
170         Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
171                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
172 
173         // Initialize the filter
174         init(executor, MANAGEABLE_EXECUTOR);
175     }
176 
177     /**
178      * (Convenience constructor) Creates a new instance with a new
179      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
180      * maximum of threads the pool can contain. All the event will be handled 
181      * by this default executor.
182      *
183      * @param corePoolSize The initial pool size
184      * @param maximumPoolSize The maximum pool size
185      */
186     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
187         // Create a new default Executor
188         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
189                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
190 
191         // Initialize the filter
192         init(executor, MANAGEABLE_EXECUTOR);
193     }
194 
195     /**
196      * (Convenience constructor) Creates a new instance with a new
197      * {@link OrderedThreadPoolExecutor}.
198      * 
199      * @param corePoolSize The initial pool size
200      * @param maximumPoolSize The maximum pool size
201      * @param keepAliveTime Default duration for a thread
202      * @param unit Time unit used for the keepAlive value
203      */
204     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
205         // Create a new default Executor
206         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
207                 Executors.defaultThreadFactory(), null);
208 
209         // Initialize the filter
210         init(executor, MANAGEABLE_EXECUTOR);
211     }
212 
213     /**
214      * (Convenience constructor) Creates a new instance with a new
215      * {@link OrderedThreadPoolExecutor}.
216      * 
217      * @param corePoolSize The initial pool size
218      * @param maximumPoolSize The maximum pool size
219      * @param keepAliveTime Default duration for a thread
220      * @param unit Time unit used for the keepAlive value
221      * @param queueHandler The queue used to store events
222      */
223     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224             IoEventQueueHandler queueHandler) {
225         // Create a new default Executor
226         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
227                 Executors.defaultThreadFactory(), queueHandler);
228 
229         // Initialize the filter
230         init(executor, MANAGEABLE_EXECUTOR);
231     }
232 
233     /**
234      * (Convenience constructor) Creates a new instance with a new
235      * {@link OrderedThreadPoolExecutor}.
236      * 
237      * @param corePoolSize The initial pool size
238      * @param maximumPoolSize The maximum pool size
239      * @param keepAliveTime Default duration for a thread
240      * @param unit Time unit used for the keepAlive value
241      * @param threadFactory The factory used to create threads
242      */
243     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
244             ThreadFactory threadFactory) {
245         // Create a new default Executor
246         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
247                 null);
248 
249         // Initialize the filter
250         init(executor, MANAGEABLE_EXECUTOR);
251     }
252 
253     /**
254      * (Convenience constructor) Creates a new instance with a new
255      * {@link OrderedThreadPoolExecutor}.
256      * 
257      * @param corePoolSize The initial pool size
258      * @param maximumPoolSize The maximum pool size
259      * @param keepAliveTime Default duration for a thread
260      * @param unit Time unit used for the keepAlive value
261      * @param threadFactory The factory used to create threads
262      * @param queueHandler The queue used to store events
263      */
264     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
265             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
266         // Create a new default Executor
267         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
268                 threadFactory, queueHandler);
269 
270         // Initialize the filter
271         init(executor, MANAGEABLE_EXECUTOR);
272     }
273 
274     /**
275      * (Convenience constructor) Creates a new instance with a new
276      * {@link OrderedThreadPoolExecutor}.
277      * 
278      * @param eventTypes The event for which the executor will be used
279      */
280     public ExecutorFilter(IoEventType... eventTypes) {
281         // Create a new default Executor
282         Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
283                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
284 
285         // Initialize the filter
286         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
287     }
288 
289     /**
290      * (Convenience constructor) Creates a new instance with a new
291      * {@link OrderedThreadPoolExecutor}.
292      * 
293      * @param maximumPoolSize The maximum pool size
294      * @param eventTypes The event for which the executor will be used
295      */
296     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
297         // Create a new default Executor
298         Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
299                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
300 
301         // Initialize the filter
302         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
303     }
304 
305     /**
306      * (Convenience constructor) Creates a new instance with a new
307      * {@link OrderedThreadPoolExecutor}.
308      * 
309      * @param corePoolSize The initial pool size
310      * @param maximumPoolSize The maximum pool size
311      * @param eventTypes The event for which the executor will be used
312      */
313     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
314         // Create a new default Executor
315         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
316                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
317 
318         // Initialize the filter
319         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
320     }
321 
322     /**
323      * (Convenience constructor) Creates a new instance with a new
324      * {@link OrderedThreadPoolExecutor}.
325      * 
326      * @param corePoolSize The initial pool size
327      * @param maximumPoolSize The maximum pool size
328      * @param keepAliveTime Default duration for a thread
329      * @param unit Time unit used for the keepAlive value
330      * @param eventTypes The event for which the executor will be used
331      */
332     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
333             IoEventType... eventTypes) {
334         // Create a new default Executor
335         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
336                 Executors.defaultThreadFactory(), null);
337 
338         // Initialize the filter
339         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
340     }
341 
342     /**
343      * (Convenience constructor) Creates a new instance with a new
344      * {@link OrderedThreadPoolExecutor}.
345      * 
346      * @param corePoolSize The initial pool size
347      * @param maximumPoolSize The maximum pool size
348      * @param keepAliveTime Default duration for a thread
349      * @param unit Time unit used for the keepAlive value
350      * @param queueHandler The queue used to store events
351      * @param eventTypes The event for which the executor will be used
352      */
353     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
354             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
355         // Create a new default Executor
356         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
357                 Executors.defaultThreadFactory(), queueHandler);
358 
359         // Initialize the filter
360         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
361     }
362 
363     /**
364      * (Convenience constructor) Creates a new instance with a new
365      * {@link OrderedThreadPoolExecutor}.
366      * 
367      * @param corePoolSize The initial pool size
368      * @param maximumPoolSize The maximum pool size
369      * @param keepAliveTime Default duration for a thread
370      * @param unit Time unit used for the keepAlive value
371      * @param threadFactory The factory used to create threads
372      * @param eventTypes The event for which the executor will be used
373      */
374     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
375             ThreadFactory threadFactory, IoEventType... eventTypes) {
376         // Create a new default Executor
377         Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
378                 null);
379 
380         // Initialize the filter
381         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
382     }
383 
384     /**
385      * (Convenience constructor) Creates a new instance with a new
386      * {@link OrderedThreadPoolExecutor}.
387      * 
388      * @param corePoolSize The initial pool size
389      * @param maximumPoolSize The maximum pool size
390      * @param keepAliveTime Default duration for a thread
391      * @param unit Time unit used for the keepAlive value
392      * @param threadFactory The factory used to create threads
393      * @param queueHandler The queue used to store events
394      * @param eventTypes The event for which the executor will be used
395      */
396     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
397             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
398         // Create a new default Executor
399         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
400                 threadFactory, queueHandler);
401 
402         // Initialize the filter
403         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
404     }
405 
406     /**
407      * Creates a new instance with the specified {@link Executor}.
408      * 
409      * @param executor the user's managed Executor to use in this filter
410      */
411     public ExecutorFilter(Executor executor) {
412         // Initialize the filter
413         init(executor, NOT_MANAGEABLE_EXECUTOR);
414     }
415 
416     /**
417      * Creates a new instance with the specified {@link Executor}.
418      * 
419      * @param executor the user's managed Executor to use in this filter
420      * @param eventTypes The event for which the executor will be used
421      */
422     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
423         // Initialize the filter
424         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
425     }
426 
427     /**
428      * Create an OrderedThreadPool executor.
429      *
430      * @param corePoolSize The initial pool sizePoolSize
431      * @param maximumPoolSize The maximum pool size
432      * @param keepAliveTime Default duration for a thread
433      * @param unit Time unit used for the keepAlive value
434      * @param threadFactory The factory used to create threads
435      * @param queueHandler The queue used to store events
436      * @return An instance of the created Executor
437      */
438     private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
439             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
440         // Create a new Executor
441         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
442                 threadFactory, queueHandler);
443 
444         return executor;
445     }
446 
447     /**
448      * Create an EnumSet from an array of EventTypes, and set the associated
449      * eventTypes field.
450      *
451      * @param eventTypes The array of handled events
452      */
453     private void initEventTypes(IoEventType... eventTypes) {
454         if ((eventTypes == null) || (eventTypes.length == 0)) {
455             eventTypes = DEFAULT_EVENT_SET;
456         }
457 
458         // Copy the list of handled events in the event set
459         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
460 
461         // Check that we don't have the SESSION_CREATED event in the set
462         if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
463             this.eventTypes = null;
464             throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
465         }
466     }
467 
468     /**
469      * Creates a new instance of ExecutorFilter. This private constructor is called by all
470      * the public constructor.
471      *
472      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
473      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
474      * @param eventTypes The lit of event which are handled by the executor
475      * @param
476      */
477     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
478         if (executor == null) {
479             throw new IllegalArgumentException("executor");
480         }
481 
482         initEventTypes(eventTypes);
483         this.executor = executor;
484         this.manageableExecutor = manageableExecutor;
485     }
486 
487     /**
488      * Shuts down the underlying executor if this filter hase been created via
489      * a convenience constructor.
490      */
491     @Override
492     public void destroy() {
493         if (manageableExecutor) {
494             ((ExecutorService) executor).shutdown();
495         }
496     }
497 
498     /**
499      * Returns the underlying {@link Executor} instance this filter uses.
500      * 
501      * @return The underlying {@link Executor}
502      */
503     public final Executor getExecutor() {
504         return executor;
505     }
506 
507     /**
508      * Fires the specified event through the underlying executor.
509      * 
510      * @param event The filtered event
511      */
512     protected void fireEvent(IoFilterEvent event) {
513         executor.execute(event);
514     }
515 
516     /**
517      * {@inheritDoc}
518      */
519     @Override
520     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
521         if (parent.contains(this)) {
522             throw new IllegalArgumentException(
523                     "You can't add the same filter instance more than once.  Create another instance and add it.");
524         }
525     }
526 
527     /**
528      * {@inheritDoc}
529      */
530     @Override
531     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
532         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
533             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
534             fireEvent(event);
535         } else {
536             nextFilter.sessionOpened(session);
537         }
538     }
539 
540     /**
541      * {@inheritDoc}
542      */
543     @Override
544     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
545         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
546             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
547             fireEvent(event);
548         } else {
549             nextFilter.sessionClosed(session);
550         }
551     }
552 
553     /**
554      * {@inheritDoc}
555      */
556     @Override
557     public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
558         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
559             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
560             fireEvent(event);
561         } else {
562             nextFilter.sessionIdle(session, status);
563         }
564     }
565 
566     /**
567      * {@inheritDoc}
568      */
569     @Override
570     public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
571         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
572             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
573             fireEvent(event);
574         } else {
575             nextFilter.exceptionCaught(session, cause);
576         }
577     }
578 
579     /**
580      * {@inheritDoc}
581      */
582     @Override
583     public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
584         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
585             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
586             fireEvent(event);
587         } else {
588             nextFilter.messageReceived(session, message);
589         }
590     }
591 
592     /**
593      * {@inheritDoc}
594      */
595     @Override
596     public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
597         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
598             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
599             fireEvent(event);
600         } else {
601             nextFilter.messageSent(session, writeRequest);
602         }
603     }
604 
605     /**
606      * {@inheritDoc}
607      */
608     @Override
609     public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
610         if (eventTypes.contains(IoEventType.WRITE)) {
611             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
612             fireEvent(event);
613         } else {
614             nextFilter.filterWrite(session, writeRequest);
615         }
616     }
617 
618     /**
619      * {@inheritDoc}
620      */
621     @Override
622     public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
623         if (eventTypes.contains(IoEventType.CLOSE)) {
624             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
625             fireEvent(event);
626         } else {
627             nextFilter.filterClose(session);
628         }
629     }
630 }