View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.lang.reflect.Field;
26  import java.util.ArrayList;
27  import java.util.Comparator;
28  import java.util.List;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.session.DummySession;
35  import org.apache.mina.core.session.IdleStatus;
36  import org.apache.mina.core.session.IoSession;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.junit.Ignore;
39  import org.junit.Test;
40  
41  /**
42   * Tests that verify the functionality provided by the implementation of
43   * {@link PriorityThreadPoolExecutor}.
44   *
45   * @author Guus der Kinderen, guus.der.kinderen@gmail.com
46   */
47  public class PriorityThreadPoolExecutorTest {
48      /**
49       * Tests that verify the functionality provided by the implementation of
50       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
51       * .
52       *
53       * This test asserts that, without a provided comparator, entries are
54       * considered equal, when they reference the same session.
55       * @throws Exception an exception
56       */
57      @Test
58      public void fifoEntryTestNoComparatorSameSession() throws Exception {
59          // Set up fixture.
60          IoSession session = new DummySession();
61          PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null);
62          PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null);
63      
64          // Execute system under test.
65          int result = first.compareTo(last);
66      
67          // Verify results.
68          assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result);
69      }
70  
71      /**
72       * Tests that verify the functionality provided by the implementation of
73       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
74       * .
75       *
76       * This test asserts that, without a provided comparator, the first entry
77       * created is 'less than' an entry that is created later.
78       * @throws Exception an exception
79       */
80      @Test
81      public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
82          // Set up fixture (the order in which the entries are created is
83          // relevant here!)
84          PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
85          PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
86          
87          // Execute system under test.
88          int result = first.compareTo(last);
89          
90          // Verify results.
91          assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0);
92      }
93  
94      /**
95       * Tests that verify the functionality provided by the implementation of
96       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
97       * .
98       *
99       * This test asserts that, with a provided comparator, entries are
100      * considered equal, when they reference the same session (the provided
101      * comparator is ignored).
102      * @throws Exception an exception
103      */
104     @Test
105     public void fifoEntryTestWithComparatorSameSession() throws Exception {
106         // Set up fixture.
107         IoSession session = new DummySession();
108         final int predeterminedResult = 3853;
109         
110         Comparator<IoSession> comparator = new Comparator<IoSession>() {
111             @Override
112             public int compare(IoSession o1, IoSession o2) {
113                 return predeterminedResult;
114             }
115         };
116         
117         PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
118         PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
119         
120         // Execute system under test.
121         int result = first.compareTo(last);
122         
123         // Verify results.
124         assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result);
125     }
126 
127     /**
128      * Tests that verify the functionality provided by the implementation of
129      * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
130      * .
131      *
132      * This test asserts that a provided comparator is used instead of the
133      * (fallback) default behavior (when entries are referring different
134      * sessions).
135      * @throws Exception an exception
136      */
137     @Test
138     public void fifoEntryTestComparatorDifferentSession() throws Exception {
139         // Set up fixture (the order in which the entries are created is
140         // relevant here!)
141         final int predeterminedResult = 3853;
142         
143         Comparator<IoSession> comparator = new Comparator<IoSession>() {
144             @Override
145             public int compare(IoSession o1, IoSession o2) {
146                 return predeterminedResult;
147             }
148         };
149         
150         PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
151         PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
152         
153         // Execute system under test.
154         int result = first.compareTo(last);
155         
156         // Verify results.
157         assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result);
158     }
159 
160     /**
161      * Asserts that, when enough work is being submitted to the executor for it
162      * to start queuing work, prioritisation of work starts to occur.
163      *
164      * This implementation starts a number of sessions, and evenly distributes a
165      * number of messages to them. Processing each message is artificially made
166      * 'expensive', while the executor pool is kept small. This causes work to
167      * be queued in the executor.
168      *
169      * The executor that is used is configured to prefer one specific session.
170      * Each session records the timestamp of its last activity. After all work
171      * has been processed, the test asserts that the last activity of all
172      * sessions was later than the last activity of the preferred session.
173      * @throws Exception an exception
174      */
175     @Test
176     @Ignore("This test faiuls randomly")
177     public void testPrioritisation() throws Throwable {
178         // Set up fixture.
179         MockWorkFilter nextFilter = new MockWorkFilter();
180         List<LastActivityTracker> sessions = new ArrayList<>();
181         
182         for (int i = 0; i < 10; i++) {
183             sessions.add(new LastActivityTracker());
184         }
185         
186         LastActivityTracker preferredSession = sessions.get(4); // prefer an arbitrary session
187                                                                 // (but not the first or last
188                                                                 // session, for good measure).
189         Comparator<IoSession> comparator = new UnfairComparator(preferredSession);
190         int maximumPoolSize = 1; // keep this low, to force resource contention.
191         int amountOfTasks = 400;
192         
193         ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator);
194         ExecutorFilter filter = new ExecutorFilter(executor);
195         
196         // Execute system under test.
197         for (int i = 0; i < amountOfTasks; i++) {
198             int sessionIndex = i % sessions.size();
199             
200             LastActivityTracker currentSession = sessions.get(sessionIndex);
201             filter.messageReceived(nextFilter, currentSession, null);
202         
203             if (nextFilter.throwable != null) {
204                 throw nextFilter.throwable;
205             }
206         }
207         
208         executor.shutdown();
209         
210         // Verify results.
211         executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
212         
213         for (LastActivityTracker session : sessions) {
214             if (session != preferredSession) {
215                 assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", 
216                     session.lastActivity > preferredSession.lastActivity);
217             }
218         }
219     }
220 
221     /**
222      * Tests the state of PriorityThreadPoolExecutor workers
223      * after a RuntimeException is thrown when the PriorityThreadPoolExecutor Worker is running.
224      *
225      * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as
226      * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the
227      * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify
228      * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot
229      * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is
230      * provided in {@link org.apache.mina.handler.DIRMINA1156Test}.
231      *
232      * @see org.apache.mina.handler.DIRMINA1156Test
233      * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
234      * @throws Exception an exception
235      */
236     @Test
237     public void testRuntimeExceptionInWorkerRun() throws Throwable
238     {
239         // Set up test fixture.
240         int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test.
241         PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1);
242         IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() {
243             @Override
244             public void messageReceived(IoSession session, Object message) {
245                 throw new RuntimeException("A RuntimeException thrown during unit testing.");
246             }
247         };
248         DummySession session = new DummySession();
249         ExecutorFilter filter = new ExecutorFilter(executor);
250 
251         try {
252             // Execute system under test.
253             filter.messageReceived(nextFilter, session, null);
254 
255             // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened.
256             executor.shutdown();
257             if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
258                 throw new IllegalStateException("Bug in test implementation.");
259             }
260 
261             // Verify results.
262             final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
263             idleWorkersField.setAccessible(true);
264             final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
265             assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
266         } finally {
267             // Clean up test fixture.
268             if (!executor.isShutdown()) {
269                 executor.shutdownNow();
270             }
271         }
272     }
273 
274     /**
275      * Tests the state of PriorityThreadPoolExecutor workers
276      * after an Error is thrown when the PriorityThreadPoolExecutor worker is running.
277      *
278      * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as
279      * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that
280      * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an
281      * Error is thrown during execution occurs (even if that Error cannot occur in the way that this test simulates it).
282      * A test that implements the execution in a more realistic manner is provided in
283      * {@link org.apache.mina.handler.DIRMINA1156Test}.
284      *
285      * @see org.apache.mina.handler.DIRMINA1156Test
286      * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
287      * @throws Exception an exception
288      */
289     @Test
290     public void testErrorInWorkerRun() throws Throwable
291     {
292         // Set up test fixture.
293         int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test.
294         PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1);
295         IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() {
296             @Override
297             public void messageReceived(IoSession session, Object message) {
298                 throw new Error("An Error thrown during unit testing.");
299             }
300         };
301         DummySession session = new DummySession();
302         ExecutorFilter filter = new ExecutorFilter(executor);
303 
304         try {
305             // Execute system under test.
306             filter.messageReceived(nextFilter, session, null);
307 
308             // Ensure that the task has been executed in the executor.
309             executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened.
310             if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
311                 throw new IllegalStateException("Bug in test implementation.");
312             }
313 
314             // Verify results.
315             final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
316             idleWorkersField.setAccessible(true);
317             final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
318             assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
319         } finally {
320             // Clean up test fixture.
321             if (!executor.isShutdown()) {
322                 executor.shutdownNow();
323             }
324         }
325     }
326 
327     /**
328      * A comparator that prefers a particular session.
329      */
330     private static class UnfairComparator implements Comparator<IoSession> {
331         private IoSession preferred;
332         
333         public UnfairComparator(IoSession preferred) {
334             this.preferred = preferred;
335         }
336         
337         @Override
338         public int compare(IoSession o1, IoSession o2) {
339             if (o1 == preferred) {
340                 return -1;
341             }
342         
343             if (o2 == preferred) {
344                 return 1;
345             }
346         
347             return 0;
348         }
349     }
350 
351     /**
352      * A session that tracks the timestamp of last activity.
353      */
354     private static class LastActivityTracker extends DummySession {
355         long lastActivity = System.currentTimeMillis();
356 
357         public synchronized void setLastActivity() {
358             lastActivity = System.currentTimeMillis();
359         }
360     }
361 
362     /**
363      * A filter that simulates a non-negligible amount of work.
364      */
365     private static class MockWorkFilter implements IoFilter.NextFilter {
366         Throwable throwable;
367         
368         public void sessionOpened(IoSession session) {
369             // Do nothing
370         }
371         
372         public void sessionClosed(IoSession session) {
373             // Do nothing
374         }
375         
376         public void sessionIdle(IoSession session, IdleStatus status) {
377             // Do nothing
378         }
379         
380         public void exceptionCaught(IoSession session, Throwable cause) {
381             // Do nothing
382         }
383         
384         public void inputClosed(IoSession session) {
385             // Do nothing
386         }
387         
388         public void messageReceived(IoSession session, Object message) {
389             try {
390                 Thread.sleep(20); // mimic work.
391                 ((LastActivityTracker) session).setLastActivity();
392             } catch (Exception e) {
393                 if (this.throwable == null) {
394                     this.throwable = e;
395                 }
396             }
397         }
398         
399         public void messageSent(IoSession session, WriteRequest writeRequest) {
400             // Do nothing
401         }
402         
403         public void filterWrite(IoSession session, WriteRequest writeRequest) {
404             // Do nothing
405         }
406         
407         public void filterClose(IoSession session) {
408             // Do nothing
409         }
410         
411         public void sessionCreated(IoSession session) {
412             // Do nothing
413         }
414     }
415 
416     /**
417      * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing.
418      */
419     private abstract static class NextFilterAdapter implements IoFilter.NextFilter {
420         public void sessionOpened(IoSession session) {}
421         public void sessionClosed(IoSession session) {}
422         public void sessionIdle(IoSession session, IdleStatus status) {}
423         public void exceptionCaught(IoSession session, Throwable cause) {}
424         public void inputClosed(IoSession session) {}
425         public void messageReceived(IoSession session, Object message) {}
426         public void messageSent(IoSession session, WriteRequest writeRequest) {}
427         public void filterWrite(IoSession session, WriteRequest writeRequest) {}
428         public void filterClose(IoSession session) {}
429         public void sessionCreated(IoSession session) {}
430     }
431 }