View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.mina.core.session.AttributeKey;
39  import org.apache.mina.core.session.DummySession;
40  import org.apache.mina.core.session.IoEvent;
41  import org.apache.mina.core.session.IoSession;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  /**
46   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
47   * <p>
48   * If you don't need to maintain the order of events per session, please use
49   * {@link UnorderedThreadPoolExecutor}.
50  
51   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
52   * @org.apache.xbean.XBean
53   */
54  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
55      /** A logger for this class (commented as it breaks MDCFlter tests) */
56      private static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
57  
58      /** A default value for the initial pool size */
59      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
60  
61      /** A default value for the maximum pool size */
62      private static final int DEFAULT_MAX_THREAD_POOL = 16;
63  
64      /** A default value for the KeepAlive delay */
65      private static final int DEFAULT_KEEP_ALIVE = 30;
66  
67      private static final IoSession EXIT_SIGNAL = new DummySession();
68  
69      /** A key stored into the session's attribute for the event tasks being queued */
70      private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
71  
72      /** A queue used to store the available sessions */
73      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
74  
75      private final Set<Worker> workers = new HashSet<Worker>();
76  
77      private volatile int largestPoolSize;
78  
79      private final AtomicInteger idleWorkers = new AtomicInteger();
80  
81      private long completedTaskCount;
82  
83      private volatile boolean shutdown;
84  
85      private final IoEventQueueHandler eventQueueHandler;
86  
87      /**
88       * Creates a default ThreadPool, with default values :
89       * - minimum pool size is 0
90       * - maximum pool size is 16
91       * - keepAlive set to 30 seconds
92       * - A default ThreadFactory
93       * - All events are accepted
94       */
95      public OrderedThreadPoolExecutor() {
96          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
97                  .defaultThreadFactory(), null);
98      }
99  
100     /**
101      * Creates a default ThreadPool, with default values :
102      * - minimum pool size is 0
103      * - keepAlive set to 30 seconds
104      * - A default ThreadFactory
105      * - All events are accepted
106      * 
107      * @param maximumPoolSize The maximum pool size
108      */
109     public OrderedThreadPoolExecutor(int maximumPoolSize) {
110         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
111                 .defaultThreadFactory(), null);
112     }
113 
114     /**
115      * Creates a default ThreadPool, with default values :
116      * - keepAlive set to 30 seconds
117      * - A default ThreadFactory
118      * - All events are accepted
119      *
120      * @param corePoolSize The initial pool sizePoolSize
121      * @param maximumPoolSize The maximum pool size
122      */
123     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
124         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
125                 null);
126     }
127 
128     /**
129      * Creates a default ThreadPool, with default values :
130      * - A default ThreadFactory
131      * - All events are accepted
132      * 
133      * @param corePoolSize The initial pool sizePoolSize
134      * @param maximumPoolSize The maximum pool size
135      * @param keepAliveTime Default duration for a thread
136      * @param unit Time unit used for the keepAlive value
137      */
138     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
139         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
140     }
141 
142     /**
143      * Creates a default ThreadPool, with default values :
144      * - A default ThreadFactory
145      * 
146      * @param corePoolSize The initial pool sizePoolSize
147      * @param maximumPoolSize The maximum pool size
148      * @param keepAliveTime Default duration for a thread
149      * @param unit Time unit used for the keepAlive value
150      * @param eventQueueHandler The queue used to store events
151      */
152     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
153             IoEventQueueHandler eventQueueHandler) {
154         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
155     }
156 
157     /**
158      * Creates a default ThreadPool, with default values :
159      * - A default ThreadFactory
160      * 
161      * @param corePoolSize The initial pool sizePoolSize
162      * @param maximumPoolSize The maximum pool size
163      * @param keepAliveTime Default duration for a thread
164      * @param unit Time unit used for the keepAlive value
165      * @param threadFactory The factory used to create threads
166      */
167     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
168             ThreadFactory threadFactory) {
169         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
170     }
171 
172     /**
173      * Creates a new instance of a OrderedThreadPoolExecutor.
174      * 
175      * @param corePoolSize The initial pool sizePoolSize
176      * @param maximumPoolSize The maximum pool size
177      * @param keepAliveTime Default duration for a thread
178      * @param unit Time unit used for the keepAlive value
179      * @param threadFactory The factory used to create threads
180      * @param eventQueueHandler The queue used to store events
181      */
182     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
183             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
184         // We have to initialize the pool with default values (0 and 1) in order to
185         // handle the exception in a better way. We can't add a try {} catch() {}
186         // around the super() call.
187         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
188                 threadFactory, new AbortPolicy());
189 
190         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
191             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
192         }
193 
194         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
195             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
196         }
197 
198         // Now, we can setup the pool sizes
199         super.setCorePoolSize(corePoolSize);
200         super.setMaximumPoolSize(maximumPoolSize);
201 
202         // The queueHandler might be null.
203         if (eventQueueHandler == null) {
204             this.eventQueueHandler = IoEventQueueHandler.NOOP;
205         } else {
206             this.eventQueueHandler = eventQueueHandler;
207         }
208     }
209 
210     /**
211      * Get the session's tasks queue.
212      */
213     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
214         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
215 
216         if (queue == null) {
217             queue = new SessionTasksQueue();
218             SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
219 
220             if (oldQueue != null) {
221                 queue = oldQueue;
222             }
223         }
224 
225         return queue;
226     }
227 
228     /**
229      * @return The associated queue handler. 
230      */
231     public IoEventQueueHandler getQueueHandler() {
232         return eventQueueHandler;
233     }
234 
235     /**
236      * {@inheritDoc}
237      */
238     @Override
239     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
240         // Ignore the request.  It must always be AbortPolicy.
241     }
242 
243     /**
244      * Add a new thread to execute a task, if needed and possible.
245      * It depends on the current pool size. If it's full, we do nothing.
246      */
247     private void addWorker() {
248         synchronized (workers) {
249             if (workers.size() >= super.getMaximumPoolSize()) {
250                 return;
251             }
252 
253             // Create a new worker, and add it to the thread pool
254             Worker worker = new Worker();
255             Thread thread = getThreadFactory().newThread(worker);
256 
257             // As we have added a new thread, it's considered as idle.
258             idleWorkers.incrementAndGet();
259 
260             // Now, we can start it.
261             thread.start();
262             workers.add(worker);
263 
264             if (workers.size() > largestPoolSize) {
265                 largestPoolSize = workers.size();
266             }
267         }
268     }
269 
270     /**
271      * Add a new Worker only if there are no idle worker.
272      */
273     private void addWorkerIfNecessary() {
274         if (idleWorkers.get() == 0) {
275             synchronized (workers) {
276                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
277                     addWorker();
278                 }
279             }
280         }
281     }
282 
283     private void removeWorker() {
284         synchronized (workers) {
285             if (workers.size() <= super.getCorePoolSize()) {
286                 return;
287             }
288             waitingSessions.offer(EXIT_SIGNAL);
289         }
290     }
291 
292     /**
293      * {@inheritDoc}
294      */
295     @Override
296     public int getMaximumPoolSize() {
297         return super.getMaximumPoolSize();
298     }
299 
300     /**
301      * {@inheritDoc}
302      */
303     @Override
304     public void setMaximumPoolSize(int maximumPoolSize) {
305         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
306             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
307         }
308 
309         synchronized (workers) {
310             super.setMaximumPoolSize(maximumPoolSize);
311             int difference = workers.size() - maximumPoolSize;
312             while (difference > 0) {
313                 removeWorker();
314                 --difference;
315             }
316         }
317     }
318 
319     /**
320      * {@inheritDoc}
321      */
322     @Override
323     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
324 
325         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
326 
327         synchronized (workers) {
328             while (!isTerminated()) {
329                 long waitTime = deadline - System.currentTimeMillis();
330                 if (waitTime <= 0) {
331                     break;
332                 }
333 
334                 workers.wait(waitTime);
335             }
336         }
337         return isTerminated();
338     }
339 
340     /**
341      * {@inheritDoc}
342      */
343     @Override
344     public boolean isShutdown() {
345         return shutdown;
346     }
347 
348     /**
349      * {@inheritDoc}
350      */
351     @Override
352     public boolean isTerminated() {
353         if (!shutdown) {
354             return false;
355         }
356 
357         synchronized (workers) {
358             return workers.isEmpty();
359         }
360     }
361 
362     /**
363      * {@inheritDoc}
364      */
365     @Override
366     public void shutdown() {
367         if (shutdown) {
368             return;
369         }
370 
371         shutdown = true;
372 
373         synchronized (workers) {
374             for (int i = workers.size(); i > 0; i--) {
375                 waitingSessions.offer(EXIT_SIGNAL);
376             }
377         }
378     }
379 
380     /**
381      * {@inheritDoc}
382      */
383     @Override
384     public List<Runnable> shutdownNow() {
385         shutdown();
386 
387         List<Runnable> answer = new ArrayList<Runnable>();
388         IoSession session;
389 
390         while ((session = waitingSessions.poll()) != null) {
391             if (session == EXIT_SIGNAL) {
392                 waitingSessions.offer(EXIT_SIGNAL);
393                 Thread.yield(); // Let others take the signal.
394                 continue;
395             }
396 
397             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
398 
399             synchronized (sessionTasksQueue.tasksQueue) {
400 
401                 for (Runnable task : sessionTasksQueue.tasksQueue) {
402                     getQueueHandler().polled(this, (IoEvent) task);
403                     answer.add(task);
404                 }
405 
406                 sessionTasksQueue.tasksQueue.clear();
407             }
408         }
409 
410         return answer;
411     }
412 
413     /**
414      * A Helper class used to print the list of events being queued. 
415      */
416     private void print(Queue<Runnable> queue, IoEvent event) {
417         StringBuilder sb = new StringBuilder();
418         sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
419         boolean first = true;
420         sb.append("\nQueue : [");
421         for (Runnable elem : queue) {
422             if (first) {
423                 first = false;
424             } else {
425                 sb.append(", ");
426             }
427 
428             sb.append(((IoEvent) elem).getType()).append(", ");
429         }
430         sb.append("]\n");
431         LOGGER.debug(sb.toString());
432     }
433 
434     /**
435      * {@inheritDoc}
436      */
437     @Override
438     public void execute(Runnable task) {
439         if (shutdown) {
440             rejectTask(task);
441         }
442 
443         // Check that it's a IoEvent task
444         checkTaskType(task);
445 
446         IoEvent event = (IoEvent) task;
447 
448         // Get the associated session
449         IoSession session = event.getSession();
450 
451         // Get the session's queue of events
452         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
453         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
454 
455         boolean offerSession;
456 
457         // propose the new event to the event queue handler. If we
458         // use a throttle queue handler, the message may be rejected
459         // if the maximum size has been reached.
460         boolean offerEvent = eventQueueHandler.accept(this, event);
461 
462         if (offerEvent) {
463             // Ok, the message has been accepted
464             synchronized (tasksQueue) {
465                 // Inject the event into the executor taskQueue
466                 tasksQueue.offer(event);
467 
468                 if (sessionTasksQueue.processingCompleted) {
469                     sessionTasksQueue.processingCompleted = false;
470                     offerSession = true;
471                 } else {
472                     offerSession = false;
473                 }
474 
475                 if (LOGGER.isDebugEnabled()) {
476                     print(tasksQueue, event);
477                 }
478             }
479         } else {
480             offerSession = false;
481         }
482 
483         if (offerSession) {
484             // As the tasksQueue was empty, the task has been executed
485             // immediately, so we can move the session to the queue
486             // of sessions waiting for completion.
487             waitingSessions.offer(session);
488         }
489 
490         addWorkerIfNecessary();
491 
492         if (offerEvent) {
493             eventQueueHandler.offered(this, event);
494         }
495     }
496 
497     private void rejectTask(Runnable task) {
498         getRejectedExecutionHandler().rejectedExecution(task, this);
499     }
500 
501     private void checkTaskType(Runnable task) {
502         if (!(task instanceof IoEvent)) {
503             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
504         }
505     }
506 
507     /**
508      * {@inheritDoc}
509      */
510     @Override
511     public int getActiveCount() {
512         synchronized (workers) {
513             return workers.size() - idleWorkers.get();
514         }
515     }
516 
517     /**
518      * {@inheritDoc}
519      */
520     @Override
521     public long getCompletedTaskCount() {
522         synchronized (workers) {
523             long answer = completedTaskCount;
524             for (Worker w : workers) {
525                 answer += w.completedTaskCount;
526             }
527 
528             return answer;
529         }
530     }
531 
532     /**
533      * {@inheritDoc}
534      */
535     @Override
536     public int getLargestPoolSize() {
537         return largestPoolSize;
538     }
539 
540     /**
541      * {@inheritDoc}
542      */
543     @Override
544     public int getPoolSize() {
545         synchronized (workers) {
546             return workers.size();
547         }
548     }
549 
550     /**
551      * {@inheritDoc}
552      */
553     @Override
554     public long getTaskCount() {
555         return getCompletedTaskCount();
556     }
557 
558     /**
559      * {@inheritDoc}
560      */
561     @Override
562     public boolean isTerminating() {
563         synchronized (workers) {
564             return isShutdown() && !isTerminated();
565         }
566     }
567 
568     /**
569      * {@inheritDoc}
570      */
571     @Override
572     public int prestartAllCoreThreads() {
573         int answer = 0;
574         synchronized (workers) {
575             for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
576                 addWorker();
577                 answer++;
578             }
579         }
580         return answer;
581     }
582 
583     /**
584      * {@inheritDoc}
585      */
586     @Override
587     public boolean prestartCoreThread() {
588         synchronized (workers) {
589             if (workers.size() < super.getCorePoolSize()) {
590                 addWorker();
591                 return true;
592             } else {
593                 return false;
594             }
595         }
596     }
597 
598     /**
599      * {@inheritDoc}
600      */
601     @Override
602     public BlockingQueue<Runnable> getQueue() {
603         throw new UnsupportedOperationException();
604     }
605 
606     /**
607      * {@inheritDoc}
608      */
609     @Override
610     public void purge() {
611         // Nothing to purge in this implementation.
612     }
613 
614     /**
615      * {@inheritDoc}
616      */
617     @Override
618     public boolean remove(Runnable task) {
619         checkTaskType(task);
620         IoEvent event = (IoEvent) task;
621         IoSession session = event.getSession();
622         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
623         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
624 
625         if (sessionTasksQueue == null) {
626             return false;
627         }
628 
629         boolean removed;
630 
631         synchronized (tasksQueue) {
632             removed = tasksQueue.remove(task);
633         }
634 
635         if (removed) {
636             getQueueHandler().polled(this, event);
637         }
638 
639         return removed;
640     }
641 
642     /**
643      * {@inheritDoc}
644      */
645     @Override
646     public int getCorePoolSize() {
647         return super.getCorePoolSize();
648     }
649 
650     /**
651      * {@inheritDoc}
652      */
653     @Override
654     public void setCorePoolSize(int corePoolSize) {
655         if (corePoolSize < 0) {
656             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
657         }
658         if (corePoolSize > super.getMaximumPoolSize()) {
659             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
660         }
661 
662         synchronized (workers) {
663             if (super.getCorePoolSize() > corePoolSize) {
664                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
665                     removeWorker();
666                 }
667             }
668             super.setCorePoolSize(corePoolSize);
669         }
670     }
671 
672     private class Worker implements Runnable {
673 
674         private volatile long completedTaskCount;
675 
676         private Thread thread;
677 
678         public void run() {
679             thread = Thread.currentThread();
680 
681             try {
682                 for (;;) {
683                     IoSession session = fetchSession();
684 
685                     idleWorkers.decrementAndGet();
686 
687                     if (session == null) {
688                         synchronized (workers) {
689                             if (workers.size() > getCorePoolSize()) {
690                                 // Remove now to prevent duplicate exit.
691                                 workers.remove(this);
692                                 break;
693                             }
694                         }
695                     }
696 
697                     if (session == EXIT_SIGNAL) {
698                         break;
699                     }
700 
701                     try {
702                         if (session != null) {
703                             runTasks(getSessionTasksQueue(session));
704                         }
705                     } finally {
706                         idleWorkers.incrementAndGet();
707                     }
708                 }
709             } finally {
710                 synchronized (workers) {
711                     workers.remove(this);
712                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
713                     workers.notifyAll();
714                 }
715             }
716         }
717 
718         private IoSession fetchSession() {
719             IoSession session = null;
720             long currentTime = System.currentTimeMillis();
721             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
722             for (;;) {
723                 try {
724                     long waitTime = deadline - currentTime;
725                     if (waitTime <= 0) {
726                         break;
727                     }
728 
729                     try {
730                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
731                         break;
732                     } finally {
733                         if (session == null) {
734                             currentTime = System.currentTimeMillis();
735                         }
736                     }
737                 } catch (InterruptedException e) {
738                     // Ignore.
739                     continue;
740                 }
741             }
742             return session;
743         }
744 
745         private void runTasks(SessionTasksQueue sessionTasksQueue) {
746             for (;;) {
747                 Runnable task;
748                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
749 
750                 synchronized (tasksQueue) {
751                     task = tasksQueue.poll();
752 
753                     if (task == null) {
754                         sessionTasksQueue.processingCompleted = true;
755                         break;
756                     }
757                 }
758 
759                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
760 
761                 runTask(task);
762             }
763         }
764 
765         private void runTask(Runnable task) {
766             beforeExecute(thread, task);
767             boolean ran = false;
768             try {
769                 task.run();
770                 ran = true;
771                 afterExecute(task, null);
772                 completedTaskCount++;
773             } catch (RuntimeException e) {
774                 if (!ran) {
775                     afterExecute(task, e);
776                 }
777                 throw e;
778             }
779         }
780     }
781 
782     /**
783      * A class used to store the ordered list of events to be processed by the
784      * session, and the current task state.
785      */
786     private class SessionTasksQueue {
787         /**  A queue of ordered event waiting to be processed */
788         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
789 
790         /** The current task state */
791         private boolean processingCompleted = true;
792     }
793 }