View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.lang.reflect.Constructor;
23  import java.util.Arrays;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  
28  import org.apache.mina.core.RuntimeIoException;
29  import org.apache.mina.core.session.AbstractIoSession;
30  import org.apache.mina.core.session.AttributeKey;
31  import org.apache.mina.core.session.IoSession;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  /**
36   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
37   * {@link IoProcessor}s. Most current transport implementations use this pool internally
38   * to perform better in a multi-core environment, and therefore, you won't need to 
39   * use this pool directly unless you are running multiple {@link IoService}s in the
40   * same JVM.
41   * <p>
42   * If you are running multiple {@link IoService}s, you may want to share the pool
43   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
44   * instance by yourself and provide the pool as a constructor parameter when you
45   * create the services.
46   * <p>
47   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
48   * It tries to instantiate the processor in the following order:
49   * <ol>
50   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
51   * <li>A public constructor with one {@link Executor} parameter.</li>
52   * <li>A public default constructor</li>
53   * </ol>
54   * The following is an example for the NIO socket transport:
55   * <pre><code>
56   * // Create a shared pool.
57   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
58   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
59   * 
60   * // Create two services that share the same pool.
61   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
62   * SocketConnector connector = new NioSocketConnector(pool);
63   * 
64   * ...
65   * 
66   * // Release related resources.
67   * connector.dispose();
68   * acceptor.dispose();
69   * pool.dispose();
70   * </code></pre>
71   * 
72   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
73   * 
74   * @param <S> the type of the {@link IoSession} to be managed by the specified
75   *            {@link IoProcessor}.
76   */
77  public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
78      /** A logger for this class */
79      private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
80  
81      /** The default pool size, when no size is provided. */
82      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
83  
84      /** A key used to store the processor pool in the session's Attributes */
85      private static final AttributeKey PROCESSOR = new AttributeKey( SimpleIoProcessorPool.class, "processor");
86  
87      /** The pool table */
88      private final IoProcessor<S>[] pool;
89  
90      /** The contained  which is passed to the IoProcessor when they are created */
91      private final Executor executor;
92  
93      /** A flag set to true if we had to create an executor */
94      private final boolean createdExecutor;
95  
96      /** A lock to protect the disposal against concurrent calls */
97      private final Object disposalLock = new Object();
98  
99      /** A flg set to true if the IoProcessor in the pool are being disposed */
100     private volatile boolean disposing;
101 
102     /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
103     private volatile boolean disposed;
104 
105     /**
106      * Creates a new instance of SimpleIoProcessorPool with a default
107      * size of NbCPUs +1.
108      *
109      * @param processorType The type of IoProcessor to use
110      */
111     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
112         this(processorType, null, DEFAULT_SIZE);
113     }
114 
115     /**
116      * Creates a new instance of SimpleIoProcessorPool with a defined
117      * number of IoProcessors in the pool
118      *
119      * @param processorType The type of IoProcessor to use
120      * @param size The number of IoProcessor in the pool
121      */
122     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
123         this(processorType, null, size);
124     }
125 
126     /**
127      * Creates a new instance of SimpleIoProcessorPool with an executor
128      *
129      * @param processorType The type of IoProcessor to use
130      * @param executor The {@link Executor}
131      */
132     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
133         this(processorType, executor, DEFAULT_SIZE);
134     }
135 
136     /**
137      * Creates a new instance of SimpleIoProcessorPool with an executor
138      *
139      * @param processorType The type of IoProcessor to use
140      * @param executor The {@link Executor}
141      * @param size The number of IoProcessor in the pool
142      */
143     @SuppressWarnings("unchecked")
144     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType,
145             Executor executor, int size) {
146         if (processorType == null) {
147             throw new IllegalArgumentException("processorType");
148         }
149 
150         if (size <= 0) {
151             throw new IllegalArgumentException("size: " + size
152                     + " (expected: positive integer)");
153         }
154 
155         // Create the executor if none is provided
156         createdExecutor = (executor == null);
157         
158         if (createdExecutor) {
159             this.executor = Executors.newCachedThreadPool();
160         } else {
161             this.executor = executor;
162         }
163 
164         pool = new IoProcessor[size];
165 
166         boolean success = false;
167         Constructor<? extends IoProcessor<S>> processorConstructor = null;
168         boolean usesExecutorArg = true;
169 
170         try {
171             // We create at least one processor
172             try {
173                 try {
174                     processorConstructor = processorType.getConstructor(ExecutorService.class);
175                     pool[0] = processorConstructor.newInstance(this.executor);
176                 } catch (NoSuchMethodException e1) {
177                     // To the next step...
178                     try {
179                         processorConstructor = processorType.getConstructor(Executor.class);
180                         pool[0] = processorConstructor.newInstance(this.executor);
181                     } catch (NoSuchMethodException e2) {
182                         // To the next step...
183                         try {
184                             processorConstructor = processorType.getConstructor();
185                             usesExecutorArg = false;
186                             pool[0] = processorConstructor.newInstance();
187                         } catch (NoSuchMethodException e3) {
188                             // To the next step...
189                         }
190                     }
191                 }
192             } catch (RuntimeException re) {
193                 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
194                 throw re;
195             } catch (Exception e) {
196                 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
197                 LOGGER.error(msg, e);
198                 throw new RuntimeIoException(msg , e);
199             }
200 
201             if (processorConstructor == null) {
202                 // Raise an exception if no proper constructor is found.
203                 String msg = String.valueOf(processorType)
204                     + " must have a public constructor with one "
205                     + ExecutorService.class.getSimpleName()
206                     + " parameter, a public constructor with one "
207                     + Executor.class.getSimpleName()
208                     + " parameter or a public default constructor.";
209                 LOGGER.error(msg);
210                 throw new IllegalArgumentException(msg);
211             }
212 
213             // Constructor found now use it for all subsequent instantiations
214             for (int i = 1; i < pool.length; i++) {
215                 try {
216                     if (usesExecutorArg) {
217                         pool[i] = processorConstructor.newInstance(this.executor);
218                     } else {
219                         pool[i] = processorConstructor.newInstance();
220                     }
221                 } catch (Exception e) {
222                     // Won't happen because it has been done previously
223                 }
224             }
225             
226             success = true;
227         } finally {
228             if (!success) {
229                 dispose();
230             }
231         }
232     }
233 
234     /**
235      * {@inheritDoc}
236      */
237     public final void add(S session) {
238         getProcessor(session).add(session);
239     }
240 
241     /**
242      * {@inheritDoc}
243      */
244     public final void flush(S session) {
245         getProcessor(session).flush(session);
246     }
247 
248     /**
249      * {@inheritDoc}
250      */
251     public final void remove(S session) {
252         getProcessor(session).remove(session);
253     }
254 
255     /**
256      * {@inheritDoc}
257      */
258     public final void updateTrafficControl(S session) {
259         getProcessor(session).updateTrafficControl(session);
260     }
261 
262     /**
263      * {@inheritDoc}
264      */
265     public boolean isDisposed() {
266         return disposed;
267     }
268 
269     /**
270      * {@inheritDoc}
271      */
272     public boolean isDisposing() {
273         return disposing;
274     }
275 
276     /**
277      * {@inheritDoc}
278      */
279     public final void dispose() {
280         if (disposed) {
281             return;
282         }
283 
284         synchronized (disposalLock) {
285             if (!disposing) {
286                 disposing = true;
287                 
288                 for (IoProcessor<S> ioProcessor : pool) {
289                     if (ioProcessor == null) {
290                         // Special case if the pool has not been initialized properly
291                         continue;
292                     }
293                     
294                     if (ioProcessor.isDisposing()) {
295                         continue;
296                     }
297 
298                     try {
299                         ioProcessor.dispose();
300                     } catch (Exception e) {
301                         LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
302                     }
303                 }
304 
305                 if (createdExecutor) {
306                     ((ExecutorService) executor).shutdown();
307                 }
308             }
309 
310             Arrays.fill(pool, null);
311             disposed = true;
312         }
313     }
314 
315     /**
316      * Find the processor associated to a session. If it hasen't be stored into
317      * the session's attributes, pick a new processor and stores it.
318      */
319     @SuppressWarnings("unchecked")
320     private IoProcessor<S> getProcessor(S session) {
321         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
322         
323         if (processor == null) {
324             if (disposed || disposing) {
325                 throw new IllegalStateException("A disposed processor cannot be accessed.");
326             }
327 
328             processor = pool[Math.abs((int) session.getId()) % pool.length];
329             
330             if (processor == null) {
331                 throw new IllegalStateException("A disposed processor cannot be accessed.");
332             }
333             
334             session.setAttributeIfAbsent(PROCESSOR, processor);
335         }
336 
337         return processor;
338     }
339 }