/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.session.IoEvent;
36  
37  /**
38   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
39   * This means more than one event handler methods can be invoked at the same
40   * time with mixed order.  For example, let's assume that messageReceived, messageSent,
41   * and sessionClosed events are fired.
42   * <ul>
43   * <li>All event handler methods can be called simultaneously.
44   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
45   * <li>The event order can be mixed up.
46   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
47   *           is invoked.)</li>
48   * </ul>
49   * If you need to maintain the order of events per session, please use
50   * {@link OrderedThreadPoolExecutor}.
51   *
52   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
53   * @org.apache.xbean.XBean
54   */
55  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
56  
57      private static final Runnable EXIT_SIGNAL = new Runnable() {
58          public void run() {
59              throw new Error("This method shouldn't be called. " + "Please file a bug report.");
60          }
61      };
62  
63      private final Set<Worker> workers = new HashSet<Worker>();
64  
65      private volatile int corePoolSize;
66  
67      private volatile int maximumPoolSize;
68  
69      private volatile int largestPoolSize;
70  
71      private final AtomicInteger idleWorkers = new AtomicInteger();
72  
73      private long completedTaskCount;
74  
75      private volatile boolean shutdown;
76  
77      private final IoEventQueueHandler queueHandler;
78  
79      public UnorderedThreadPoolExecutor() {
80          this(16);
81      }
82  
83      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
84          this(0, maximumPoolSize);
85      }
86  
87      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
88          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
89      }
90  
91      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
92          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
93      }
94  
95      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
96              IoEventQueueHandler queueHandler) {
97          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
98      }
99  
100     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
101             ThreadFactory threadFactory) {
102         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
103     }
104 
105     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
106             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
107         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
108         if (corePoolSize < 0) {
109             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
110         }
111 
112         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
113             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
114         }
115 
116         if (queueHandler == null) {
117             queueHandler = IoEventQueueHandler.NOOP;
118         }
119 
120         this.corePoolSize = corePoolSize;
121         this.maximumPoolSize = maximumPoolSize;
122         this.queueHandler = queueHandler;
123     }
124 
125     public IoEventQueueHandler getQueueHandler() {
126         return queueHandler;
127     }
128 
129     @Override
130     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
131         // Ignore the request.  It must always be AbortPolicy.
132     }
133 
134     private void addWorker() {
135         synchronized (workers) {
136             if (workers.size() >= maximumPoolSize) {
137                 return;
138             }
139 
140             Worker worker = new Worker();
141             Thread thread = getThreadFactory().newThread(worker);
142             idleWorkers.incrementAndGet();
143             thread.start();
144             workers.add(worker);
145 
146             if (workers.size() > largestPoolSize) {
147                 largestPoolSize = workers.size();
148             }
149         }
150     }
151 
152     private void addWorkerIfNecessary() {
153         if (idleWorkers.get() == 0) {
154             synchronized (workers) {
155                 if (workers.isEmpty() || idleWorkers.get() == 0) {
156                     addWorker();
157                 }
158             }
159         }
160     }
161 
162     private void removeWorker() {
163         synchronized (workers) {
164             if (workers.size() <= corePoolSize) {
165                 return;
166             }
167             getQueue().offer(EXIT_SIGNAL);
168         }
169     }
170 
171     @Override
172     public int getMaximumPoolSize() {
173         return maximumPoolSize;
174     }
175 
176     @Override
177     public void setMaximumPoolSize(int maximumPoolSize) {
178         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
179             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
180         }
181 
182         synchronized (workers) {
183             this.maximumPoolSize = maximumPoolSize;
184             int difference = workers.size() - maximumPoolSize;
185             while (difference > 0) {
186                 removeWorker();
187                 --difference;
188             }
189         }
190     }
191 
192     @Override
193     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
194 
195         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
196 
197         synchronized (workers) {
198             while (!isTerminated()) {
199                 long waitTime = deadline - System.currentTimeMillis();
200                 if (waitTime <= 0) {
201                     break;
202                 }
203 
204                 workers.wait(waitTime);
205             }
206         }
207         return isTerminated();
208     }
209 
210     @Override
211     public boolean isShutdown() {
212         return shutdown;
213     }
214 
215     @Override
216     public boolean isTerminated() {
217         if (!shutdown) {
218             return false;
219         }
220 
221         synchronized (workers) {
222             return workers.isEmpty();
223         }
224     }
225 
226     @Override
227     public void shutdown() {
228         if (shutdown) {
229             return;
230         }
231 
232         shutdown = true;
233 
234         synchronized (workers) {
235             for (int i = workers.size(); i > 0; i--) {
236                 getQueue().offer(EXIT_SIGNAL);
237             }
238         }
239     }
240 
241     @Override
242     public List<Runnable> shutdownNow() {
243         shutdown();
244 
245         List<Runnable> answer = new ArrayList<Runnable>();
246         Runnable task;
247         while ((task = getQueue().poll()) != null) {
248             if (task == EXIT_SIGNAL) {
249                 getQueue().offer(EXIT_SIGNAL);
250                 Thread.yield(); // Let others take the signal.
251                 continue;
252             }
253 
254             getQueueHandler().polled(this, (IoEvent) task);
255             answer.add(task);
256         }
257 
258         return answer;
259     }
260 
261     @Override
262     public void execute(Runnable task) {
263         if (shutdown) {
264             rejectTask(task);
265         }
266 
267         checkTaskType(task);
268 
269         IoEvent e = (IoEvent) task;
270         boolean offeredEvent = queueHandler.accept(this, e);
271         
272         if (offeredEvent) {
273             getQueue().offer(e);
274         }
275 
276         addWorkerIfNecessary();
277 
278         if (offeredEvent) {
279             queueHandler.offered(this, e);
280         }
281     }
282 
283     private void rejectTask(Runnable task) {
284         getRejectedExecutionHandler().rejectedExecution(task, this);
285     }
286 
287     private void checkTaskType(Runnable task) {
288         if (!(task instanceof IoEvent)) {
289             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
290         }
291     }
292 
293     @Override
294     public int getActiveCount() {
295         synchronized (workers) {
296             return workers.size() - idleWorkers.get();
297         }
298     }
299 
300     @Override
301     public long getCompletedTaskCount() {
302         synchronized (workers) {
303             long answer = completedTaskCount;
304             for (Worker w : workers) {
305                 answer += w.completedTaskCount.get();
306             }
307 
308             return answer;
309         }
310     }
311 
312     @Override
313     public int getLargestPoolSize() {
314         return largestPoolSize;
315     }
316 
317     @Override
318     public int getPoolSize() {
319         synchronized (workers) {
320             return workers.size();
321         }
322     }
323 
324     @Override
325     public long getTaskCount() {
326         return getCompletedTaskCount();
327     }
328 
329     @Override
330     public boolean isTerminating() {
331         synchronized (workers) {
332             return isShutdown() && !isTerminated();
333         }
334     }
335 
336     @Override
337     public int prestartAllCoreThreads() {
338         int answer = 0;
339         synchronized (workers) {
340             for (int i = corePoolSize - workers.size(); i > 0; i--) {
341                 addWorker();
342                 answer++;
343             }
344         }
345         return answer;
346     }
347 
348     @Override
349     public boolean prestartCoreThread() {
350         synchronized (workers) {
351             if (workers.size() < corePoolSize) {
352                 addWorker();
353                 return true;
354             }
355 
356             return false;
357         }
358     }
359 
360     @Override
361     public void purge() {
362         // Nothing to purge in this implementation.
363     }
364 
365     @Override
366     public boolean remove(Runnable task) {
367         boolean removed = super.remove(task);
368         if (removed) {
369             getQueueHandler().polled(this, (IoEvent) task);
370         }
371         return removed;
372     }
373 
374     @Override
375     public int getCorePoolSize() {
376         return corePoolSize;
377     }
378 
379     @Override
380     public void setCorePoolSize(int corePoolSize) {
381         if (corePoolSize < 0) {
382             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
383         }
384         if (corePoolSize > maximumPoolSize) {
385             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
386         }
387 
388         synchronized (workers) {
389             if (this.corePoolSize > corePoolSize) {
390                 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
391                     removeWorker();
392                 }
393             }
394             this.corePoolSize = corePoolSize;
395         }
396     }
397 
398     private class Worker implements Runnable {
399 
400         private AtomicLong completedTaskCount = new AtomicLong(0);
401 
402         private Thread thread;
403 
404         public void run() {
405             thread = Thread.currentThread();
406 
407             try {
408                 for (;;) {
409                     Runnable task = fetchTask();
410 
411                     idleWorkers.decrementAndGet();
412 
413                     if (task == null) {
414                         synchronized (workers) {
415                             if (workers.size() > corePoolSize) {
416                                 // Remove now to prevent duplicate exit.
417                                 workers.remove(this);
418                                 break;
419                             }
420                         }
421                     }
422 
423                     if (task == EXIT_SIGNAL) {
424                         break;
425                     }
426 
427                     try {
428                         if (task != null) {
429                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
430                             runTask(task);
431                         }
432                     } finally {
433                         idleWorkers.incrementAndGet();
434                     }
435                 }
436             } finally {
437                 synchronized (workers) {
438                     workers.remove(this);
439                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
440                     workers.notifyAll();
441                 }
442             }
443         }
444 
445         private Runnable fetchTask() {
446             Runnable task = null;
447             long currentTime = System.currentTimeMillis();
448             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
449             for (;;) {
450                 try {
451                     long waitTime = deadline - currentTime;
452                     if (waitTime <= 0) {
453                         break;
454                     }
455 
456                     try {
457                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
458                         break;
459                     } finally {
460                         if (task == null) {
461                             currentTime = System.currentTimeMillis();
462                         }
463                     }
464                 } catch (InterruptedException e) {
465                     // Ignore.
466                     continue;
467                 }
468             }
469             return task;
470         }
471 
472         private void runTask(Runnable task) {
473             beforeExecute(thread, task);
474             boolean ran = false;
475             try {
476                 task.run();
477                 ran = true;
478                 afterExecute(task, null);
479                 completedTaskCount.incrementAndGet();
480             } catch (RuntimeException e) {
481                 if (!ran) {
482                     afterExecute(task, e);
483                 }
484                 throw e;
485             }
486         }
487     }
488 }