View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.lang.reflect.Constructor;
23  import java.util.Arrays;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadPoolExecutor;
28  
29  import org.apache.mina.core.RuntimeIoException;
30  import org.apache.mina.core.session.AbstractIoSession;
31  import org.apache.mina.core.session.AttributeKey;
32  import org.apache.mina.core.session.IoSession;
33  import org.apache.mina.core.write.WriteRequest;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  /**
38   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
39   * {@link IoProcessor}s. Most current transport implementations use this pool internally
40   * to perform better in a multi-core environment, and therefore, you won't need to 
41   * use this pool directly unless you are running multiple {@link IoService}s in the
42   * same JVM.
43   * <p>
44   * If you are running multiple {@link IoService}s, you could want to share the pool
45   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
46   * instance by yourself and provide the pool as a constructor parameter when you
47   * create the services.
48   * <p>
49   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
50   * It tries to instantiate the processor in the following order:
51   * <ol>
52   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
53   * <li>A public constructor with one {@link Executor} parameter.</li>
54   * <li>A public default constructor</li>
55   * </ol>
56   * The following is an example for the NIO socket transport:
57   * <pre><code>
58   * // Create a shared pool.
59   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
60   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
61   * 
62   * // Create two services that share the same pool.
63   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
64   * SocketConnector connector = new NioSocketConnector(pool);
65   * 
66   * ...
67   * 
68   * // Release related resources.
69   * connector.dispose();
70   * acceptor.dispose();
71   * pool.dispose();
72   * </code></pre>
73   * 
74   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
75   * 
76   * @param <S> the type of the {@link IoSession} to be managed by the specified
77   *            {@link IoProcessor}.
78   */
79  public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
80      /** A logger for this class */
81      private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
82  
83      /** The default pool size, when no size is provided. */
84      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
85  
86      /** A key used to store the processor pool in the session's Attributes */
87      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
88  
89      /** The pool table */
90      private final IoProcessor<S>[] pool;
91  
92      /** The contained  which is passed to the IoProcessor when they are created */
93      private final Executor executor;
94  
95      /** A flag set to true if we had to create an executor */
96      private final boolean createdExecutor;
97  
98      /** A lock to protect the disposal against concurrent calls */
99      private final Object disposalLock = new Object();
100 
101     /** A flg set to true if the IoProcessor in the pool are being disposed */
102     private volatile boolean disposing;
103 
104     /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
105     private volatile boolean disposed;
106 
107     /**
108      * Creates a new instance of SimpleIoProcessorPool with a default
109      * size of NbCPUs +1.
110      *
111      * @param processorType The type of IoProcessor to use
112      */
113     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
114         this(processorType, null, DEFAULT_SIZE);
115     }
116 
117     /**
118      * Creates a new instance of SimpleIoProcessorPool with a defined
119      * number of IoProcessors in the pool
120      *
121      * @param processorType The type of IoProcessor to use
122      * @param size The number of IoProcessor in the pool
123      */
124     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
125         this(processorType, null, size);
126     }
127 
128     /**
129      * Creates a new instance of SimpleIoProcessorPool with an executor
130      *
131      * @param processorType The type of IoProcessor to use
132      * @param executor The {@link Executor}
133      */
134     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
135         this(processorType, executor, DEFAULT_SIZE);
136     }
137 
138     /**
139      * Creates a new instance of SimpleIoProcessorPool with an executor
140      *
141      * @param processorType The type of IoProcessor to use
142      * @param executor The {@link Executor}
143      * @param size The number of IoProcessor in the pool
144      */
145     @SuppressWarnings("unchecked")
146     public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) {
147         if (processorType == null) {
148             throw new IllegalArgumentException("processorType");
149         }
150 
151         if (size <= 0) {
152             throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
153         }
154 
155         // Create the executor if none is provided
156         createdExecutor = (executor == null);
157 
158         if (createdExecutor) {
159             this.executor = Executors.newCachedThreadPool();
160             // Set a default reject handler
161             ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
162         } else {
163             this.executor = executor;
164         }
165 
166         pool = new IoProcessor[size];
167 
168         boolean success = false;
169         Constructor<? extends IoProcessor<S>> processorConstructor = null;
170         boolean usesExecutorArg = true;
171 
172         try {
173             // We create at least one processor
174             try {
175                 try {
176                     processorConstructor = processorType.getConstructor(ExecutorService.class);
177                     pool[0] = processorConstructor.newInstance(this.executor);
178                 } catch (NoSuchMethodException e1) {
179                     // To the next step...
180                     try {
181                         processorConstructor = processorType.getConstructor(Executor.class);
182                         pool[0] = processorConstructor.newInstance(this.executor);
183                     } catch (NoSuchMethodException e2) {
184                         // To the next step...
185                         try {
186                             processorConstructor = processorType.getConstructor();
187                             usesExecutorArg = false;
188                             pool[0] = processorConstructor.newInstance();
189                         } catch (NoSuchMethodException e3) {
190                             // To the next step...
191                         }
192                     }
193                 }
194             } catch (RuntimeException re) {
195                 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
196                 throw re;
197             } catch (Exception e) {
198                 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
199                 LOGGER.error(msg, e);
200                 throw new RuntimeIoException(msg, e);
201             }
202 
203             if (processorConstructor == null) {
204                 // Raise an exception if no proper constructor is found.
205                 String msg = String.valueOf(processorType) + " must have a public constructor with one "
206                         + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
207                         + Executor.class.getSimpleName() + " parameter or a public default constructor.";
208                 LOGGER.error(msg);
209                 throw new IllegalArgumentException(msg);
210             }
211 
212             // Constructor found now use it for all subsequent instantiations
213             for (int i = 1; i < pool.length; i++) {
214                 try {
215                     if (usesExecutorArg) {
216                         pool[i] = processorConstructor.newInstance(this.executor);
217                     } else {
218                         pool[i] = processorConstructor.newInstance();
219                     }
220                 } catch (Exception e) {
221                     // Won't happen because it has been done previously
222                 }
223             }
224 
225             success = true;
226         } finally {
227             if (!success) {
228                 dispose();
229             }
230         }
231     }
232 
233     /**
234      * {@inheritDoc}
235      */
236     public final void add(S session) {
237         getProcessor(session).add(session);
238     }
239 
240     /**
241      * {@inheritDoc}
242      */
243     public final void flush(S session) {
244         getProcessor(session).flush(session);
245     }
246 
247     /**
248      * {@inheritDoc}
249      */
250     public final void write(S session, WriteRequest writeRequest) {
251         getProcessor(session).write(session, writeRequest);
252     }
253 
254     /**
255      * {@inheritDoc}
256      */
257     public final void remove(S session) {
258         getProcessor(session).remove(session);
259     }
260 
261     /**
262      * {@inheritDoc}
263      */
264     public final void updateTrafficControl(S session) {
265         getProcessor(session).updateTrafficControl(session);
266     }
267 
268     /**
269      * {@inheritDoc}
270      */
271     public boolean isDisposed() {
272         return disposed;
273     }
274 
275     /**
276      * {@inheritDoc}
277      */
278     public boolean isDisposing() {
279         return disposing;
280     }
281 
282     /**
283      * {@inheritDoc}
284      */
285     public final void dispose() {
286         if (disposed) {
287             return;
288         }
289 
290         synchronized (disposalLock) {
291             if (!disposing) {
292                 disposing = true;
293 
294                 for (IoProcessor<S> ioProcessor : pool) {
295                     if (ioProcessor == null) {
296                         // Special case if the pool has not been initialized properly
297                         continue;
298                     }
299 
300                     if (ioProcessor.isDisposing()) {
301                         continue;
302                     }
303 
304                     try {
305                         ioProcessor.dispose();
306                     } catch (Exception e) {
307                         LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
308                     }
309                 }
310 
311                 if (createdExecutor) {
312                     ((ExecutorService) executor).shutdown();
313                 }
314             }
315 
316             Arrays.fill(pool, null);
317             disposed = true;
318         }
319     }
320 
321     /**
322      * Find the processor associated to a session. If it hasen't be stored into
323      * the session's attributes, pick a new processor and stores it.
324      */
325     @SuppressWarnings("unchecked")
326     private IoProcessor<S> getProcessor(S session) {
327         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
328 
329         if (processor == null) {
330             if (disposed || disposing) {
331                 throw new IllegalStateException("A disposed processor cannot be accessed.");
332             }
333 
334             processor = pool[Math.abs((int) session.getId()) % pool.length];
335 
336             if (processor == null) {
337                 throw new IllegalStateException("A disposed processor cannot be accessed.");
338             }
339 
340             session.setAttributeIfAbsent(PROCESSOR, processor);
341         }
342 
343         return processor;
344     }
345 }