1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.executor;
21
22 import java.util.EnumSet;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.filterchain.IoFilterEvent;
32 import org.apache.mina.core.session.IdleStatus;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36
37 /**
38 * A filter that forwards I/O events to {@link Executor} to enforce a certain
39 * thread model while allowing the events per session to be processed
40 * simultaneously. You can apply various thread model by inserting this filter
41 * to a {@link IoFilterChain}.
42 *
43 * <h2>Life Cycle Management</h2>
44 *
45 * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46 * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47 * constructor that accepts an {@link Executor} that you've instantiated, you have
48 * full control and responsibility of managing its life cycle (e.g. calling
49 * {@link ExecutorService#shutdown()}.
50 * <p>
51 * If you created this filter using convenience constructors like
52 * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53 * {@link #destroy()} explicitly.
54 *
55 * <h2>Event Ordering</h2>
56 *
57 * All convenience constructors of this filter creates a new
58 * {@link OrderedThreadPoolExecutor} instance. Therefore, the order of event is
59 * maintained like the following:
60 * <ul>
61 * <li>All event handler methods are called exclusively.
62 * (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63 * <li>The event order is never mixed up.
64 * (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65 * </ul>
66 * However, if you specified other {@link Executor} instance in the constructor,
67 * the order of events are not maintained at all. This means more than one event
68 * handler methods can be invoked at the same time with mixed order. For example,
69 * let's assume that messageReceived, messageSent, and sessionClosed events are
70 * fired.
71 * <ul>
72 * <li>All event handler methods can be called simultaneously.
73 * (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74 * <li>The event order can be mixed up.
75 * (e.g. sessionClosed or messageSent can be invoked before messageReceived
76 * is invoked.)</li>
77 * </ul>
78 * If you need to maintain the order of events per session, please specify an
79 * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80 *
81 * <h2>Selective Filtering</h2>
82 *
83 * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84 * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85 * underlying executor, which is most common setting.
86 * <p>
87 * If you want to submit only a certain set of event types, you can specify them
88 * in the constructor. For example, you could configure a thread pool for
89 * write operation for the maximum performance:
90 * <pre><code>
91 * IoService service = ...;
92 * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93 *
94 * chain.addLast("codec", new ProtocolCodecFilter(...));
95 * // Use one thread pool for most events.
96 * chain.addLast("executor1", new ExecutorFilter());
97 * // and another dedicated thread pool for 'filterWrite' events.
98 * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99 * </code></pre>
100 *
101 * <h2>Preventing {@link OutOfMemoryError}</h2>
102 *
103 * Please refer to {@link IoEventQueueThrottle}, which is specified as
104 * a parameter of the convenience constructors.
105 *
106 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107 *
108 * @see OrderedThreadPoolExecutor
109 * @see UnorderedThreadPoolExecutor
110 * @org.apache.xbean.XBean
111 */
112 public class ExecutorFilter extends IoFilterAdapter {
113 /** The list of handled events */
114 private EnumSet<IoEventType> eventTypes;
115
116 /** The associated executor */
117 private Executor executor;
118
119 /** A flag set if the executor can be managed */
120 private boolean manageableExecutor;
121
122 /** The default pool size */
123 private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125 /** The number of thread to create at startup */
126 private static final int BASE_THREAD_NUMBER = 0;
127
128 /** The default KeepAlive time, in seconds */
129 private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131 /**
132 * A set of flags used to tell if the Executor has been created
133 * in the constructor or passed as an argument. In the second case,
134 * the executor state can be managed.
135 **/
136 private static final boolean MANAGEABLE_EXECUTOR = true;
137
138 private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139
140 /** A list of default EventTypes to be handled by the executor */
141 private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
142 IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
143 IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
144
145 /**
146 * (Convenience constructor) Creates a new instance with a new
147 * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a
148 * maximum of 16 threads in the pool. All the event will be handled
149 * by this default executor.
150 */
151 public ExecutorFilter() {
152 // Create a new default Executor
153 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
154 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
155
156 // Initialize the filter
157 init(executor, MANAGEABLE_EXECUTOR);
158 }
159
160 /**
161 * (Convenience constructor) Creates a new instance with a new
162 * {@link OrderedThreadPoolExecutor}, no thread in the pool, but
163 * a maximum of threads in the pool is given. All the event will be handled
164 * by this default executor.
165 *
166 * @param maximumPoolSize The maximum pool size
167 */
168 public ExecutorFilter(int maximumPoolSize) {
169 // Create a new default Executor
170 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
171 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
172
173 // Initialize the filter
174 init(executor, MANAGEABLE_EXECUTOR);
175 }
176
177 /**
178 * (Convenience constructor) Creates a new instance with a new
179 * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a
180 * maximum of threads the pool can contain. All the event will be handled
181 * by this default executor.
182 *
183 * @param corePoolSize The initial pool size
184 * @param maximumPoolSize The maximum pool size
185 */
186 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
187 // Create a new default Executor
188 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
189 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
190
191 // Initialize the filter
192 init(executor, MANAGEABLE_EXECUTOR);
193 }
194
195 /**
196 * (Convenience constructor) Creates a new instance with a new
197 * {@link OrderedThreadPoolExecutor}.
198 *
199 * @param corePoolSize The initial pool size
200 * @param maximumPoolSize The maximum pool size
201 * @param keepAliveTime Default duration for a thread
202 * @param unit Time unit used for the keepAlive value
203 */
204 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
205 // Create a new default Executor
206 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
207 Executors.defaultThreadFactory(), null);
208
209 // Initialize the filter
210 init(executor, MANAGEABLE_EXECUTOR);
211 }
212
213 /**
214 * (Convenience constructor) Creates a new instance with a new
215 * {@link OrderedThreadPoolExecutor}.
216 *
217 * @param corePoolSize The initial pool size
218 * @param maximumPoolSize The maximum pool size
219 * @param keepAliveTime Default duration for a thread
220 * @param unit Time unit used for the keepAlive value
221 * @param queueHandler The queue used to store events
222 */
223 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224 IoEventQueueHandler queueHandler) {
225 // Create a new default Executor
226 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
227 Executors.defaultThreadFactory(), queueHandler);
228
229 // Initialize the filter
230 init(executor, MANAGEABLE_EXECUTOR);
231 }
232
233 /**
234 * (Convenience constructor) Creates a new instance with a new
235 * {@link OrderedThreadPoolExecutor}.
236 *
237 * @param corePoolSize The initial pool size
238 * @param maximumPoolSize The maximum pool size
239 * @param keepAliveTime Default duration for a thread
240 * @param unit Time unit used for the keepAlive value
241 * @param threadFactory The factory used to create threads
242 */
243 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
244 ThreadFactory threadFactory) {
245 // Create a new default Executor
246 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
247 null);
248
249 // Initialize the filter
250 init(executor, MANAGEABLE_EXECUTOR);
251 }
252
253 /**
254 * (Convenience constructor) Creates a new instance with a new
255 * {@link OrderedThreadPoolExecutor}.
256 *
257 * @param corePoolSize The initial pool size
258 * @param maximumPoolSize The maximum pool size
259 * @param keepAliveTime Default duration for a thread
260 * @param unit Time unit used for the keepAlive value
261 * @param threadFactory The factory used to create threads
262 * @param queueHandler The queue used to store events
263 */
264 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
265 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
266 // Create a new default Executor
267 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
268 threadFactory, queueHandler);
269
270 // Initialize the filter
271 init(executor, MANAGEABLE_EXECUTOR);
272 }
273
274 /**
275 * (Convenience constructor) Creates a new instance with a new
276 * {@link OrderedThreadPoolExecutor}.
277 *
278 * @param eventTypes The event for which the executor will be used
279 */
280 public ExecutorFilter(IoEventType... eventTypes) {
281 // Create a new default Executor
282 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
283 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
284
285 // Initialize the filter
286 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
287 }
288
289 /**
290 * (Convenience constructor) Creates a new instance with a new
291 * {@link OrderedThreadPoolExecutor}.
292 *
293 * @param maximumPoolSize The maximum pool size
294 * @param eventTypes The event for which the executor will be used
295 */
296 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
297 // Create a new default Executor
298 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
299 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
300
301 // Initialize the filter
302 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
303 }
304
305 /**
306 * (Convenience constructor) Creates a new instance with a new
307 * {@link OrderedThreadPoolExecutor}.
308 *
309 * @param corePoolSize The initial pool size
310 * @param maximumPoolSize The maximum pool size
311 * @param eventTypes The event for which the executor will be used
312 */
313 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
314 // Create a new default Executor
315 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
316 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
317
318 // Initialize the filter
319 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
320 }
321
322 /**
323 * (Convenience constructor) Creates a new instance with a new
324 * {@link OrderedThreadPoolExecutor}.
325 *
326 * @param corePoolSize The initial pool size
327 * @param maximumPoolSize The maximum pool size
328 * @param keepAliveTime Default duration for a thread
329 * @param unit Time unit used for the keepAlive value
330 * @param eventTypes The event for which the executor will be used
331 */
332 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
333 IoEventType... eventTypes) {
334 // Create a new default Executor
335 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
336 Executors.defaultThreadFactory(), null);
337
338 // Initialize the filter
339 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
340 }
341
342 /**
343 * (Convenience constructor) Creates a new instance with a new
344 * {@link OrderedThreadPoolExecutor}.
345 *
346 * @param corePoolSize The initial pool size
347 * @param maximumPoolSize The maximum pool size
348 * @param keepAliveTime Default duration for a thread
349 * @param unit Time unit used for the keepAlive value
350 * @param queueHandler The queue used to store events
351 * @param eventTypes The event for which the executor will be used
352 */
353 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
354 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
355 // Create a new default Executor
356 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
357 Executors.defaultThreadFactory(), queueHandler);
358
359 // Initialize the filter
360 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
361 }
362
363 /**
364 * (Convenience constructor) Creates a new instance with a new
365 * {@link OrderedThreadPoolExecutor}.
366 *
367 * @param corePoolSize The initial pool size
368 * @param maximumPoolSize The maximum pool size
369 * @param keepAliveTime Default duration for a thread
370 * @param unit Time unit used for the keepAlive value
371 * @param threadFactory The factory used to create threads
372 * @param eventTypes The event for which the executor will be used
373 */
374 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
375 ThreadFactory threadFactory, IoEventType... eventTypes) {
376 // Create a new default Executor
377 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
378 null);
379
380 // Initialize the filter
381 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
382 }
383
384 /**
385 * (Convenience constructor) Creates a new instance with a new
386 * {@link OrderedThreadPoolExecutor}.
387 *
388 * @param corePoolSize The initial pool size
389 * @param maximumPoolSize The maximum pool size
390 * @param keepAliveTime Default duration for a thread
391 * @param unit Time unit used for the keepAlive value
392 * @param threadFactory The factory used to create threads
393 * @param queueHandler The queue used to store events
394 * @param eventTypes The event for which the executor will be used
395 */
396 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
397 ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
398 // Create a new default Executor
399 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
400 threadFactory, queueHandler);
401
402 // Initialize the filter
403 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
404 }
405
406 /**
407 * Creates a new instance with the specified {@link Executor}.
408 *
409 * @param executor the user's managed Executor to use in this filter
410 */
411 public ExecutorFilter(Executor executor) {
412 // Initialize the filter
413 init(executor, NOT_MANAGEABLE_EXECUTOR);
414 }
415
416 /**
417 * Creates a new instance with the specified {@link Executor}.
418 *
419 * @param executor the user's managed Executor to use in this filter
420 * @param eventTypes The event for which the executor will be used
421 */
422 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
423 // Initialize the filter
424 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
425 }
426
427 /**
428 * Create an OrderedThreadPool executor.
429 *
430 * @param corePoolSize The initial pool sizePoolSize
431 * @param maximumPoolSize The maximum pool size
432 * @param keepAliveTime Default duration for a thread
433 * @param unit Time unit used for the keepAlive value
434 * @param threadFactory The factory used to create threads
435 * @param queueHandler The queue used to store events
436 * @return An instance of the created Executor
437 */
438 private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
439 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
440 // Create a new Executor
441 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
442 threadFactory, queueHandler);
443
444 return executor;
445 }
446
447 /**
448 * Create an EnumSet from an array of EventTypes, and set the associated
449 * eventTypes field.
450 *
451 * @param eventTypes The array of handled events
452 */
453 private void initEventTypes(IoEventType... eventTypes) {
454 if ((eventTypes == null) || (eventTypes.length == 0)) {
455 eventTypes = DEFAULT_EVENT_SET;
456 }
457
458 // Copy the list of handled events in the event set
459 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
460
461 // Check that we don't have the SESSION_CREATED event in the set
462 if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
463 this.eventTypes = null;
464 throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
465 }
466 }
467
468 /**
469 * Creates a new instance of ExecutorFilter. This private constructor is called by all
470 * the public constructor.
471 *
472 * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
473 * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
474 * @param eventTypes The lit of event which are handled by the executor
475 * @param
476 */
477 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
478 if (executor == null) {
479 throw new IllegalArgumentException("executor");
480 }
481
482 initEventTypes(eventTypes);
483 this.executor = executor;
484 this.manageableExecutor = manageableExecutor;
485 }
486
487 /**
488 * Shuts down the underlying executor if this filter hase been created via
489 * a convenience constructor.
490 */
491 @Override
492 public void destroy() {
493 if (manageableExecutor) {
494 ((ExecutorService) executor).shutdown();
495 }
496 }
497
498 /**
499 * Returns the underlying {@link Executor} instance this filter uses.
500 *
501 * @return The underlying {@link Executor}
502 */
503 public final Executor getExecutor() {
504 return executor;
505 }
506
507 /**
508 * Fires the specified event through the underlying executor.
509 *
510 * @param event The filtered event
511 */
512 protected void fireEvent(IoFilterEvent event) {
513 executor.execute(event);
514 }
515
516 /**
517 * {@inheritDoc}
518 */
519 @Override
520 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
521 if (parent.contains(this)) {
522 throw new IllegalArgumentException(
523 "You can't add the same filter instance more than once. Create another instance and add it.");
524 }
525 }
526
527 /**
528 * {@inheritDoc}
529 */
530 @Override
531 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
532 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
533 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
534 fireEvent(event);
535 } else {
536 nextFilter.sessionOpened(session);
537 }
538 }
539
540 /**
541 * {@inheritDoc}
542 */
543 @Override
544 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
545 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
546 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
547 fireEvent(event);
548 } else {
549 nextFilter.sessionClosed(session);
550 }
551 }
552
553 /**
554 * {@inheritDoc}
555 */
556 @Override
557 public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
558 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
559 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
560 fireEvent(event);
561 } else {
562 nextFilter.sessionIdle(session, status);
563 }
564 }
565
566 /**
567 * {@inheritDoc}
568 */
569 @Override
570 public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
571 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
572 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
573 fireEvent(event);
574 } else {
575 nextFilter.exceptionCaught(session, cause);
576 }
577 }
578
579 /**
580 * {@inheritDoc}
581 */
582 @Override
583 public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
584 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
585 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
586 fireEvent(event);
587 } else {
588 nextFilter.messageReceived(session, message);
589 }
590 }
591
592 /**
593 * {@inheritDoc}
594 */
595 @Override
596 public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
597 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
598 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
599 fireEvent(event);
600 } else {
601 nextFilter.messageSent(session, writeRequest);
602 }
603 }
604
605 /**
606 * {@inheritDoc}
607 */
608 @Override
609 public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
610 if (eventTypes.contains(IoEventType.WRITE)) {
611 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
612 fireEvent(event);
613 } else {
614 nextFilter.filterWrite(session, writeRequest);
615 }
616 }
617
618 /**
619 * {@inheritDoc}
620 */
621 @Override
622 public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
623 if (eventTypes.contains(IoEventType.CLOSE)) {
624 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
625 fireEvent(event);
626 } else {
627 nextFilter.filterClose(session);
628 }
629 }
630 }