1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.lang.reflect.Field;
26 import java.util.ArrayList;
27 import java.util.Comparator;
28 import java.util.List;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.filterchain.IoFilter;
34 import org.apache.mina.core.session.DummySession;
35 import org.apache.mina.core.session.IdleStatus;
36 import org.apache.mina.core.session.IoSession;
37 import org.apache.mina.core.write.WriteRequest;
38 import org.junit.Ignore;
39 import org.junit.Test;
40
41
42
43
44
45
46
47 public class PriorityThreadPoolExecutorTest {
48
49
50
51
52
53
54
55
56
57 @Test
58 public void fifoEntryTestNoComparatorSameSession() throws Exception {
59
60 IoSession session = new DummySession();
61 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null);
62 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null);
63
64
65 int result = first.compareTo(last);
66
67
68 assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result);
69 }
70
71
72
73
74
75
76
77
78
79
80 @Test
81 public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
82
83
84 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
85 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
86
87
88 int result = first.compareTo(last);
89
90
91 assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0);
92 }
93
94
95
96
97
98
99
100
101
102
103
104 @Test
105 public void fifoEntryTestWithComparatorSameSession() throws Exception {
106
107 IoSession session = new DummySession();
108 final int predeterminedResult = 3853;
109
110 Comparator<IoSession> comparator = new Comparator<IoSession>() {
111 @Override
112 public int compare(IoSession o1, IoSession o2) {
113 return predeterminedResult;
114 }
115 };
116
117 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
118 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
119
120
121 int result = first.compareTo(last);
122
123
124 assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result);
125 }
126
127
128
129
130
131
132
133
134
135
136
137 @Test
138 public void fifoEntryTestComparatorDifferentSession() throws Exception {
139
140
141 final int predeterminedResult = 3853;
142
143 Comparator<IoSession> comparator = new Comparator<IoSession>() {
144 @Override
145 public int compare(IoSession o1, IoSession o2) {
146 return predeterminedResult;
147 }
148 };
149
150 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
151 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
152
153
154 int result = first.compareTo(last);
155
156
157 assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result);
158 }
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 @Test
176 @Ignore("This test faiuls randomly")
177 public void testPrioritisation() throws Throwable {
178
179 MockWorkFilter nextFilter = new MockWorkFilter();
180 List<LastActivityTracker> sessions = new ArrayList<>();
181
182 for (int i = 0; i < 10; i++) {
183 sessions.add(new LastActivityTracker());
184 }
185
186 LastActivityTracker preferredSession = sessions.get(4);
187
188
189 Comparator<IoSession> comparator = new UnfairComparator(preferredSession);
190 int maximumPoolSize = 1;
191 int amountOfTasks = 400;
192
193 ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator);
194 ExecutorFilter filter = new ExecutorFilter(executor);
195
196
197 for (int i = 0; i < amountOfTasks; i++) {
198 int sessionIndex = i % sessions.size();
199
200 LastActivityTracker currentSession = sessions.get(sessionIndex);
201 filter.messageReceived(nextFilter, currentSession, null);
202
203 if (nextFilter.throwable != null) {
204 throw nextFilter.throwable;
205 }
206 }
207
208 executor.shutdown();
209
210
211 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
212
213 for (LastActivityTracker session : sessions) {
214 if (session != preferredSession) {
215 assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).",
216 session.lastActivity > preferredSession.lastActivity);
217 }
218 }
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236 @Test
237 public void testRuntimeExceptionInWorkerRun() throws Throwable
238 {
239
240 int corePoolSize = 1;
241 PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1);
242 IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() {
243 @Override
244 public void messageReceived(IoSession session, Object message) {
245 throw new RuntimeException("A RuntimeException thrown during unit testing.");
246 }
247 };
248 DummySession session = new DummySession();
249 ExecutorFilter filter = new ExecutorFilter(executor);
250
251 try {
252
253 filter.messageReceived(nextFilter, session, null);
254
255
256 executor.shutdown();
257 if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
258 throw new IllegalStateException("Bug in test implementation.");
259 }
260
261
262 final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers");
263 idleWorkersField.setAccessible(true);
264 final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
265 assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
266 } finally {
267
268 if (!executor.isShutdown()) {
269 executor.shutdownNow();
270 }
271 }
272 }
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289 @Test
290 public void testErrorInWorkerRun() throws Throwable
291 {
292
293 int corePoolSize = 1;
294 PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1);
295 IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() {
296 @Override
297 public void messageReceived(IoSession session, Object message) {
298 throw new Error("An Error thrown during unit testing.");
299 }
300 };
301 DummySession session = new DummySession();
302 ExecutorFilter filter = new ExecutorFilter(executor);
303
304 try {
305
306 filter.messageReceived(nextFilter, session, null);
307
308
309 executor.shutdown();
310 if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
311 throw new IllegalStateException("Bug in test implementation.");
312 }
313
314
315 final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers");
316 idleWorkersField.setAccessible(true);
317 final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor);
318 assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get());
319 } finally {
320
321 if (!executor.isShutdown()) {
322 executor.shutdownNow();
323 }
324 }
325 }
326
327
328
329
330 private static class UnfairComparator implements Comparator<IoSession> {
331 private IoSession preferred;
332
333 public UnfairComparator(IoSession preferred) {
334 this.preferred = preferred;
335 }
336
337 @Override
338 public int compare(IoSession o1, IoSession o2) {
339 if (o1 == preferred) {
340 return -1;
341 }
342
343 if (o2 == preferred) {
344 return 1;
345 }
346
347 return 0;
348 }
349 }
350
351
352
353
354 private static class LastActivityTracker extends DummySession {
355 long lastActivity = System.currentTimeMillis();
356
357 public synchronized void setLastActivity() {
358 lastActivity = System.currentTimeMillis();
359 }
360 }
361
362
363
364
365 private static class MockWorkFilter implements IoFilter.NextFilter {
366 Throwable throwable;
367
368 public void sessionOpened(IoSession session) {
369
370 }
371
372 public void sessionClosed(IoSession session) {
373
374 }
375
376 public void sessionIdle(IoSession session, IdleStatus status) {
377
378 }
379
380 public void exceptionCaught(IoSession session, Throwable cause) {
381
382 }
383
384 public void inputClosed(IoSession session) {
385
386 }
387
388 public void messageReceived(IoSession session, Object message) {
389 try {
390 Thread.sleep(20);
391 ((LastActivityTracker) session).setLastActivity();
392 } catch (Exception e) {
393 if (this.throwable == null) {
394 this.throwable = e;
395 }
396 }
397 }
398
399 public void messageSent(IoSession session, WriteRequest writeRequest) {
400
401 }
402
403 public void filterWrite(IoSession session, WriteRequest writeRequest) {
404
405 }
406
407 public void filterClose(IoSession session) {
408
409 }
410
411 public void sessionCreated(IoSession session) {
412
413 }
414 }
415
416
417
418
419 private abstract static class NextFilterAdapter implements IoFilter.NextFilter {
420 public void sessionOpened(IoSession session) {}
421 public void sessionClosed(IoSession session) {}
422 public void sessionIdle(IoSession session, IdleStatus status) {}
423 public void exceptionCaught(IoSession session, Throwable cause) {}
424 public void inputClosed(IoSession session) {}
425 public void messageReceived(IoSession session, Object message) {}
426 public void messageSent(IoSession session, WriteRequest writeRequest) {}
427 public void filterWrite(IoSession session, WriteRequest writeRequest) {}
428 public void filterClose(IoSession session) {}
429 public void sessionCreated(IoSession session) {}
430 }
431 }