View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.lang.reflect.Constructor;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.mina.core.RuntimeIoException;
29  import org.apache.mina.core.session.AbstractIoSession;
30  import org.apache.mina.core.session.AttributeKey;
31  import org.apache.mina.core.session.IoSession;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  /**
36   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
37   * {@link IoProcessor}s. Most current transport implementations use this pool internally
38   * to perform better in a multi-core environment, and therefore, you won't need to 
39   * use this pool directly unless you are running multiple {@link IoService}s in the
40   * same JVM.
41   * <p>
42   * If you are running multiple {@link IoService}s, you could want to share the pool
43   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
44   * instance by yourself and provide the pool as a constructor parameter when you
45   * create the services.
46   * <p>
47   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
48   * It tries to instantiate the processor in the following order:
49   * <ol>
50   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
51   * <li>A public constructor with one {@link Executor} parameter.</li>
52   * <li>A public default constructor</li>
53   * </ol>
54   * The following is an example for the NIO socket transport:
55   * <pre><code>
56   * // Create a shared pool.
57   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
58   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
59   * 
60   * // Create two services that share the same pool.
61   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
62   * SocketConnector connector = new NioSocketConnector(pool);
63   * 
64   * ...
65   * 
66   * // Release related resources.
67   * connector.dispose();
68   * acceptor.dispose();
69   * pool.dispose();
70   * </code></pre>
71   * 
72   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
73   * 
74   * @param <T> the type of the {@link IoSession} to be managed by the specified
75   *            {@link IoProcessor}.
76   */
77  public class SimpleIoProcessorPool<T extends AbstractIoSession> implements
78          IoProcessor<T> {
79  
80      private static final int DEFAULT_SIZE = Runtime.getRuntime()
81              .availableProcessors() + 1;
82  
83      private static final AttributeKey PROCESSOR = new AttributeKey(
84              SimpleIoProcessorPool.class, "processor");
85  
86      private final static Logger LOGGER = LoggerFactory
87              .getLogger(SimpleIoProcessorPool.class);
88  
89      private final IoProcessor<T>[] pool;
90  
91      private final AtomicInteger processorDistributor = new AtomicInteger();
92  
93      private final Executor executor;
94  
95      private final boolean createdExecutor;
96  
97      private final Object disposalLock = new Object();
98  
99      private volatile boolean disposing;
100 
101     private volatile boolean disposed;
102 
103     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
104         this(processorType, null, DEFAULT_SIZE);
105     }
106 
107     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
108             int size) {
109         this(processorType, null, size);
110     }
111 
112     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
113             Executor executor) {
114         this(processorType, executor, DEFAULT_SIZE);
115     }
116 
117     @SuppressWarnings("unchecked")
118     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
119             Executor executor, int size) {
120         if (processorType == null) {
121             throw new NullPointerException("processorType");
122         }
123         if (size <= 0) {
124             throw new IllegalArgumentException("size: " + size
125                     + " (expected: positive integer)");
126         }
127 
128         if (executor == null) {
129             this.executor = executor = Executors.newCachedThreadPool();
130             this.createdExecutor = true;
131         } else {
132             this.executor = executor;
133             this.createdExecutor = false;
134         }
135 
136         pool = new IoProcessor[size];
137 
138         boolean success = false;
139         Constructor<? extends IoProcessor<T>> processorConstructor = null;
140         boolean usesExecutorArg = true;
141 
142         try {
143             // We create at least one processor
144             try {
145                 try {
146                     processorConstructor = processorType
147                             .getConstructor(ExecutorService.class);
148                     pool[0] = processorConstructor.newInstance(executor);
149                 } catch (NoSuchMethodException e) {
150                     // To the next step...
151                 }
152 
153                 try {
154                     processorConstructor = processorType
155                             .getConstructor(Executor.class);
156                     pool[0] = processorConstructor.newInstance(executor);
157                 } catch (NoSuchMethodException e) {
158                     // To the next step...
159                 }
160 
161                 try {
162                     processorConstructor = processorType.getConstructor();
163                     usesExecutorArg = false;
164                     pool[0] = processorConstructor.newInstance();
165                 } catch (NoSuchMethodException e) {
166                     // To the next step...
167                 }
168             } catch (RuntimeException e) {
169                 throw e;
170             } catch (Exception e) {
171                 throw new RuntimeIoException(
172                         "Failed to create a new instance of "
173                                 + processorType.getName(), e);
174             }
175 
176             if (processorConstructor == null) {
177                 // Raise an exception if no proper constructor is found.
178                 throw new IllegalArgumentException(String
179                         .valueOf(processorType)
180                         + " must have a public constructor "
181                         + "with one "
182                         + ExecutorService.class.getSimpleName()
183                         + " parameter, "
184                         + "a public constructor with one "
185                         + Executor.class.getSimpleName()
186                         + " parameter or a public default constructor.");
187             }
188 
189             // Constructor found now use it for all subsequent instantiations
190             for (int i = 1; i < pool.length; i++) {
191                 try {
192                     if (usesExecutorArg) {
193                         pool[i] = processorConstructor.newInstance(executor);
194                     } else {
195                         pool[i] = processorConstructor.newInstance();
196                     }
197                 } catch (Exception e) {
198                     // Won't happen because it has been done previously
199                 }
200             }
201             success = true;
202         } finally {
203             if (!success) {
204                 dispose();
205             }
206         }
207     }
208 
209     public final void add(T session) {
210         getProcessor(session).add(session);
211     }
212 
213     public final void flush(T session) {
214         getProcessor(session).flush(session);
215     }
216 
217     public final void remove(T session) {
218         getProcessor(session).remove(session);
219     }
220 
221     public final void updateTrafficControl(T session) {
222         getProcessor(session).updateTrafficControl(session);
223     }
224 
225     public boolean isDisposed() {
226         return disposed;
227     }
228 
229     public boolean isDisposing() {
230         return disposing;
231     }
232 
233     public final void dispose() {
234         if (disposed) {
235             return;
236         }
237 
238         synchronized (disposalLock) {
239             if (!disposing) {
240                 disposing = true;
241                 for (int i = pool.length - 1; i >= 0; i--) {
242                     if (pool[i] == null || pool[i].isDisposing()) {
243                         continue;
244                     }
245 
246                     try {
247                         pool[i].dispose();
248                     } catch (Exception e) {
249                         LOGGER.warn("Failed to dispose a "
250                                 + pool[i].getClass().getSimpleName()
251                                 + " at index " + i + ".", e);
252                     } finally {
253                         pool[i] = null;
254                     }
255                 }
256 
257                 if (createdExecutor) {
258                     ((ExecutorService) executor).shutdown();
259                 }
260             }
261         }
262 
263         disposed = true;
264     }
265 
266     @SuppressWarnings("unchecked")
267     private IoProcessor<T> getProcessor(T session) {
268         IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
269         if (p == null) {
270             p = nextProcessor();
271             IoProcessor<T> oldp = (IoProcessor<T>) session
272                     .setAttributeIfAbsent(PROCESSOR, p);
273             if (oldp != null) {
274                 p = oldp;
275             }
276         }
277 
278         return p;
279     }
280 
281     private IoProcessor<T> nextProcessor() {
282         if (disposed) {
283             throw new IllegalStateException(
284                     "A disposed processor cannot be accessed.");
285         }
286         return pool[Math.abs(processorDistributor.getAndIncrement())
287                 % pool.length];
288     }
289 }