View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.session.IoEvent;
35  
36  /**
37   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
38   * This means more than one event handler methods can be invoked at the same
39   * time with mixed order.  For example, let's assume that messageReceived, messageSent,
40   * and sessionClosed events are fired.
41   * <ul>
42   * <li>All event handler methods can be called simultaneously.
43   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
44   * <li>The event order can be mixed up.
45   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
46   *           is invoked.)</li>
47   * </ul>
48   * If you need to maintain the order of events per session, please use
49   * {@link OrderedThreadPoolExecutor}.
50   *
51   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
52   * @org.apache.xbean.XBean
53   */
54  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55  
56      private static final Runnable EXIT_SIGNAL = new Runnable() {
57          public void run() {
58              throw new Error("This method shouldn't be called. " + "Please file a bug report.");
59          }
60      };
61  
62      private final Set<Worker> workers = new HashSet<Worker>();
63  
64      private volatile int corePoolSize;
65  
66      private volatile int maximumPoolSize;
67  
68      private volatile int largestPoolSize;
69  
70      private final AtomicInteger idleWorkers = new AtomicInteger();
71  
72      private long completedTaskCount;
73  
74      private volatile boolean shutdown;
75  
76      private final IoEventQueueHandler queueHandler;
77  
78      public UnorderedThreadPoolExecutor() {
79          this(16);
80      }
81  
82      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
83          this(0, maximumPoolSize);
84      }
85  
86      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
87          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
88      }
89  
90      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
91          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
92      }
93  
94      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
95              IoEventQueueHandler queueHandler) {
96          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
97      }
98  
99      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
100             ThreadFactory threadFactory) {
101         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
102     }
103 
104     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
105             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
106         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
107         if (corePoolSize < 0) {
108             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
109         }
110 
111         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
112             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
113         }
114 
115         if (queueHandler == null) {
116             queueHandler = IoEventQueueHandler.NOOP;
117         }
118 
119         this.corePoolSize = corePoolSize;
120         this.maximumPoolSize = maximumPoolSize;
121         this.queueHandler = queueHandler;
122     }
123 
124     public IoEventQueueHandler getQueueHandler() {
125         return queueHandler;
126     }
127 
128     @Override
129     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
130         // Ignore the request.  It must always be AbortPolicy.
131     }
132 
133     private void addWorker() {
134         synchronized (workers) {
135             if (workers.size() >= maximumPoolSize) {
136                 return;
137             }
138 
139             Worker worker = new Worker();
140             Thread thread = getThreadFactory().newThread(worker);
141             idleWorkers.incrementAndGet();
142             thread.start();
143             workers.add(worker);
144 
145             if (workers.size() > largestPoolSize) {
146                 largestPoolSize = workers.size();
147             }
148         }
149     }
150 
151     private void addWorkerIfNecessary() {
152         if (idleWorkers.get() == 0) {
153             synchronized (workers) {
154                 if (workers.isEmpty() || idleWorkers.get() == 0) {
155                     addWorker();
156                 }
157             }
158         }
159     }
160 
161     private void removeWorker() {
162         synchronized (workers) {
163             if (workers.size() <= corePoolSize) {
164                 return;
165             }
166             getQueue().offer(EXIT_SIGNAL);
167         }
168     }
169 
170     @Override
171     public int getMaximumPoolSize() {
172         return maximumPoolSize;
173     }
174 
175     @Override
176     public void setMaximumPoolSize(int maximumPoolSize) {
177         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
178             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
179         }
180 
181         synchronized (workers) {
182             this.maximumPoolSize = maximumPoolSize;
183             int difference = workers.size() - maximumPoolSize;
184             while (difference > 0) {
185                 removeWorker();
186                 --difference;
187             }
188         }
189     }
190 
191     @Override
192     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
193 
194         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
195 
196         synchronized (workers) {
197             while (!isTerminated()) {
198                 long waitTime = deadline - System.currentTimeMillis();
199                 if (waitTime <= 0) {
200                     break;
201                 }
202 
203                 workers.wait(waitTime);
204             }
205         }
206         return isTerminated();
207     }
208 
209     @Override
210     public boolean isShutdown() {
211         return shutdown;
212     }
213 
214     @Override
215     public boolean isTerminated() {
216         if (!shutdown) {
217             return false;
218         }
219 
220         synchronized (workers) {
221             return workers.isEmpty();
222         }
223     }
224 
225     @Override
226     public void shutdown() {
227         if (shutdown) {
228             return;
229         }
230 
231         shutdown = true;
232 
233         synchronized (workers) {
234             for (int i = workers.size(); i > 0; i--) {
235                 getQueue().offer(EXIT_SIGNAL);
236             }
237         }
238     }
239 
240     @Override
241     public List<Runnable> shutdownNow() {
242         shutdown();
243 
244         List<Runnable> answer = new ArrayList<Runnable>();
245         Runnable task;
246         while ((task = getQueue().poll()) != null) {
247             if (task == EXIT_SIGNAL) {
248                 getQueue().offer(EXIT_SIGNAL);
249                 Thread.yield(); // Let others take the signal.
250                 continue;
251             }
252 
253             getQueueHandler().polled(this, (IoEvent) task);
254             answer.add(task);
255         }
256 
257         return answer;
258     }
259 
260     @Override
261     public void execute(Runnable task) {
262         if (shutdown) {
263             rejectTask(task);
264         }
265 
266         checkTaskType(task);
267 
268         IoEvent e = (IoEvent) task;
269         boolean offeredEvent = queueHandler.accept(this, e);
270         if (offeredEvent) {
271             getQueue().offer(e);
272         }
273 
274         addWorkerIfNecessary();
275 
276         if (offeredEvent) {
277             queueHandler.offered(this, e);
278         }
279     }
280 
281     private void rejectTask(Runnable task) {
282         getRejectedExecutionHandler().rejectedExecution(task, this);
283     }
284 
285     private void checkTaskType(Runnable task) {
286         if (!(task instanceof IoEvent)) {
287             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
288         }
289     }
290 
291     @Override
292     public int getActiveCount() {
293         synchronized (workers) {
294             return workers.size() - idleWorkers.get();
295         }
296     }
297 
298     @Override
299     public long getCompletedTaskCount() {
300         synchronized (workers) {
301             long answer = completedTaskCount;
302             for (Worker w : workers) {
303                 answer += w.completedTaskCount;
304             }
305 
306             return answer;
307         }
308     }
309 
310     @Override
311     public int getLargestPoolSize() {
312         return largestPoolSize;
313     }
314 
315     @Override
316     public int getPoolSize() {
317         synchronized (workers) {
318             return workers.size();
319         }
320     }
321 
322     @Override
323     public long getTaskCount() {
324         return getCompletedTaskCount();
325     }
326 
327     @Override
328     public boolean isTerminating() {
329         synchronized (workers) {
330             return isShutdown() && !isTerminated();
331         }
332     }
333 
334     @Override
335     public int prestartAllCoreThreads() {
336         int answer = 0;
337         synchronized (workers) {
338             for (int i = corePoolSize - workers.size(); i > 0; i--) {
339                 addWorker();
340                 answer++;
341             }
342         }
343         return answer;
344     }
345 
346     @Override
347     public boolean prestartCoreThread() {
348         synchronized (workers) {
349             if (workers.size() < corePoolSize) {
350                 addWorker();
351                 return true;
352             }
353 
354             return false;
355         }
356     }
357 
358     @Override
359     public void purge() {
360         // Nothing to purge in this implementation.
361     }
362 
363     @Override
364     public boolean remove(Runnable task) {
365         boolean removed = super.remove(task);
366         if (removed) {
367             getQueueHandler().polled(this, (IoEvent) task);
368         }
369         return removed;
370     }
371 
372     @Override
373     public int getCorePoolSize() {
374         return corePoolSize;
375     }
376 
377     @Override
378     public void setCorePoolSize(int corePoolSize) {
379         if (corePoolSize < 0) {
380             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
381         }
382         if (corePoolSize > maximumPoolSize) {
383             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
384         }
385 
386         synchronized (workers) {
387             if (this.corePoolSize > corePoolSize) {
388                 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
389                     removeWorker();
390                 }
391             }
392             this.corePoolSize = corePoolSize;
393         }
394     }
395 
396     private class Worker implements Runnable {
397 
398         private volatile long completedTaskCount;
399 
400         private Thread thread;
401 
402         public void run() {
403             thread = Thread.currentThread();
404 
405             try {
406                 for (;;) {
407                     Runnable task = fetchTask();
408 
409                     idleWorkers.decrementAndGet();
410 
411                     if (task == null) {
412                         synchronized (workers) {
413                             if (workers.size() > corePoolSize) {
414                                 // Remove now to prevent duplicate exit.
415                                 workers.remove(this);
416                                 break;
417                             }
418                         }
419                     }
420 
421                     if (task == EXIT_SIGNAL) {
422                         break;
423                     }
424 
425                     try {
426                         if (task != null) {
427                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
428                             runTask(task);
429                         }
430                     } finally {
431                         idleWorkers.incrementAndGet();
432                     }
433                 }
434             } finally {
435                 synchronized (workers) {
436                     workers.remove(this);
437                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
438                     workers.notifyAll();
439                 }
440             }
441         }
442 
443         private Runnable fetchTask() {
444             Runnable task = null;
445             long currentTime = System.currentTimeMillis();
446             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
447             for (;;) {
448                 try {
449                     long waitTime = deadline - currentTime;
450                     if (waitTime <= 0) {
451                         break;
452                     }
453 
454                     try {
455                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
456                         break;
457                     } finally {
458                         if (task == null) {
459                             currentTime = System.currentTimeMillis();
460                         }
461                     }
462                 } catch (InterruptedException e) {
463                     // Ignore.
464                     continue;
465                 }
466             }
467             return task;
468         }
469 
470         private void runTask(Runnable task) {
471             beforeExecute(thread, task);
472             boolean ran = false;
473             try {
474                 task.run();
475                 ran = true;
476                 afterExecute(task, null);
477                 completedTaskCount++;
478             } catch (RuntimeException e) {
479                 if (!ran) {
480                     afterExecute(task, e);
481                 }
482                 throw e;
483             }
484         }
485     }
486 }