View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import static org.junit.Assert.assertEquals;
23  
24  import java.lang.reflect.Field;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.mina.core.filterchain.IoFilter;
29  import org.apache.mina.core.session.DummySession;
30  import org.apache.mina.core.session.IdleStatus;
31  import org.apache.mina.core.session.IoSession;
32  import org.apache.mina.core.write.WriteRequest;
33  import org.junit.Test;
34  
35  /**
36   * Tests that verify the functionality provided by the implementation of
37   * {@link OrderedThreadPoolExecutor}.
38   *
39   * @author Guus der Kinderen, guus.der.kinderen@gmail.com
40   */
41  public class OrderedThreadPoolExecutorTest
42  {
43      /**
44       * Tests the state of OrderedThreadPoolExecutor Workers
45       * after a RuntimeException is thrown when the OrderedThreadPoolExecutor Worker is running.
46       *
47       * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as
48       * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the
49       * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify
50       * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot
51       * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is
52       * provided in {@link org.apache.mina.handler.DIRMINA1156Test}.
53       *
54       * @see org.apache.mina.handler.DIRMINA1156Test
55       * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
56       * @throws Throwable an exception
57       */
58      @Test
59      public void testRuntimeExceptionInWorkerRun() throws Throwable
60      {
61          // Set up test fixture.
62          int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test.
63          OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1);
64          IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
65              @Override
66              public void messageReceived(IoSession session, Object message) {
67                  throw new RuntimeException("A RuntimeException thrown during unit testing.");
68              }
69          };
70          DummySession session = new DummySession();
71          ExecutorFilter filter = new ExecutorFilter(executor);
72  
73          try {
74              // Execute system under test.
75              filter.messageReceived(nextFilter, session, null);
76  
77              // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened.
78              executor.shutdown();
79              if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
80                  throw new IllegalStateException("Bug in test implementation.");
81              }
82  
83              // Verify results.
84              final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
85              idleWorkersField.setAccessible(true);
86              final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
87              assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
88          } finally {
89              // Clean up test fixture.
90              if (!executor.isShutdown()) {
91                  executor.shutdownNow();
92              }
93          }
94      }
95  
96      /**
97       * Tests the state of OrderedThreadPoolExecutor workers
98       * after an Error is thrown when the OrderedThreadPoolExecutor Worker is running.
99       *
100      * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as
101      * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that
102      * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an
103      * Error is thrown during execution occurs (even if that Error cannot occur in the way that this test simulates it).
104      * A test that implements the execution in a more realistic manner is provided in
105      * {@link org.apache.mina.handler.DIRMINA1156Test}.
106      *
107      * @see org.apache.mina.handler.DIRMINA1156Test
108      * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
109      * @throws Throwable an exception
110      */
111     @Test
112     public void testErrorInWorkerRun() throws Throwable
113     {
114         // Set up test fixture.
115         int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test.
116         OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1);
117         IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
118             @Override
119             public void messageReceived(IoSession session, Object message) {
120                 throw new Error("An Error thrown during unit testing.");
121             }
122         };
123         DummySession session = new DummySession();
124         ExecutorFilter filter = new ExecutorFilter(executor);
125 
126         try {
127             // Execute system under test.
128             filter.messageReceived(nextFilter, session, null);
129 
130             // Ensure that the task has been executed in the executor.
131             executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened.
132             if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
133                 throw new IllegalStateException("Bug in test implementation.");
134             }
135 
136             // Verify results.
137             final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
138             idleWorkersField.setAccessible(true);
139             final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
140             assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
141         } finally {
142             // Clean up test fixture.
143             if (!executor.isShutdown()) {
144                 executor.shutdownNow();
145             }
146         }
147     }
148 
149     /**
150      * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing.
151      */
152     private abstract static class NextFilterAdapter implements IoFilter.NextFilter {
153         public void sessionOpened(IoSession session) {}
154         public void sessionClosed(IoSession session) {}
155         public void sessionIdle(IoSession session, IdleStatus status) {}
156         public void exceptionCaught(IoSession session, Throwable cause) {}
157         public void inputClosed(IoSession session) {}
158         public void messageReceived(IoSession session, Object message) {}
159         public void messageSent(IoSession session, WriteRequest writeRequest) {}
160         public void filterWrite(IoSession session, WriteRequest writeRequest) {}
161         public void filterClose(IoSession session) {}
162         public void sessionCreated(IoSession session) {}
163     }
164 }