View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.mina.core.session.AttributeKey;
40  import org.apache.mina.core.session.DummySession;
41  import org.apache.mina.core.session.IoEvent;
42  import org.apache.mina.core.session.IoSession;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  /**
47   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
48   * <p>
49   * If you don't need to maintain the order of events per session, please use
50   * {@link UnorderedThreadPoolExecutor}.
51  
52   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
53   * @org.apache.xbean.XBean
54   */
55  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
56      /** A logger for this class (commented as it breaks MDCFlter tests) */
57      private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
58  
59      /** A default value for the initial pool size */
60      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
61  
62      /** A default value for the maximum pool size */
63      private static final int DEFAULT_MAX_THREAD_POOL = 16;
64  
65      /** A default value for the KeepAlive delay */
66      private static final int DEFAULT_KEEP_ALIVE = 30;
67  
68      private static final IoSession EXIT_SIGNAL = new DummySession();
69  
70      /** A key stored into the session's attribute for the event tasks being queued */
71      private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
72  
73      /** A queue used to store the available sessions */
74      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
75  
76      private final Set<Worker> workers = new HashSet<Worker>();
77  
78      private volatile int largestPoolSize;
79  
80      private final AtomicInteger idleWorkers = new AtomicInteger();
81  
82      private long completedTaskCount;
83  
84      private volatile boolean shutdown;
85  
86      private final IoEventQueueHandler eventQueueHandler;
87  
88      /**
89       * Creates a default ThreadPool, with default values :
90       * - minimum pool size is 0
91       * - maximum pool size is 16
92       * - keepAlive set to 30 seconds
93       * - A default ThreadFactory
94       * - All events are accepted
95       */
96      public OrderedThreadPoolExecutor() {
97          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
98                  .defaultThreadFactory(), null);
99      }
100 
101     /**
102      * Creates a default ThreadPool, with default values :
103      * - minimum pool size is 0
104      * - keepAlive set to 30 seconds
105      * - A default ThreadFactory
106      * - All events are accepted
107      * 
108      * @param maximumPoolSize The maximum pool size
109      */
110     public OrderedThreadPoolExecutor(int maximumPoolSize) {
111         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
112                 .defaultThreadFactory(), null);
113     }
114 
115     /**
116      * Creates a default ThreadPool, with default values :
117      * - keepAlive set to 30 seconds
118      * - A default ThreadFactory
119      * - All events are accepted
120      *
121      * @param corePoolSize The initial pool sizePoolSize
122      * @param maximumPoolSize The maximum pool size
123      */
124     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
125         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
126                 null);
127     }
128 
129     /**
130      * Creates a default ThreadPool, with default values :
131      * - A default ThreadFactory
132      * - All events are accepted
133      * 
134      * @param corePoolSize The initial pool sizePoolSize
135      * @param maximumPoolSize The maximum pool size
136      * @param keepAliveTime Default duration for a thread
137      * @param unit Time unit used for the keepAlive value
138      */
139     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
140         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
141     }
142 
143     /**
144      * Creates a default ThreadPool, with default values :
145      * - A default ThreadFactory
146      * 
147      * @param corePoolSize The initial pool sizePoolSize
148      * @param maximumPoolSize The maximum pool size
149      * @param keepAliveTime Default duration for a thread
150      * @param unit Time unit used for the keepAlive value
151      * @param eventQueueHandler The queue used to store events
152      */
153     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
154             IoEventQueueHandler eventQueueHandler) {
155         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
156     }
157 
158     /**
159      * Creates a default ThreadPool, with default values :
160      * - A default ThreadFactory
161      * 
162      * @param corePoolSize The initial pool sizePoolSize
163      * @param maximumPoolSize The maximum pool size
164      * @param keepAliveTime Default duration for a thread
165      * @param unit Time unit used for the keepAlive value
166      * @param threadFactory The factory used to create threads
167      */
168     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
169             ThreadFactory threadFactory) {
170         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
171     }
172 
173     /**
174      * Creates a new instance of a OrderedThreadPoolExecutor.
175      * 
176      * @param corePoolSize The initial pool sizePoolSize
177      * @param maximumPoolSize The maximum pool size
178      * @param keepAliveTime Default duration for a thread
179      * @param unit Time unit used for the keepAlive value
180      * @param threadFactory The factory used to create threads
181      * @param eventQueueHandler The queue used to store events
182      */
183     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
184             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
185         // We have to initialize the pool with default values (0 and 1) in order to
186         // handle the exception in a better way. We can't add a try {} catch() {}
187         // around the super() call.
188         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
189                 threadFactory, new AbortPolicy());
190 
191         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
192             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
193         }
194 
195         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
196             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
197         }
198 
199         // Now, we can setup the pool sizes
200         super.setCorePoolSize(corePoolSize);
201         super.setMaximumPoolSize(maximumPoolSize);
202 
203         // The queueHandler might be null.
204         if (eventQueueHandler == null) {
205             this.eventQueueHandler = IoEventQueueHandler.NOOP;
206         } else {
207             this.eventQueueHandler = eventQueueHandler;
208         }
209     }
210 
211     /**
212      * Get the session's tasks queue.
213      */
214     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
215         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
216 
217         if (queue == null) {
218             queue = new SessionTasksQueue();
219             SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
220 
221             if (oldQueue != null) {
222                 queue = oldQueue;
223             }
224         }
225 
226         return queue;
227     }
228 
229     /**
230      * @return The associated queue handler. 
231      */
232     public IoEventQueueHandler getQueueHandler() {
233         return eventQueueHandler;
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
241         // Ignore the request.  It must always be AbortPolicy.
242     }
243 
244     /**
245      * Add a new thread to execute a task, if needed and possible.
246      * It depends on the current pool size. If it's full, we do nothing.
247      */
248     private void addWorker() {
249         synchronized (workers) {
250             if (workers.size() >= super.getMaximumPoolSize()) {
251                 return;
252             }
253 
254             // Create a new worker, and add it to the thread pool
255             Worker worker = new Worker();
256             Thread thread = getThreadFactory().newThread(worker);
257 
258             // As we have added a new thread, it's considered as idle.
259             idleWorkers.incrementAndGet();
260 
261             // Now, we can start it.
262             thread.start();
263             workers.add(worker);
264 
265             if (workers.size() > largestPoolSize) {
266                 largestPoolSize = workers.size();
267             }
268         }
269     }
270 
271     /**
272      * Add a new Worker only if there are no idle worker.
273      */
274     private void addWorkerIfNecessary() {
275         if (idleWorkers.get() == 0) {
276             synchronized (workers) {
277                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
278                     addWorker();
279                 }
280             }
281         }
282     }
283 
284     private void removeWorker() {
285         synchronized (workers) {
286             if (workers.size() <= super.getCorePoolSize()) {
287                 return;
288             }
289             waitingSessions.offer(EXIT_SIGNAL);
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     public int getMaximumPoolSize() {
298         return super.getMaximumPoolSize();
299     }
300 
301     /**
302      * {@inheritDoc}
303      */
304     @Override
305     public void setMaximumPoolSize(int maximumPoolSize) {
306         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
307             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
308         }
309 
310         synchronized (workers) {
311             super.setMaximumPoolSize(maximumPoolSize);
312             int difference = workers.size() - maximumPoolSize;
313             while (difference > 0) {
314                 removeWorker();
315                 --difference;
316             }
317         }
318     }
319 
320     /**
321      * {@inheritDoc}
322      */
323     @Override
324     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
325 
326         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
327 
328         synchronized (workers) {
329             while (!isTerminated()) {
330                 long waitTime = deadline - System.currentTimeMillis();
331                 if (waitTime <= 0) {
332                     break;
333                 }
334 
335                 workers.wait(waitTime);
336             }
337         }
338         return isTerminated();
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public boolean isShutdown() {
346         return shutdown;
347     }
348 
349     /**
350      * {@inheritDoc}
351      */
352     @Override
353     public boolean isTerminated() {
354         if (!shutdown) {
355             return false;
356         }
357 
358         synchronized (workers) {
359             return workers.isEmpty();
360         }
361     }
362 
363     /**
364      * {@inheritDoc}
365      */
366     @Override
367     public void shutdown() {
368         if (shutdown) {
369             return;
370         }
371 
372         shutdown = true;
373 
374         synchronized (workers) {
375             for (int i = workers.size(); i > 0; i--) {
376                 waitingSessions.offer(EXIT_SIGNAL);
377             }
378         }
379     }
380 
381     /**
382      * {@inheritDoc}
383      */
384     @Override
385     public List<Runnable> shutdownNow() {
386         shutdown();
387 
388         List<Runnable> answer = new ArrayList<Runnable>();
389         IoSession session;
390 
391         while ((session = waitingSessions.poll()) != null) {
392             if (session == EXIT_SIGNAL) {
393                 waitingSessions.offer(EXIT_SIGNAL);
394                 Thread.yield(); // Let others take the signal.
395                 continue;
396             }
397 
398             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
399 
400             synchronized (sessionTasksQueue.tasksQueue) {
401 
402                 for (Runnable task : sessionTasksQueue.tasksQueue) {
403                     getQueueHandler().polled(this, (IoEvent) task);
404                     answer.add(task);
405                 }
406 
407                 sessionTasksQueue.tasksQueue.clear();
408             }
409         }
410 
411         return answer;
412     }
413 
414     /**
415      * A Helper class used to print the list of events being queued. 
416      */
417     private void print(Queue<Runnable> queue, IoEvent event) {
418         StringBuilder sb = new StringBuilder();
419         sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
420         boolean first = true;
421         sb.append("\nQueue : [");
422         for (Runnable elem : queue) {
423             if (first) {
424                 first = false;
425             } else {
426                 sb.append(", ");
427             }
428 
429             sb.append(((IoEvent) elem).getType()).append(", ");
430         }
431         sb.append("]\n");
432         LOGGER.debug(sb.toString());
433     }
434 
435     /**
436      * {@inheritDoc}
437      */
438     @Override
439     public void execute(Runnable task) {
440         if (shutdown) {
441             rejectTask(task);
442         }
443 
444         // Check that it's a IoEvent task
445         checkTaskType(task);
446 
447         IoEvent event = (IoEvent) task;
448 
449         // Get the associated session
450         IoSession session = event.getSession();
451 
452         // Get the session's queue of events
453         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
454         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
455 
456         boolean offerSession;
457 
458         // propose the new event to the event queue handler. If we
459         // use a throttle queue handler, the message may be rejected
460         // if the maximum size has been reached.
461         boolean offerEvent = eventQueueHandler.accept(this, event);
462 
463         if (offerEvent) {
464             // Ok, the message has been accepted
465             synchronized (tasksQueue) {
466                 // Inject the event into the executor taskQueue
467                 tasksQueue.offer(event);
468 
469                 if (sessionTasksQueue.processingCompleted) {
470                     sessionTasksQueue.processingCompleted = false;
471                     offerSession = true;
472                 } else {
473                     offerSession = false;
474                 }
475 
476                 if (LOGGER.isDebugEnabled()) {
477                     print(tasksQueue, event);
478                 }
479             }
480         } else {
481             offerSession = false;
482         }
483 
484         if (offerSession) {
485             // As the tasksQueue was empty, the task has been executed
486             // immediately, so we can move the session to the queue
487             // of sessions waiting for completion.
488             waitingSessions.offer(session);
489         }
490 
491         addWorkerIfNecessary();
492 
493         if (offerEvent) {
494             eventQueueHandler.offered(this, event);
495         }
496     }
497 
498     private void rejectTask(Runnable task) {
499         getRejectedExecutionHandler().rejectedExecution(task, this);
500     }
501 
502     private void checkTaskType(Runnable task) {
503         if (!(task instanceof IoEvent)) {
504             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
505         }
506     }
507 
508     /**
509      * {@inheritDoc}
510      */
511     @Override
512     public int getActiveCount() {
513         synchronized (workers) {
514             return workers.size() - idleWorkers.get();
515         }
516     }
517 
518     /**
519      * {@inheritDoc}
520      */
521     @Override
522     public long getCompletedTaskCount() {
523         synchronized (workers) {
524             long answer = completedTaskCount;
525             for (Worker w : workers) {
526                 answer += w.completedTaskCount.get();
527             }
528 
529             return answer;
530         }
531     }
532 
533     /**
534      * {@inheritDoc}
535      */
536     @Override
537     public int getLargestPoolSize() {
538         return largestPoolSize;
539     }
540 
541     /**
542      * {@inheritDoc}
543      */
544     @Override
545     public int getPoolSize() {
546         synchronized (workers) {
547             return workers.size();
548         }
549     }
550 
551     /**
552      * {@inheritDoc}
553      */
554     @Override
555     public long getTaskCount() {
556         return getCompletedTaskCount();
557     }
558 
559     /**
560      * {@inheritDoc}
561      */
562     @Override
563     public boolean isTerminating() {
564         synchronized (workers) {
565             return isShutdown() && !isTerminated();
566         }
567     }
568 
569     /**
570      * {@inheritDoc}
571      */
572     @Override
573     public int prestartAllCoreThreads() {
574         int answer = 0;
575         synchronized (workers) {
576             for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
577                 addWorker();
578                 answer++;
579             }
580         }
581         return answer;
582     }
583 
584     /**
585      * {@inheritDoc}
586      */
587     @Override
588     public boolean prestartCoreThread() {
589         synchronized (workers) {
590             if (workers.size() < super.getCorePoolSize()) {
591                 addWorker();
592                 return true;
593             } else {
594                 return false;
595             }
596         }
597     }
598 
599     /**
600      * {@inheritDoc}
601      */
602     @Override
603     public BlockingQueue<Runnable> getQueue() {
604         throw new UnsupportedOperationException();
605     }
606 
607     /**
608      * {@inheritDoc}
609      */
610     @Override
611     public void purge() {
612         // Nothing to purge in this implementation.
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     @Override
619     public boolean remove(Runnable task) {
620         checkTaskType(task);
621         IoEvent event = (IoEvent) task;
622         IoSession session = event.getSession();
623         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
624 
625         if (sessionTasksQueue == null) {
626             return false;
627         }
628 
629         boolean removed;
630         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
631 
632         synchronized (tasksQueue) {
633             removed = tasksQueue.remove(task);
634         }
635 
636         if (removed) {
637             getQueueHandler().polled(this, event);
638         }
639 
640         return removed;
641     }
642 
643     /**
644      * {@inheritDoc}
645      */
646     @Override
647     public int getCorePoolSize() {
648         return super.getCorePoolSize();
649     }
650 
651     /**
652      * {@inheritDoc}
653      */
654     @Override
655     public void setCorePoolSize(int corePoolSize) {
656         if (corePoolSize < 0) {
657             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
658         }
659         if (corePoolSize > super.getMaximumPoolSize()) {
660             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
661         }
662 
663         synchronized (workers) {
664             if (super.getCorePoolSize() > corePoolSize) {
665                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
666                     removeWorker();
667                 }
668             }
669             super.setCorePoolSize(corePoolSize);
670         }
671     }
672 
673     private class Worker implements Runnable {
674 
675         private AtomicLong completedTaskCount = new AtomicLong(0);
676 
677         private Thread thread;
678 
679         public void run() {
680             thread = Thread.currentThread();
681 
682             try {
683                 for (;;) {
684                     IoSession session = fetchSession();
685 
686                     idleWorkers.decrementAndGet();
687 
688                     if (session == null) {
689                         synchronized (workers) {
690                             if (workers.size() > getCorePoolSize()) {
691                                 // Remove now to prevent duplicate exit.
692                                 workers.remove(this);
693                                 break;
694                             }
695                         }
696                     }
697 
698                     if (session == EXIT_SIGNAL) {
699                         break;
700                     }
701 
702                     try {
703                         if (session != null) {
704                             runTasks(getSessionTasksQueue(session));
705                         }
706                     } finally {
707                         idleWorkers.incrementAndGet();
708                     }
709                 }
710             } finally {
711                 synchronized (workers) {
712                     workers.remove(this);
713                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
714                     workers.notifyAll();
715                 }
716             }
717         }
718 
719         private IoSession fetchSession() {
720             IoSession session = null;
721             long currentTime = System.currentTimeMillis();
722             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
723             for (;;) {
724                 try {
725                     long waitTime = deadline - currentTime;
726                     if (waitTime <= 0) {
727                         break;
728                     }
729 
730                     try {
731                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
732                         break;
733                     } finally {
734                         if (session == null) {
735                             currentTime = System.currentTimeMillis();
736                         }
737                     }
738                 } catch (InterruptedException e) {
739                     // Ignore.
740                     continue;
741                 }
742             }
743             return session;
744         }
745 
746         private void runTasks(SessionTasksQueue sessionTasksQueue) {
747             for (;;) {
748                 Runnable task;
749                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
750 
751                 synchronized (tasksQueue) {
752                     task = tasksQueue.poll();
753 
754                     if (task == null) {
755                         sessionTasksQueue.processingCompleted = true;
756                         break;
757                     }
758                 }
759 
760                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
761 
762                 runTask(task);
763             }
764         }
765 
766         private void runTask(Runnable task) {
767             beforeExecute(thread, task);
768             boolean ran = false;
769             try {
770                 task.run();
771                 ran = true;
772                 afterExecute(task, null);
773                 completedTaskCount.incrementAndGet();
774             } catch (RuntimeException e) {
775                 if (!ran) {
776                     afterExecute(task, e);
777                 }
778                 throw e;
779             }
780         }
781     }
782 
783     /**
784      * A class used to store the ordered list of events to be processed by the
785      * session, and the current task state.
786      */
787     private class SessionTasksQueue {
788         /**  A queue of ordered event waiting to be processed */
789         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
790 
791         /** The current task state */
792         private boolean processingCompleted = true;
793     }
794 }