View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.mina.core.session.AttributeKey;
40  import org.apache.mina.core.session.DummySession;
41  import org.apache.mina.core.session.IoEvent;
42  import org.apache.mina.core.session.IoSession;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  /**
47   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
48   * <p>
49   * If you don't need to maintain the order of events per session, please use
50   * {@link UnorderedThreadPoolExecutor}.
51  
52   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
53   * @org.apache.xbean.XBean
54   */
55  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    /** A logger for this class. */
57      private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
58  
59      /** A default value for the initial pool size */
60      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
61  
62      /** A default value for the maximum pool size */
63      private static final int DEFAULT_MAX_THREAD_POOL = 16;
64  
65      /** A default value for the KeepAlive delay */
66      private static final int DEFAULT_KEEP_ALIVE = 30;
67  
68      private static final IoSession EXIT_SIGNAL = new DummySession();
69  
70      /** A key stored into the session's attribute for the event tasks being queued */
71      private static final AttributeKey TASKS_QUEUE = new AttributeKey(OrderedThreadPoolExecutor.class, "tasksQueue");
72  
73      /** A queue used to store the available sessions */
74      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<>();
75  
76      private final Set<Worker> workers = new HashSet<>();
77  
78      private volatile int largestPoolSize;
79  
80      private final AtomicInteger idleWorkers = new AtomicInteger();
81  
82      private long completedTaskCount;
83  
84      private volatile boolean shutdown;
85  
86      private final IoEventQueueHandler eventQueueHandler;
87  
88      /**
89       * Creates a default ThreadPool, with default values :
90       * - minimum pool size is 0
91       * - maximum pool size is 16
92       * - keepAlive set to 30 seconds
93       * - A default ThreadFactory
94       * - All events are accepted
95       */
96      public OrderedThreadPoolExecutor() {
97          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
98                  .defaultThreadFactory(), null);
99      }
100 
101     /**
102      * Creates a default ThreadPool, with default values :
103      * - minimum pool size is 0
104      * - keepAlive set to 30 seconds
105      * - A default ThreadFactory
106      * - All events are accepted
107      * 
108      * @param maximumPoolSize The maximum pool size
109      */
110     public OrderedThreadPoolExecutor(int maximumPoolSize) {
111         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
112                 .defaultThreadFactory(), null);
113     }
114 
115     /**
116      * Creates a default ThreadPool, with default values :
117      * - keepAlive set to 30 seconds
118      * - A default ThreadFactory
119      * - All events are accepted
120      *
121      * @param corePoolSize The initial pool sizePoolSize
122      * @param maximumPoolSize The maximum pool size
123      */
124     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
125         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
126                 null);
127     }
128 
129     /**
130      * Creates a default ThreadPool, with default values :
131      * - A default ThreadFactory
132      * - All events are accepted
133      * 
134      * @param corePoolSize The initial pool sizePoolSize
135      * @param maximumPoolSize The maximum pool size
136      * @param keepAliveTime Default duration for a thread
137      * @param unit Time unit used for the keepAlive value
138      */
139     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
140         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
141     }
142 
143     /**
144      * Creates a default ThreadPool, with default values :
145      * - A default ThreadFactory
146      * 
147      * @param corePoolSize The initial pool sizePoolSize
148      * @param maximumPoolSize The maximum pool size
149      * @param keepAliveTime Default duration for a thread
150      * @param unit Time unit used for the keepAlive value
151      * @param eventQueueHandler The queue used to store events
152      */
153     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
154             IoEventQueueHandler eventQueueHandler) {
155         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
156     }
157 
158     /**
159      * Creates a default ThreadPool, with default values :
160      * - A default ThreadFactory
161      * 
162      * @param corePoolSize The initial pool sizePoolSize
163      * @param maximumPoolSize The maximum pool size
164      * @param keepAliveTime Default duration for a thread
165      * @param unit Time unit used for the keepAlive value
166      * @param threadFactory The factory used to create threads
167      */
168     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
169             ThreadFactory threadFactory) {
170         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
171     }
172 
173     /**
174      * Creates a new instance of a OrderedThreadPoolExecutor.
175      * 
176      * @param corePoolSize The initial pool sizePoolSize
177      * @param maximumPoolSize The maximum pool size
178      * @param keepAliveTime Default duration for a thread
179      * @param unit Time unit used for the keepAlive value
180      * @param threadFactory The factory used to create threads
181      * @param eventQueueHandler The queue used to store events
182      */
183     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
184             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
185         // We have to initialize the pool with default values (0 and 1) in order to
186         // handle the exception in a better way. We can't add a try {} catch() {}
187         // around the super() call.
188         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
189                 threadFactory, new AbortPolicy());
190 
191         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
192             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
193         }
194 
195         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
196             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
197         }
198 
199         // Now, we can setup the pool sizes
200         super.setCorePoolSize(corePoolSize);
201         super.setMaximumPoolSize(maximumPoolSize);
202 
203         // The queueHandler might be null.
204         if (eventQueueHandler == null) {
205             this.eventQueueHandler = IoEventQueueHandler.NOOP;
206         } else {
207             this.eventQueueHandler = eventQueueHandler;
208         }
209     }
210 
211     /**
212      * Get the session's tasks queue.
213      */
214     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
215         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
216 
217         if (queue == null) {
218             queue = new SessionTasksQueue();
219             SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
220 
221             if (oldQueue != null) {
222                 queue = oldQueue;
223             }
224         }
225 
226         return queue;
227     }
228 
229     /**
230      * @return The associated queue handler. 
231      */
232     public IoEventQueueHandler getQueueHandler() {
233         return eventQueueHandler;
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
241         // Ignore the request.  It must always be AbortPolicy.
242     }
243 
244     /**
245      * Add a new thread to execute a task, if needed and possible.
246      * It depends on the current pool size. If it's full, we do nothing.
247      */
248     private void addWorker() {
249         synchronized (workers) {
250             if (workers.size() >= super.getMaximumPoolSize()) {
251                 return;
252             }
253 
254             // Create a new worker, and add it to the thread pool
255             Worker worker = new Worker();
256             Thread thread = getThreadFactory().newThread(worker);
257 
258             // As we have added a new thread, it's considered as idle.
259             idleWorkers.incrementAndGet();
260 
261             // Now, we can start it.
262             thread.start();
263             workers.add(worker);
264 
265             if (workers.size() > largestPoolSize) {
266                 largestPoolSize = workers.size();
267             }
268         }
269     }
270 
271     /**
272      * Add a new Worker only if there are no idle worker.
273      */
274     private void addWorkerIfNecessary() {
275         if (idleWorkers.get() == 0) {
276             synchronized (workers) {
277                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
278                     addWorker();
279                 }
280             }
281         }
282     }
283 
284     private void removeWorker() {
285         synchronized (workers) {
286             if (workers.size() <= super.getCorePoolSize()) {
287                 return;
288             }
289             waitingSessions.offer(EXIT_SIGNAL);
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     public void setMaximumPoolSize(int maximumPoolSize) {
298         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
299             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
300         }
301 
302         synchronized (workers) {
303             super.setMaximumPoolSize(maximumPoolSize);
304             int difference = workers.size() - maximumPoolSize;
305             while (difference > 0) {
306                 removeWorker();
307                 --difference;
308             }
309         }
310     }
311 
312     /**
313      * {@inheritDoc}
314      */
315     @Override
316     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
317 
318         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
319 
320         synchronized (workers) {
321             while (!isTerminated()) {
322                 long waitTime = deadline - System.currentTimeMillis();
323                 if (waitTime <= 0) {
324                     break;
325                 }
326 
327                 workers.wait(waitTime);
328             }
329         }
330         return isTerminated();
331     }
332 
333     /**
334      * {@inheritDoc}
335      */
336     @Override
337     public boolean isShutdown() {
338         return shutdown;
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public boolean isTerminated() {
346         if (!shutdown) {
347             return false;
348         }
349 
350         synchronized (workers) {
351             return workers.isEmpty();
352         }
353     }
354 
355     /**
356      * {@inheritDoc}
357      */
358     @Override
359     public void shutdown() {
360         if (shutdown) {
361             return;
362         }
363 
364         shutdown = true;
365 
366         synchronized (workers) {
367             for (int i = workers.size(); i > 0; i--) {
368                 waitingSessions.offer(EXIT_SIGNAL);
369             }
370         }
371     }
372 
373     /**
374      * {@inheritDoc}
375      */
376     @Override
377     public List<Runnable> shutdownNow() {
378         shutdown();
379 
380         List<Runnable> answer = new ArrayList<>();
381         IoSession session;
382 
383         while ((session = waitingSessions.poll()) != null) {
384             if (session == EXIT_SIGNAL) {
385                 waitingSessions.offer(EXIT_SIGNAL);
386                 Thread.yield(); // Let others take the signal.
387                 continue;
388             }
389 
390             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
391 
392             synchronized (sessionTasksQueue.tasksQueue) {
393 
394                 for (Runnable task : sessionTasksQueue.tasksQueue) {
395                     getQueueHandler().polled(this, (IoEvent) task);
396                     answer.add(task);
397                 }
398 
399                 sessionTasksQueue.tasksQueue.clear();
400             }
401         }
402 
403         return answer;
404     }
405 
406     /**
407      * A Helper class used to print the list of events being queued. 
408      */
409     private void print(Queue<Runnable> queue, IoEvent event) {
410         StringBuilder sb = new StringBuilder();
411         sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
412         boolean first = true;
413         sb.append("\nQueue : [");
414         for (Runnable elem : queue) {
415             if (first) {
416                 first = false;
417             } else {
418                 sb.append(", ");
419             }
420 
421             sb.append(((IoEvent) elem).getType()).append(", ");
422         }
423         sb.append("]\n");
424         LOGGER.debug(sb.toString());
425     }
426 
427     /**
428      * {@inheritDoc}
429      */
430     @Override
431     public void execute(Runnable task) {
432         if (shutdown) {
433             rejectTask(task);
434         }
435 
436         // Check that it's a IoEvent task
437         checkTaskType(task);
438 
439         IoEvent event = (IoEvent) task;
440 
441         // Get the associated session
442         IoSession session = event.getSession();
443 
444         // Get the session's queue of events
445         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
446         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
447 
448         boolean offerSession;
449 
450         // propose the new event to the event queue handler. If we
451         // use a throttle queue handler, the message may be rejected
452         // if the maximum size has been reached.
453         boolean offerEvent = eventQueueHandler.accept(this, event);
454 
455         if (offerEvent) {
456             // Ok, the message has been accepted
457             synchronized (tasksQueue) {
458                 // Inject the event into the executor taskQueue
459                 tasksQueue.offer(event);
460 
461                 if (sessionTasksQueue.processingCompleted) {
462                     sessionTasksQueue.processingCompleted = false;
463                     offerSession = true;
464                 } else {
465                     offerSession = false;
466                 }
467 
468                 if (LOGGER.isDebugEnabled()) {
469                     print(tasksQueue, event);
470                 }
471             }
472         } else {
473             offerSession = false;
474         }
475 
476         if (offerSession) {
477             // As the tasksQueue was empty, the task has been executed
478             // immediately, so we can move the session to the queue
479             // of sessions waiting for completion.
480             waitingSessions.offer(session);
481         }
482 
483         addWorkerIfNecessary();
484 
485         if (offerEvent) {
486             eventQueueHandler.offered(this, event);
487         }
488     }
489 
490     private void rejectTask(Runnable task) {
491         getRejectedExecutionHandler().rejectedExecution(task, this);
492     }
493 
494     private void checkTaskType(Runnable task) {
495         if (!(task instanceof IoEvent)) {
496             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
497         }
498     }
499 
500     /**
501      * {@inheritDoc}
502      */
503     @Override
504     public int getActiveCount() {
505         synchronized (workers) {
506             return workers.size() - idleWorkers.get();
507         }
508     }
509 
510     /**
511      * {@inheritDoc}
512      */
513     @Override
514     public long getCompletedTaskCount() {
515         synchronized (workers) {
516             long answer = completedTaskCount;
517             for (Worker w : workers) {
518                 answer += w.completedTaskCount.get();
519             }
520 
521             return answer;
522         }
523     }
524 
525     /**
526      * {@inheritDoc}
527      */
528     @Override
529     public int getLargestPoolSize() {
530         return largestPoolSize;
531     }
532 
533     /**
534      * {@inheritDoc}
535      */
536     @Override
537     public int getPoolSize() {
538         synchronized (workers) {
539             return workers.size();
540         }
541     }
542 
543     /**
544      * {@inheritDoc}
545      */
546     @Override
547     public long getTaskCount() {
548         return getCompletedTaskCount();
549     }
550 
551     /**
552      * {@inheritDoc}
553      */
554     @Override
555     public boolean isTerminating() {
556         synchronized (workers) {
557             return isShutdown() && !isTerminated();
558         }
559     }
560 
561     /**
562      * {@inheritDoc}
563      */
564     @Override
565     public int prestartAllCoreThreads() {
566         int answer = 0;
567         synchronized (workers) {
568             for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
569                 addWorker();
570                 answer++;
571             }
572         }
573         return answer;
574     }
575 
576     /**
577      * {@inheritDoc}
578      */
579     @Override
580     public boolean prestartCoreThread() {
581         synchronized (workers) {
582             if (workers.size() < super.getCorePoolSize()) {
583                 addWorker();
584                 return true;
585             } else {
586                 return false;
587             }
588         }
589     }
590 
591     /**
592      * {@inheritDoc}
593      */
594     @Override
595     public BlockingQueue<Runnable> getQueue() {
596         throw new UnsupportedOperationException();
597     }
598 
599     /**
600      * {@inheritDoc}
601      */
602     @Override
603     public void purge() {
604         // Nothing to purge in this implementation.
605     }
606 
607     /**
608      * {@inheritDoc}
609      */
610     @Override
611     public boolean remove(Runnable task) {
612         checkTaskType(task);
613         IoEvent event = (IoEvent) task;
614         IoSession session = event.getSession();
615         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
616 
617         if (sessionTasksQueue == null) {
618             return false;
619         }
620 
621         boolean removed;
622         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
623 
624         synchronized (tasksQueue) {
625             removed = tasksQueue.remove(task);
626         }
627 
628         if (removed) {
629             getQueueHandler().polled(this, event);
630         }
631 
632         return removed;
633     }
634 
635     /**
636      * {@inheritDoc}
637      */
638     @Override
639     public void setCorePoolSize(int corePoolSize) {
640         if (corePoolSize < 0) {
641             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
642         }
643         if (corePoolSize > super.getMaximumPoolSize()) {
644             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
645         }
646 
647         synchronized (workers) {
648             if (super.getCorePoolSize() > corePoolSize) {
649                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
650                     removeWorker();
651                 }
652             }
653             super.setCorePoolSize(corePoolSize);
654         }
655     }
656 
657     private class Worker implements Runnable {
658 
659         private AtomicLong completedTaskCount = new AtomicLong(0);
660 
661         private Thread thread;
662 
663         /**
664          * @inheritedDoc
665          */
666         @Override
667         public void run() {
668             thread = Thread.currentThread();
669 
670             try {
671                 for (;;) {
672                     IoSession session = fetchSession();
673 
674                     idleWorkers.decrementAndGet();
675 
676                     if (session == null) {
677                         synchronized (workers) {
678                             if (workers.size() > getCorePoolSize()) {
679                                 // Remove now to prevent duplicate exit.
680                                 workers.remove(this);
681                                 break;
682                             }
683                         }
684                     }
685 
686                     if (session == EXIT_SIGNAL) {
687                         break;
688                     }
689 
690                     try {
691                         if (session != null) {
692                             runTasks(getSessionTasksQueue(session));
693                         }
694                     } finally {
695                         idleWorkers.incrementAndGet();
696                     }
697                 }
698             } finally {
699                 synchronized (workers) {
700                     workers.remove(this);
701                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
702                     workers.notifyAll();
703                 }
704             }
705         }
706 
707         private IoSession fetchSession() {
708             IoSession session = null;
709             long currentTime = System.currentTimeMillis();
710             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
711             
712             for (;;) {
713                 try {
714                     long waitTime = deadline - currentTime;
715                     
716                     if (waitTime <= 0) {
717                         break;
718                     }
719 
720                     try {
721                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
722                         break;
723                     } finally {
724                         if (session != null) {
725                             currentTime = System.currentTimeMillis();
726                         }
727                     }
728                 } catch (InterruptedException e) {
729                     // Ignore.
730                     continue;
731                 }
732             }
733             
734             return session;
735         }
736 
737         private void runTasks(SessionTasksQueue sessionTasksQueue) {
738             for (;;) {
739                 Runnable task;
740                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
741 
742                 synchronized (tasksQueue) {
743                     task = tasksQueue.poll();
744 
745                     if (task == null) {
746                         sessionTasksQueue.processingCompleted = true;
747                         break;
748                     }
749                 }
750 
751                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
752 
753                 runTask(task);
754             }
755         }
756 
757         private void runTask(Runnable task) {
758             beforeExecute(thread, task);
759             boolean ran = false;
760             try {
761                 task.run();
762                 ran = true;
763                 afterExecute(task, null);
764                 completedTaskCount.incrementAndGet();
765             } catch (RuntimeException e) {
766                 if (!ran) {
767                     afterExecute(task, e);
768                 }
769                 throw e;
770             }
771         }
772     }
773 
774     /**
775      * A class used to store the ordered list of events to be processed by the
776      * session, and the current task state.
777      */
778     private class SessionTasksQueue {
779         /**  A queue of ordered event waiting to be processed */
780         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>();
781 
782         /** The current task state */
783         private boolean processingCompleted = true;
784     }
785 }