View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.session.IoEvent;
35  
36  /**
37   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
38   * This means that more than one event handler method can be invoked at the same
39   * time, and in any order.  For example, let's assume that messageReceived, messageSent,
40   * and sessionClosed events are fired.
41   * <ul>
42   * <li>All event handler methods can be called simultaneously.
43   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
44   * <li>The event order can be mixed up.
45   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
46   *           is invoked.)</li>
47   * </ul>
48   * If you need to maintain the order of events per session, please use
49   * {@link OrderedThreadPoolExecutor}.
50   *
51   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
52   * @org.apache.xbean.XBean
53   */
54  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55  
56      private static final Runnable EXIT_SIGNAL = new Runnable() {
57          public void run() {
58              throw new Error("This method shouldn't be called. " + "Please file a bug report.");
59          }
60      };
61  
62      private final Set<Worker> workers = new HashSet<Worker>();
63  
64      private volatile int corePoolSize;
65  
66      private volatile int maximumPoolSize;
67  
68      private volatile int largestPoolSize;
69  
70      private final AtomicInteger idleWorkers = new AtomicInteger();
71  
72      private long completedTaskCount;
73  
74      private volatile boolean shutdown;
75  
76      private final IoEventQueueHandler queueHandler;
77  
78      public UnorderedThreadPoolExecutor() {
79          this(16);
80      }
81  
82      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
83          this(0, maximumPoolSize);
84      }
85  
86      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
87          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
88      }
89  
90      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
91          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
92      }
93  
94      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
95              IoEventQueueHandler queueHandler) {
96          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
97      }
98  
99      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
100             ThreadFactory threadFactory) {
101         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
102     }
103 
104     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
105             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
106         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
107         if (corePoolSize < 0) {
108             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
109         }
110 
111         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
112             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
113         }
114 
115         if (queueHandler == null) {
116             queueHandler = IoEventQueueHandler.NOOP;
117         }
118 
119         this.corePoolSize = corePoolSize;
120         this.maximumPoolSize = maximumPoolSize;
121         this.queueHandler = queueHandler;
122     }
123 
124     public IoEventQueueHandler getQueueHandler() {
125         return queueHandler;
126     }
127 
128     @Override
129     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
130         // Ignore the request.  It must always be AbortPolicy.
131     }
132 
133     private void addWorker() {
134         synchronized (workers) {
135             if (workers.size() >= maximumPoolSize) {
136                 return;
137             }
138 
139             Worker worker = new Worker();
140             Thread thread = getThreadFactory().newThread(worker);
141             idleWorkers.incrementAndGet();
142             thread.start();
143             workers.add(worker);
144 
145             if (workers.size() > largestPoolSize) {
146                 largestPoolSize = workers.size();
147             }
148         }
149     }
150 
151     private void addWorkerIfNecessary() {
152         if (idleWorkers.get() == 0) {
153             synchronized (workers) {
154                 if (workers.isEmpty() || idleWorkers.get() == 0) {
155                     addWorker();
156                 }
157             }
158         }
159     }
160 
161     private void removeWorker() {
162         synchronized (workers) {
163             if (workers.size() <= corePoolSize) {
164                 return;
165             }
166             getQueue().offer(EXIT_SIGNAL);
167         }
168     }
169 
170     @Override
171     public int getMaximumPoolSize() {
172         return maximumPoolSize;
173     }
174 
175     @Override
176     public void setMaximumPoolSize(int maximumPoolSize) {
177         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
178             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
179         }
180 
181         synchronized (workers) {
182             this.maximumPoolSize = maximumPoolSize;
183             int difference = workers.size() - maximumPoolSize;
184             while (difference > 0) {
185                 removeWorker();
186                 --difference;
187             }
188         }
189     }
190 
191     @Override
192     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
193 
194         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
195 
196         synchronized (workers) {
197             while (!isTerminated()) {
198                 long waitTime = deadline - System.currentTimeMillis();
199                 if (waitTime <= 0) {
200                     break;
201                 }
202 
203                 workers.wait(waitTime);
204             }
205         }
206         return isTerminated();
207     }
208 
209     @Override
210     public boolean isShutdown() {
211         return shutdown;
212     }
213 
214     @Override
215     public boolean isTerminated() {
216         if (!shutdown) {
217             return false;
218         }
219 
220         synchronized (workers) {
221             return workers.isEmpty();
222         }
223     }
224 
225     @Override
226     public void shutdown() {
227         if (shutdown) {
228             return;
229         }
230 
231         shutdown = true;
232 
233         synchronized (workers) {
234             for (int i = workers.size(); i > 0; i--) {
235                 getQueue().offer(EXIT_SIGNAL);
236             }
237         }
238     }
239 
240     @Override
241     public List<Runnable> shutdownNow() {
242         shutdown();
243 
244         List<Runnable> answer = new ArrayList<Runnable>();
245         Runnable task;
246         while ((task = getQueue().poll()) != null) {
247             if (task == EXIT_SIGNAL) {
248                 getQueue().offer(EXIT_SIGNAL);
249                 Thread.yield(); // Let others take the signal.
250                 continue;
251             }
252 
253             getQueueHandler().polled(this, (IoEvent) task);
254             answer.add(task);
255         }
256 
257         return answer;
258     }
259 
260     @Override
261     public void execute(Runnable task) {
262         if (shutdown) {
263             rejectTask(task);
264         }
265 
266         checkTaskType(task);
267 
268         IoEvent e = (IoEvent) task;
269         boolean offeredEvent = queueHandler.accept(this, e);
270         
271         if (offeredEvent) {
272             getQueue().offer(e);
273         }
274 
275         addWorkerIfNecessary();
276 
277         if (offeredEvent) {
278             queueHandler.offered(this, e);
279         }
280     }
281 
282     private void rejectTask(Runnable task) {
283         getRejectedExecutionHandler().rejectedExecution(task, this);
284     }
285 
286     private void checkTaskType(Runnable task) {
287         if (!(task instanceof IoEvent)) {
288             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
289         }
290     }
291 
292     @Override
293     public int getActiveCount() {
294         synchronized (workers) {
295             return workers.size() - idleWorkers.get();
296         }
297     }
298 
299     @Override
300     public long getCompletedTaskCount() {
301         synchronized (workers) {
302             long answer = completedTaskCount;
303             for (Worker w : workers) {
304                 answer += w.completedTaskCount;
305             }
306 
307             return answer;
308         }
309     }
310 
311     @Override
312     public int getLargestPoolSize() {
313         return largestPoolSize;
314     }
315 
316     @Override
317     public int getPoolSize() {
318         synchronized (workers) {
319             return workers.size();
320         }
321     }
322 
323     @Override
324     public long getTaskCount() {
325         return getCompletedTaskCount();
326     }
327 
328     @Override
329     public boolean isTerminating() {
330         synchronized (workers) {
331             return isShutdown() && !isTerminated();
332         }
333     }
334 
335     @Override
336     public int prestartAllCoreThreads() {
337         int answer = 0;
338         synchronized (workers) {
339             for (int i = corePoolSize - workers.size(); i > 0; i--) {
340                 addWorker();
341                 answer++;
342             }
343         }
344         return answer;
345     }
346 
347     @Override
348     public boolean prestartCoreThread() {
349         synchronized (workers) {
350             if (workers.size() < corePoolSize) {
351                 addWorker();
352                 return true;
353             }
354 
355             return false;
356         }
357     }
358 
359     @Override
360     public void purge() {
361         // Nothing to purge in this implementation.
362     }
363 
364     @Override
365     public boolean remove(Runnable task) {
366         boolean removed = super.remove(task);
367         if (removed) {
368             getQueueHandler().polled(this, (IoEvent) task);
369         }
370         return removed;
371     }
372 
373     @Override
374     public int getCorePoolSize() {
375         return corePoolSize;
376     }
377 
378     @Override
379     public void setCorePoolSize(int corePoolSize) {
380         if (corePoolSize < 0) {
381             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
382         }
383         if (corePoolSize > maximumPoolSize) {
384             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
385         }
386 
387         synchronized (workers) {
388             if (this.corePoolSize > corePoolSize) {
389                 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
390                     removeWorker();
391                 }
392             }
393             this.corePoolSize = corePoolSize;
394         }
395     }
396 
397     private class Worker implements Runnable {
398 
399         private volatile long completedTaskCount;
400 
401         private Thread thread;
402 
403         public void run() {
404             thread = Thread.currentThread();
405 
406             try {
407                 for (;;) {
408                     Runnable task = fetchTask();
409 
410                     idleWorkers.decrementAndGet();
411 
412                     if (task == null) {
413                         synchronized (workers) {
414                             if (workers.size() > corePoolSize) {
415                                 // Remove now to prevent duplicate exit.
416                                 workers.remove(this);
417                                 break;
418                             }
419                         }
420                     }
421 
422                     if (task == EXIT_SIGNAL) {
423                         break;
424                     }
425 
426                     try {
427                         if (task != null) {
428                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
429                             runTask(task);
430                         }
431                     } finally {
432                         idleWorkers.incrementAndGet();
433                     }
434                 }
435             } finally {
436                 synchronized (workers) {
437                     workers.remove(this);
438                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
439                     workers.notifyAll();
440                 }
441             }
442         }
443 
444         private Runnable fetchTask() {
445             Runnable task = null;
446             long currentTime = System.currentTimeMillis();
447             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
448             for (;;) {
449                 try {
450                     long waitTime = deadline - currentTime;
451                     if (waitTime <= 0) {
452                         break;
453                     }
454 
455                     try {
456                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
457                         break;
458                     } finally {
459                         if (task == null) {
460                             currentTime = System.currentTimeMillis();
461                         }
462                     }
463                 } catch (InterruptedException e) {
464                     // Ignore.
465                     continue;
466                 }
467             }
468             return task;
469         }
470 
471         private void runTask(Runnable task) {
472             beforeExecute(thread, task);
473             boolean ran = false;
474             try {
475                 task.run();
476                 ran = true;
477                 afterExecute(task, null);
478                 completedTaskCount++;
479             } catch (RuntimeException e) {
480                 if (!ran) {
481                     afterExecute(task, e);
482                 }
483                 throw e;
484             }
485         }
486     }
487 }