View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.lang.reflect.Constructor;
23  import java.nio.channels.spi.SelectorProvider;
24  import java.util.Arrays;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ThreadPoolExecutor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.session.AbstractIoSession;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
40   * {@link IoProcessor}s. Most current transport implementations use this pool internally
41   * to perform better in a multi-core environment, and therefore, you won't need to 
42   * use this pool directly unless you are running multiple {@link IoService}s in the
43   * same JVM.
44   * <p>
45   * If you are running multiple {@link IoService}s, you could want to share the pool
46   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
47   * instance by yourself and provide the pool as a constructor parameter when you
48   * create the services.
49   * <p>
50   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
51   * It tries to instantiate the processor in the following order:
52   * <ol>
53   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
54   * <li>A public constructor with one {@link Executor} parameter.</li>
55   * <li>A public default constructor</li>
56   * </ol>
57   * The following is an example for the NIO socket transport:
58   * <pre><code>
59   * // Create a shared pool.
60   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
61   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
62   * 
63   * // Create two services that share the same pool.
64   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
65   * SocketConnector connector = new NioSocketConnector(pool);
66   * 
67   * ...
68   * 
69   * // Release related resources.
70   * connector.dispose();
71   * acceptor.dispose();
72   * pool.dispose();
73   * </code></pre>
74   * 
75   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
76   * 
77   * @param <S> the type of the {@link IoSession} to be managed by the specified
78   *            {@link IoProcessor}.
79   */
80  public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
81      /** A logger for this class */
82      private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
83  
84      /** The default pool size, when no size is provided. */
85      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
86  
87      /** A key used to store the processor pool in the session's Attributes */
88      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
89  
90      /** The pool table */
91      private final IoProcessor<S>[] pool;
92  
93      /** The contained  which is passed to the IoProcessor when they are created */
94      private final Executor executor;
95  
96      /** A flag set to true if we had to create an executor */
97      private final boolean createdExecutor;
98  
99      /** A lock to protect the disposal against concurrent calls */
100     private final Object disposalLock = new Object();
101 
102     /** A flg set to true if the IoProcessor in the pool are being disposed */
103     private volatile boolean disposing;
104 
105     /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
106     private volatile boolean disposed;
107 
108     /**
109      * Creates a new instance of SimpleIoProcessorPool with a default
110      * size of NbCPUs +1.
111      *
112      * @param processorType The type of IoProcessor to use
113      */
114     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
115         this(processorType, null, DEFAULT_SIZE, null);
116     }
117 
118     /**
119      * Creates a new instance of SimpleIoProcessorPool with a defined
120      * number of IoProcessors in the pool
121      *
122      * @param processorType The type of IoProcessor to use
123      * @param size The number of IoProcessor in the pool
124      */
125     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
126         this(processorType, null, size, null);
127     }
128 
129     /**
130      * Creates a new instance of SimpleIoProcessorPool with a defined
131      * number of IoProcessors in the pool
132      *
133      * @param processorType The type of IoProcessor to use
134      * @param size The number of IoProcessor in the pool
135      * @param selectorProvider The SelectorProvider to use
136      */
137     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) {
138         this(processorType, null, size, selectorProvider);
139     }
140 
141     /**
142      * Creates a new instance of SimpleIoProcessorPool with an executor
143      *
144      * @param processorType The type of IoProcessor to use
145      * @param executor The {@link Executor}
146      */
147     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
148         this(processorType, executor, DEFAULT_SIZE, null);
149     }
150 
151     /**
152      * Creates a new instance of SimpleIoProcessorPool with an executor
153      *
154      * @param processorType The type of IoProcessor to use
155      * @param executor The {@link Executor}
156      * @param size The number of IoProcessor in the pool
157      * @param selectorProvider The SelectorProvider to used
158      */
159     @SuppressWarnings("unchecked")
160     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, 
161             SelectorProvider selectorProvider) {
162         if (processorType == null) {
163             throw new IllegalArgumentException("processorType");
164         }
165 
166         if (size <= 0) {
167             throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
168         }
169 
170         // Create the executor if none is provided
171         createdExecutor = (executor == null);
172 
173         if (createdExecutor) {
174             this.executor = Executors.newCachedThreadPool();
175             // Set a default reject handler
176             ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
177         } else {
178             this.executor = executor;
179         }
180 
181         pool = new IoProcessor[size];
182 
183         boolean success = false;
184         Constructor<? extends IoProcessor<S>> processorConstructor = null;
185         boolean usesExecutorArg = true;
186 
187         try {
188             // We create at least one processor
189             try {
190                 try {
191                     processorConstructor = processorType.getConstructor(ExecutorService.class);
192                     pool[0] = processorConstructor.newInstance(this.executor);
193                 } catch (NoSuchMethodException e1) {
194                     // To the next step...
195                     try {
196                         if(selectorProvider==null) {
197                             processorConstructor = processorType.getConstructor(Executor.class);
198                             pool[0] = processorConstructor.newInstance(this.executor);
199                         } else {
200                             processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
201                             pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
202                         }
203                     } catch (NoSuchMethodException e2) {
204                         // To the next step...
205                         try {
206                             processorConstructor = processorType.getConstructor();
207                             usesExecutorArg = false;
208                             pool[0] = processorConstructor.newInstance();
209                         } catch (NoSuchMethodException e3) {
210                             // To the next step...
211                         }
212                     }
213                 }
214             } catch (RuntimeException re) {
215                 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
216                 throw re;
217             } catch (Exception e) {
218                 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
219                 LOGGER.error(msg, e);
220                 throw new RuntimeIoException(msg, e);
221             }
222 
223             if (processorConstructor == null) {
224                 // Raise an exception if no proper constructor is found.
225                 String msg = String.valueOf(processorType) + " must have a public constructor with one "
226                         + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
227                         + Executor.class.getSimpleName() + " parameter or a public default constructor.";
228                 LOGGER.error(msg);
229                 throw new IllegalArgumentException(msg);
230             }
231 
232             // Constructor found now use it for all subsequent instantiations
233             for (int i = 1; i < pool.length; i++) {
234                 try {
235                     if (usesExecutorArg) {
236                         if(selectorProvider==null) {
237                             pool[i] = processorConstructor.newInstance(this.executor);
238                         } else {
239                             pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
240                         }
241                     } else {
242                         pool[i] = processorConstructor.newInstance();
243                     }
244                 } catch (Exception e) {
245                     // Won't happen because it has been done previously
246                 }
247             }
248 
249             success = true;
250         } finally {
251             if (!success) {
252                 dispose();
253             }
254         }
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     public final void add(S session) {
261         getProcessor(session).add(session);
262     }
263 
264     /**
265      * {@inheritDoc}
266      */
267     public final void flush(S session) {
268         getProcessor(session).flush(session);
269     }
270 
271     /**
272      * {@inheritDoc}
273      */
274     public final void write(S session, WriteRequest writeRequest) {
275         getProcessor(session).write(session, writeRequest);
276     }
277 
278     /**
279      * {@inheritDoc}
280      */
281     public final void remove(S session) {
282         getProcessor(session).remove(session);
283     }
284 
285     /**
286      * {@inheritDoc}
287      */
288     public final void updateTrafficControl(S session) {
289         getProcessor(session).updateTrafficControl(session);
290     }
291 
292     /**
293      * {@inheritDoc}
294      */
295     public boolean isDisposed() {
296         return disposed;
297     }
298 
299     /**
300      * {@inheritDoc}
301      */
302     public boolean isDisposing() {
303         return disposing;
304     }
305 
306     /**
307      * {@inheritDoc}
308      */
309     public final void dispose() {
310         if (disposed) {
311             return;
312         }
313 
314         synchronized (disposalLock) {
315             if (!disposing) {
316                 disposing = true;
317 
318                 for (IoProcessor<S> ioProcessor : pool) {
319                     if (ioProcessor == null) {
320                         // Special case if the pool has not been initialized properly
321                         continue;
322                     }
323 
324                     if (ioProcessor.isDisposing()) {
325                         continue;
326                     }
327 
328                     try {
329                         ioProcessor.dispose();
330                     } catch (Exception e) {
331                         LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
332                     }
333                 }
334 
335                 if (createdExecutor) {
336                     ((ExecutorService) executor).shutdown();
337                 }
338             }
339 
340             Arrays.fill(pool, null);
341             disposed = true;
342         }
343     }
344 
345     /**
346      * Find the processor associated to a session. If it hasen't be stored into
347      * the session's attributes, pick a new processor and stores it.
348      */
349     @SuppressWarnings("unchecked")
350     private IoProcessor<S> getProcessor(S session) {
351         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
352 
353         if (processor == null) {
354             if (disposed || disposing) {
355                 throw new IllegalStateException("A disposed processor cannot be accessed.");
356             }
357 
358             processor = pool[Math.abs((int) session.getId()) % pool.length];
359 
360             if (processor == null) {
361                 throw new IllegalStateException("A disposed processor cannot be accessed.");
362             }
363 
364             session.setAttributeIfAbsent(PROCESSOR, processor);
365         }
366 
367         return processor;
368     }
369 }