View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.lang.reflect.Constructor;
23  import java.nio.channels.spi.SelectorProvider;
24  import java.util.Arrays;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ThreadPoolExecutor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.session.AbstractIoSession;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
40   * {@link IoProcessor}s. Most current transport implementations use this pool internally
41   * to perform better in a multi-core environment, and therefore, you won't need to 
42   * use this pool directly unless you are running multiple {@link IoService}s in the
43   * same JVM.
44   * <p>
45   * If you are running multiple {@link IoService}s, you could want to share the pool
46   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
47   * instance by yourself and provide the pool as a constructor parameter when you
48   * create the services.
49   * <p>
50   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
51   * It tries to instantiate the processor in the following order:
52   * <ol>
53   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
54   * <li>A public constructor with one {@link Executor} parameter.</li>
55   * <li>A public default constructor</li>
56   * </ol>
57   * The following is an example for the NIO socket transport:
58   * <pre><code>
59   * // Create a shared pool.
60   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
61   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
62   * 
63   * // Create two services that share the same pool.
64   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
65   * SocketConnector connector = new NioSocketConnector(pool);
66   * 
67   * ...
68   * 
69   * // Release related resources.
70   * connector.dispose();
71   * acceptor.dispose();
72   * pool.dispose();
73   * </code></pre>
74   * 
75   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
76   * 
77   * @param <S> the type of the {@link IoSession} to be managed by the specified
78   *            {@link IoProcessor}.
79   */
80  public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
81      /** A logger for this class */
82      private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
83  
84      /** The default pool size, when no size is provided. */
85      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
86  
87      /** A key used to store the processor pool in the session's Attributes */
88      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
89  
90      /** The pool table */
91      private final IoProcessor<S>[] pool;
92  
93      /** The contained  which is passed to the IoProcessor when they are created */
94      private final Executor executor;
95  
96      /** A flag set to true if we had to create an executor */
97      private final boolean createdExecutor;
98  
99      /** A lock to protect the disposal against concurrent calls */
100     private final Object disposalLock = new Object();
101 
102     /** A flg set to true if the IoProcessor in the pool are being disposed */
103     private volatile boolean disposing;
104 
105     /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
106     private volatile boolean disposed;
107 
108     /**
109      * Creates a new instance of SimpleIoProcessorPool with a default
110      * size of NbCPUs +1.
111      *
112      * @param processorType The type of IoProcessor to use
113      */
114     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
115         this(processorType, null, DEFAULT_SIZE, null);
116     }
117 
118     /**
119      * Creates a new instance of SimpleIoProcessorPool with a defined
120      * number of IoProcessors in the pool
121      *
122      * @param processorType The type of IoProcessor to use
123      * @param size The number of IoProcessor in the pool
124      */
125     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
126         this(processorType, null, size, null);
127     }
128 
129     /**
130      * Creates a new instance of SimpleIoProcessorPool with a defined
131      * number of IoProcessors in the pool
132      *
133      * @param processorType The type of IoProcessor to use
134      * @param size The number of IoProcessor in the pool
135      * @param selectorProvider The SelectorProvider to use
136      */
137     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) {
138         this(processorType, null, size, selectorProvider);
139     }
140 
141     /**
142      * Creates a new instance of SimpleIoProcessorPool with an executor
143      *
144      * @param processorType The type of IoProcessor to use
145      * @param executor The {@link Executor}
146      */
147     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
148         this(processorType, executor, DEFAULT_SIZE, null);
149     }
150 
151     /**
152      * Creates a new instance of SimpleIoProcessorPool with an executor
153      *
154      * @param processorType The type of IoProcessor to use
155      * @param executor The {@link Executor}
156      * @param size The number of IoProcessor in the pool
157      */
158     @SuppressWarnings("unchecked")
159     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, SelectorProvider selectorProvider) {
160         if (processorType == null) {
161             throw new IllegalArgumentException("processorType");
162         }
163 
164         if (size <= 0) {
165             throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
166         }
167 
168         // Create the executor if none is provided
169         createdExecutor = (executor == null);
170 
171         if (createdExecutor) {
172             this.executor = Executors.newCachedThreadPool();
173             // Set a default reject handler
174             ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
175         } else {
176             this.executor = executor;
177         }
178 
179         pool = new IoProcessor[size];
180 
181         boolean success = false;
182         Constructor<? extends IoProcessor<S>> processorConstructor = null;
183         boolean usesExecutorArg = true;
184 
185         try {
186             // We create at least one processor
187             try {
188                 try {
189                     processorConstructor = processorType.getConstructor(ExecutorService.class);
190                     pool[0] = processorConstructor.newInstance(this.executor);
191                 } catch (NoSuchMethodException e1) {
192                     // To the next step...
193                     try {
194                         if(selectorProvider==null) {
195                             processorConstructor = processorType.getConstructor(Executor.class);
196                             pool[0] = processorConstructor.newInstance(this.executor);
197                         } else {
198                             processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
199                             pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
200                         }
201                     } catch (NoSuchMethodException e2) {
202                         // To the next step...
203                         try {
204                             processorConstructor = processorType.getConstructor();
205                             usesExecutorArg = false;
206                             pool[0] = processorConstructor.newInstance();
207                         } catch (NoSuchMethodException e3) {
208                             // To the next step...
209                         }
210                     }
211                 }
212             } catch (RuntimeException re) {
213                 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
214                 throw re;
215             } catch (Exception e) {
216                 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
217                 LOGGER.error(msg, e);
218                 throw new RuntimeIoException(msg, e);
219             }
220 
221             if (processorConstructor == null) {
222                 // Raise an exception if no proper constructor is found.
223                 String msg = String.valueOf(processorType) + " must have a public constructor with one "
224                         + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
225                         + Executor.class.getSimpleName() + " parameter or a public default constructor.";
226                 LOGGER.error(msg);
227                 throw new IllegalArgumentException(msg);
228             }
229 
230             // Constructor found now use it for all subsequent instantiations
231             for (int i = 1; i < pool.length; i++) {
232                 try {
233                     if (usesExecutorArg) {
234                         if(selectorProvider==null) {
235                             pool[i] = processorConstructor.newInstance(this.executor);
236                         } else {
237                             pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
238                         }
239                     } else {
240                         pool[i] = processorConstructor.newInstance();
241                     }
242                 } catch (Exception e) {
243                     // Won't happen because it has been done previously
244                 }
245             }
246 
247             success = true;
248         } finally {
249             if (!success) {
250                 dispose();
251             }
252         }
253     }
254 
255     /**
256      * {@inheritDoc}
257      */
258     public final void add(S session) {
259         getProcessor(session).add(session);
260     }
261 
262     /**
263      * {@inheritDoc}
264      */
265     public final void flush(S session) {
266         getProcessor(session).flush(session);
267     }
268 
269     /**
270      * {@inheritDoc}
271      */
272     public final void write(S session, WriteRequest writeRequest) {
273         getProcessor(session).write(session, writeRequest);
274     }
275 
276     /**
277      * {@inheritDoc}
278      */
279     public final void remove(S session) {
280         getProcessor(session).remove(session);
281     }
282 
283     /**
284      * {@inheritDoc}
285      */
286     public final void updateTrafficControl(S session) {
287         getProcessor(session).updateTrafficControl(session);
288     }
289 
290     /**
291      * {@inheritDoc}
292      */
293     public boolean isDisposed() {
294         return disposed;
295     }
296 
297     /**
298      * {@inheritDoc}
299      */
300     public boolean isDisposing() {
301         return disposing;
302     }
303 
304     /**
305      * {@inheritDoc}
306      */
307     public final void dispose() {
308         if (disposed) {
309             return;
310         }
311 
312         synchronized (disposalLock) {
313             if (!disposing) {
314                 disposing = true;
315 
316                 for (IoProcessor<S> ioProcessor : pool) {
317                     if (ioProcessor == null) {
318                         // Special case if the pool has not been initialized properly
319                         continue;
320                     }
321 
322                     if (ioProcessor.isDisposing()) {
323                         continue;
324                     }
325 
326                     try {
327                         ioProcessor.dispose();
328                     } catch (Exception e) {
329                         LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
330                     }
331                 }
332 
333                 if (createdExecutor) {
334                     ((ExecutorService) executor).shutdown();
335                 }
336             }
337 
338             Arrays.fill(pool, null);
339             disposed = true;
340         }
341     }
342 
343     /**
344      * Find the processor associated to a session. If it hasen't be stored into
345      * the session's attributes, pick a new processor and stores it.
346      */
347     @SuppressWarnings("unchecked")
348     private IoProcessor<S> getProcessor(S session) {
349         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
350 
351         if (processor == null) {
352             if (disposed || disposing) {
353                 throw new IllegalStateException("A disposed processor cannot be accessed.");
354             }
355 
356             processor = pool[Math.abs((int) session.getId()) % pool.length];
357 
358             if (processor == null) {
359                 throw new IllegalStateException("A disposed processor cannot be accessed.");
360             }
361 
362             session.setAttributeIfAbsent(PROCESSOR, processor);
363         }
364 
365         return processor;
366     }
367 }