View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.mina.core.session.AttributeKey;
39  import org.apache.mina.core.session.DummySession;
40  import org.apache.mina.core.session.IoEvent;
41  import org.apache.mina.core.session.IoSession;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  /**
46   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
47   * <p>
48   * If you don't need to maintain the order of events per session, please use
49   * {@link UnorderedThreadPoolExecutor}.
50  
51   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
52   * @org.apache.xbean.XBean
53   */
54  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
55      /** A logger for this class (commented as it breaks MDCFlter tests) */
56      static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
57  
58      /** A default value for the initial pool size */
59      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
60      
61      /** A default value for the maximum pool size */
62      private static final int DEFAULT_MAX_THREAD_POOL = 16;
63      
64      /** A default value for the KeepAlive delay */
65      private static final int DEFAULT_KEEP_ALIVE = 30;
66      
67      private static final IoSession EXIT_SIGNAL = new DummySession();
68  
69      /** A key stored into the session's attribute for the event tasks being queued */ 
70      private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
71      
72      /** A queue used to store the available sessions */
73      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
74  
75      private final Set<Worker> workers = new HashSet<Worker>();
76  
77      private volatile int largestPoolSize;
78      private final AtomicInteger idleWorkers = new AtomicInteger();
79  
80      private long completedTaskCount;
81      private volatile boolean shutdown;
82  
83      private final IoEventQueueHandler eventQueueHandler;
84  
85      /**
86       * Creates a default ThreadPool, with default values :
87       * - minimum pool size is 0
88       * - maximum pool size is 16
89       * - keepAlive set to 30 seconds
90       * - A default ThreadFactory
91       * - All events are accepted
92       */
93      public OrderedThreadPoolExecutor() {
94          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
95              DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
96      }
97  
98      /**
99       * Creates a default ThreadPool, with default values :
100      * - minimum pool size is 0
101      * - keepAlive set to 30 seconds
102      * - A default ThreadFactory
103      * - All events are accepted
104      * 
105      * @param maximumPoolSize The maximum pool size
106      */
107     public OrderedThreadPoolExecutor(int maximumPoolSize) {
108         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
109             Executors.defaultThreadFactory(), null);
110     }
111 
112     /**
113      * Creates a default ThreadPool, with default values :
114      * - keepAlive set to 30 seconds
115      * - A default ThreadFactory
116      * - All events are accepted
117      *
118      * @param corePoolSize The initial pool sizePoolSize
119      * @param maximumPoolSize The maximum pool size
120      */
121     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
122         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
123             Executors.defaultThreadFactory(), null);
124     }
125 
126     /**
127      * Creates a default ThreadPool, with default values :
128      * - A default ThreadFactory
129      * - All events are accepted
130      * 
131      * @param corePoolSize The initial pool sizePoolSize
132      * @param maximumPoolSize The maximum pool size
133      * @param keepAliveTime Default duration for a thread
134      * @param unit Time unit used for the keepAlive value
135      */
136     public OrderedThreadPoolExecutor(
137             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
138         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
139             Executors.defaultThreadFactory(), null);
140     }
141 
142     /**
143      * Creates a default ThreadPool, with default values :
144      * - A default ThreadFactory
145      * 
146      * @param corePoolSize The initial pool sizePoolSize
147      * @param maximumPoolSize The maximum pool size
148      * @param keepAliveTime Default duration for a thread
149      * @param unit Time unit used for the keepAlive value
150      * @param eventQueueHandler The queue used to store events
151      */
152     public OrderedThreadPoolExecutor(
153             int corePoolSize, int maximumPoolSize,
154             long keepAliveTime, TimeUnit unit,
155             IoEventQueueHandler eventQueueHandler) {
156         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
157             Executors.defaultThreadFactory(), eventQueueHandler);
158     }
159 
160     /**
161      * Creates a default ThreadPool, with default values :
162      * - A default ThreadFactory
163      * 
164      * @param corePoolSize The initial pool sizePoolSize
165      * @param maximumPoolSize The maximum pool size
166      * @param keepAliveTime Default duration for a thread
167      * @param unit Time unit used for the keepAlive value
168      * @param threadFactory The factory used to create threads
169      */
170     public OrderedThreadPoolExecutor(
171             int corePoolSize, int maximumPoolSize,
172             long keepAliveTime, TimeUnit unit,
173             ThreadFactory threadFactory) {
174         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
175     }
176 
177     /**
178      * Creates a new instance of a OrderedThreadPoolExecutor.
179      * 
180      * @param corePoolSize The initial pool sizePoolSize
181      * @param maximumPoolSize The maximum pool size
182      * @param keepAliveTime Default duration for a thread
183      * @param unit Time unit used for the keepAlive value
184      * @param threadFactory The factory used to create threads
185      * @param eventQueueHandler The queue used to store events
186      */
187     public OrderedThreadPoolExecutor(
188             int corePoolSize, int maximumPoolSize,
189             long keepAliveTime, TimeUnit unit,
190             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
191         // We have to initialize the pool with default values (0 and 1) in order to
192         // handle the exception in a better way. We can't add a try {} catch() {}
193         // around the super() call.
194         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, 
195             new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
196 
197         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
198             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
199         }
200 
201         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
202             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
203         }
204 
205         // Now, we can setup the pool sizes
206         super.setCorePoolSize( corePoolSize );
207         super.setMaximumPoolSize( maximumPoolSize );
208         
209         // The queueHandler might be null.
210         if (eventQueueHandler == null) {
211             this.eventQueueHandler = IoEventQueueHandler.NOOP;
212         } else {
213             this.eventQueueHandler = eventQueueHandler;
214         }
215     }
216     
217 
218     /**
219      * Get the session's tasks queue.
220      */
221     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
222         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
223 
224         if (queue == null) {
225             queue = new SessionTasksQueue();
226             SessionTasksQueue oldQueue = 
227                 (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
228             
229             if (oldQueue != null) {
230                 queue = oldQueue;
231             }
232         }
233         
234         return queue;
235     }
236     
237     /**
238      * @return The associated queue handler. 
239      */
240     public IoEventQueueHandler getQueueHandler() {
241         return eventQueueHandler;
242     }
243 
244     /**
245      * {@inheritDoc}
246      */
247     @Override
248     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
249         // Ignore the request.  It must always be AbortPolicy.
250     }
251 
252     /**
253      * Add a new thread to execute a task, if needed and possible.
254      * It depends on the current pool size. If it's full, we do nothing.
255      */
256     private void addWorker() {
257         synchronized (workers) {
258             if (workers.size() >= super.getMaximumPoolSize()) {
259                 return;
260             }
261 
262             // Create a new worker, and add it to the thread pool
263             Worker worker = new Worker();
264             Thread thread = getThreadFactory().newThread(worker);
265             
266             // As we have added a new thread, it's considered as idle.
267             idleWorkers.incrementAndGet();
268             
269             // Now, we can start it.
270             thread.start();
271             workers.add(worker);
272 
273             if (workers.size() > largestPoolSize) {
274                 largestPoolSize = workers.size();
275             }
276         }
277     }
278 
279     /**
280      * Add a new Worker only if there are no idle worker.
281      */
282     private void addWorkerIfNecessary() {
283         if (idleWorkers.get() == 0) {
284             synchronized (workers) {
285                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
286                     addWorker();
287                 }
288             }
289         }
290     }
291 
292     private void removeWorker() {
293         synchronized (workers) {
294             if (workers.size() <= super.getCorePoolSize()) {
295                 return;
296             }
297             waitingSessions.offer(EXIT_SIGNAL);
298         }
299     }
300 
301     /**
302      * {@inheritDoc}
303      */
304     @Override
305     public int getMaximumPoolSize() {
306         return super.getMaximumPoolSize();
307     }
308 
309     /**
310      * {@inheritDoc}
311      */
312     @Override
313     public void setMaximumPoolSize(int maximumPoolSize) {
314         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
315             throw new IllegalArgumentException("maximumPoolSize: "
316                     + maximumPoolSize);
317         }
318 
319         synchronized (workers) {
320             super.setMaximumPoolSize( maximumPoolSize );
321             int difference = workers.size() - maximumPoolSize;
322             while (difference > 0) {
323                 removeWorker();
324                 --difference;
325             }
326         }
327     }
328 
329     /**
330      * {@inheritDoc}
331      */
332     @Override
333     public boolean awaitTermination(long timeout, TimeUnit unit)
334             throws InterruptedException {
335 
336         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
337 
338         synchronized (workers) {
339             while (!isTerminated()) {
340                 long waitTime = deadline - System.currentTimeMillis();
341                 if (waitTime <= 0) {
342                     break;
343                 }
344 
345                 workers.wait(waitTime);
346             }
347         }
348         return isTerminated();
349     }
350 
351     /**
352      * {@inheritDoc}
353      */
354     @Override
355     public boolean isShutdown() {
356         return shutdown;
357     }
358 
359     /**
360      * {@inheritDoc}
361      */
362     @Override
363     public boolean isTerminated() {
364         if (!shutdown) {
365             return false;
366         }
367 
368         synchronized (workers) {
369             return workers.isEmpty();
370         }
371     }
372 
373     /**
374      * {@inheritDoc}
375      */
376     @Override
377     public void shutdown() {
378         if (shutdown) {
379             return;
380         }
381 
382         shutdown = true;
383 
384         synchronized (workers) {
385             for (int i = workers.size(); i > 0; i --) {
386                 waitingSessions.offer(EXIT_SIGNAL);
387             }
388         }
389     }
390 
391     /**
392      * {@inheritDoc}
393      */
394     @Override
395     public List<Runnable> shutdownNow() {
396         shutdown();
397 
398         List<Runnable> answer = new ArrayList<Runnable>();
399         IoSession session;
400         
401         while ((session = waitingSessions.poll()) != null) {
402             if (session == EXIT_SIGNAL) {
403                 waitingSessions.offer(EXIT_SIGNAL);
404                 Thread.yield(); // Let others take the signal.
405                 continue;
406             }
407 
408             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
409             
410             synchronized (sessionTasksQueue.tasksQueue) {
411                 
412                 for (Runnable task: sessionTasksQueue.tasksQueue) {
413                     getQueueHandler().polled(this, (IoEvent) task);
414                     answer.add(task);
415                 }
416                 
417                 sessionTasksQueue.tasksQueue.clear();
418             }
419         }
420 
421         return answer;
422     }
423     
424     
425     /**
426      * A Helper class used to print the list of events being queued. 
427      */
428     private void print( Queue<Runnable> queue, IoEvent event) {
429         StringBuilder sb = new StringBuilder();
430         sb.append( "Adding event " ).append( event.getType() ).append( " to session " ).append(event.getSession().getId() );
431         boolean first = true;
432         sb.append( "\nQueue : [" );
433         for (Runnable elem:queue) {
434             if ( first ) {
435                 first = false;
436             } else {
437                 sb.append( ", " );
438             }
439                 
440             sb.append(((IoEvent)elem).getType()).append(", ");
441         }
442         sb.append( "]\n" );
443         LOGGER.debug( sb.toString() );
444     }
445 
446     /**
447      * {@inheritDoc}
448      */
449     @Override
450     public void execute(Runnable task) {
451         if (shutdown) {
452             rejectTask(task);
453         }
454 
455         // Check that it's a IoEvent task
456         checkTaskType(task);
457 
458         IoEvent event = (IoEvent) task;
459         
460         // Get the associated session
461         IoSession session = event.getSession();
462         
463         // Get the session's queue of events
464         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
465         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
466         
467         boolean offerSession;
468 
469         // propose the new event to the event queue handler. If we
470         // use a throttle queue handler, the message may be rejected
471         // if the maximum size has been reached.
472         boolean offerEvent = eventQueueHandler.accept(this, event);
473         
474         if (offerEvent) {
475             // Ok, the message has been accepted
476             synchronized (tasksQueue) {
477                 // Inject the event into the executor taskQueue
478                 tasksQueue.offer(event);
479                 
480                 if (sessionTasksQueue.processingCompleted) {
481                     sessionTasksQueue.processingCompleted = false;
482                     offerSession = true;
483                 } else {
484                     offerSession = false;
485                 }
486 
487                 if (LOGGER.isDebugEnabled()) {
488                     print(tasksQueue, event);
489                 }
490             }
491         } else {
492             offerSession = false;
493         }
494 
495         if (offerSession) {
496             // As the tasksQueue was empty, the task has been executed
497             // immediately, so we can move the session to the queue
498             // of sessions waiting for completion.
499             waitingSessions.offer(session);
500         }
501 
502         addWorkerIfNecessary();
503 
504         if (offerEvent) {
505             eventQueueHandler.offered(this, event);
506         }
507     }
508 
509     private void rejectTask(Runnable task) {
510         getRejectedExecutionHandler().rejectedExecution(task, this);
511     }
512 
513     private void checkTaskType(Runnable task) {
514         if (!(task instanceof IoEvent)) {
515             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
516         }
517     }
518 
519     /**
520      * {@inheritDoc}
521      */
522     @Override
523     public int getActiveCount() {
524         synchronized (workers) {
525             return workers.size() - idleWorkers.get();
526         }
527     }
528 
529     /**
530      * {@inheritDoc}
531      */
532     @Override
533     public long getCompletedTaskCount() {
534         synchronized (workers) {
535             long answer = completedTaskCount;
536             for (Worker w: workers) {
537                 answer += w.completedTaskCount;
538             }
539 
540             return answer;
541         }
542     }
543 
544     /**
545      * {@inheritDoc}
546      */
547     @Override
548     public int getLargestPoolSize() {
549         return largestPoolSize;
550     }
551 
552     /**
553      * {@inheritDoc}
554      */
555     @Override
556     public int getPoolSize() {
557         synchronized (workers) {
558             return workers.size();
559         }
560     }
561 
562     /**
563      * {@inheritDoc}
564      */
565     @Override
566     public long getTaskCount() {
567         return getCompletedTaskCount();
568     }
569 
570     /**
571      * {@inheritDoc}
572      */
573     @Override
574     public boolean isTerminating() {
575         synchronized (workers) {
576             return isShutdown() && !isTerminated();
577         }
578     }
579 
580     /**
581      * {@inheritDoc}
582      */
583     @Override
584     public int prestartAllCoreThreads() {
585         int answer = 0;
586         synchronized (workers) {
587             for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
588                 addWorker();
589                 answer ++;
590             }
591         }
592         return answer;
593     }
594 
595     /**
596      * {@inheritDoc}
597      */
598     @Override
599     public boolean prestartCoreThread() {
600         synchronized (workers) {
601             if (workers.size() < super.getCorePoolSize()) {
602                 addWorker();
603                 return true;
604             } else {
605                 return false;
606             }
607         }
608     }
609 
610     /**
611      * {@inheritDoc}
612      */
613     @Override
614     public BlockingQueue<Runnable> getQueue() {
615         throw new UnsupportedOperationException();
616     }
617 
618     /**
619      * {@inheritDoc}
620      */
621     @Override
622     public void purge() {
623         // Nothing to purge in this implementation.
624     }
625 
626     /**
627      * {@inheritDoc}
628      */
629     @Override
630     public boolean remove(Runnable task) {
631         checkTaskType(task);
632         IoEvent event = (IoEvent) task;
633         IoSession session = event.getSession();
634         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)session.getAttribute( TASKS_QUEUE );
635         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
636         
637         if (sessionTasksQueue == null) {
638             return false;
639         }
640 
641         boolean removed;
642         
643         synchronized (tasksQueue) {
644             removed = tasksQueue.remove(task);
645         }
646 
647         if (removed) {
648             getQueueHandler().polled(this, event);
649         }
650 
651         return removed;
652     }
653 
654     /**
655      * {@inheritDoc}
656      */
657     @Override
658     public int getCorePoolSize() {
659         return super.getCorePoolSize();
660     }
661 
662     /**
663      * {@inheritDoc}
664      */
665     @Override
666     public void setCorePoolSize(int corePoolSize) {
667         if (corePoolSize < 0) {
668             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
669         }
670         if (corePoolSize > super.getMaximumPoolSize()) {
671             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
672         }
673 
674         synchronized (workers) {
675             if (super.getCorePoolSize()> corePoolSize) {
676                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
677                     removeWorker();
678                 }
679             }
680             super.setCorePoolSize(corePoolSize);
681         }
682     }
683 
684     private class Worker implements Runnable {
685 
686         private volatile long completedTaskCount;
687         private Thread thread;
688         
689         public void run() {
690             thread = Thread.currentThread();
691 
692             try {
693                 for (;;) {
694                     IoSession session = fetchSession();
695 
696                     idleWorkers.decrementAndGet();
697 
698                     if (session == null) {
699                         synchronized (workers) {
700                             if (workers.size() > getCorePoolSize()) {
701                                 // Remove now to prevent duplicate exit.
702                                 workers.remove(this);
703                                 break;
704                             }
705                         }
706                     }
707 
708                     if (session == EXIT_SIGNAL) {
709                         break;
710                     }
711 
712                     try {
713                         if (session != null) {
714                             runTasks(getSessionTasksQueue(session));
715                         }
716                     } finally {
717                         idleWorkers.incrementAndGet();
718                     }
719                 }
720             } finally {
721                 synchronized (workers) {
722                     workers.remove(this);
723                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
724                     workers.notifyAll();
725                 }
726             }
727         }
728 
729         private IoSession fetchSession() {
730             IoSession session = null;
731             long currentTime = System.currentTimeMillis();
732             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
733             for (;;) {
734                 try {
735                     long waitTime = deadline - currentTime;
736                     if (waitTime <= 0) {
737                         break;
738                     }
739 
740                     try {
741                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
742                         break;
743                     } finally {
744                         if (session == null) {
745                             currentTime = System.currentTimeMillis();
746                         }
747                     }
748                 } catch (InterruptedException e) {
749                     // Ignore.
750                     continue;
751                 }
752             }
753             return session;
754         }
755 
756         private void runTasks(SessionTasksQueue sessionTasksQueue) {
757             for (;;) {
758                 Runnable task;
759                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
760                 
761                 synchronized (tasksQueue) {
762                     task = tasksQueue.poll();
763                     
764                     if (task == null) {
765                         sessionTasksQueue.processingCompleted = true;
766                         break;
767                     }
768                 }
769 
770                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
771 
772                 runTask(task);
773             }
774         }
775 
776         private void runTask(Runnable task) {
777             beforeExecute(thread, task);
778             boolean ran = false;
779             try {
780                 task.run();
781                 ran = true;
782                 afterExecute(task, null);
783                 completedTaskCount ++;
784             } catch (RuntimeException e) {
785                 if (!ran) {
786                     afterExecute(task, e);
787                 }
788                 throw e;
789             }
790         }
791     }
792     
793     
794     /**
795      * A class used to store the ordered list of events to be processed by the
796      * session, and the current task state.
797      */
798     private class SessionTasksQueue {
799         /**  A queue of ordered event waiting to be processed */ 
800         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
801         
802         /** The current task state */
803         private boolean processingCompleted = true;
804     }
805 }