View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.ServerSocket;
25  import java.net.SocketAddress;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.ServerSocketChannel;
29  import java.nio.channels.SocketChannel;
30  import java.nio.channels.spi.SelectorProvider;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.concurrent.Executor;
34  
35  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
36  import org.apache.mina.core.service.IoAcceptor;
37  import org.apache.mina.core.service.IoProcessor;
38  import org.apache.mina.core.service.IoService;
39  import org.apache.mina.core.service.SimpleIoProcessorPool;
40  import org.apache.mina.core.service.TransportMetadata;
41  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
42  import org.apache.mina.transport.socket.SocketAcceptor;
43  
44  /**
45   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
46   * handles incoming TCP/IP based socket connections.
47   *
48   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
49   */
50  public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
51  implements SocketAcceptor {
52  
53      private volatile Selector selector;
54      private volatile SelectorProvider selectorProvider = null;
55  
56      /**
57       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
58       */
59      public NioSocketAcceptor() {
60          super(new DefaultSocketSessionConfig(), NioProcessor.class);
61          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
62      }
63  
64      /**
65       * Constructor for {@link NioSocketAcceptor} using default parameters, and
66       * given number of {@link NioProcessor} for multithreading I/O operations.
67       * 
68       * @param processorCount the number of processor to create and place in a
69       * {@link SimpleIoProcessorPool}
70       */
71      public NioSocketAcceptor(int processorCount) {
72          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
73          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
74      }
75  
76      /**
77       *  Constructor for {@link NioSocketAcceptor} with default configuration but a
78       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
79       *  {@link IoService} of the same type.
80       * @param processor the processor to use for managing I/O events
81       */
82      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
83          super(new DefaultSocketSessionConfig(), processor);
84          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
85      }
86  
87      /**
88       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
89       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
90       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
91       * @param executor the executor for connection
92       * @param processor the processor for I/O operations
93       */
94      public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
95          super(new DefaultSocketSessionConfig(), executor, processor);
96          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
97      }
98  
99      /**
100      * Constructor for {@link NioSocketAcceptor} using default parameters, and
101      * given number of {@link NioProcessor} for multithreading I/O operations, and
102      * a custom SelectorProvider for NIO
103      *
104      * @param processorCount the number of processor to create and place in a
105      * @param selectorProvider teh SelectorProvider to use
106      * {@link SimpleIoProcessorPool}
107      */
108     public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
109         super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
110         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
111         this.selectorProvider = selectorProvider;
112     }
113 
114     /**
115      * {@inheritDoc}
116      */
117     @Override
118     protected void init() throws Exception {
119         selector = Selector.open();
120     }
121 
122     /**
123      * {@inheritDoc}
124      */
125     @Override
126     protected void init(SelectorProvider selectorProvider) throws Exception {
127         this.selectorProvider = selectorProvider;
128 
129         if (selectorProvider == null) {
130             selector = Selector.open();
131         } else {
132             selector = selectorProvider.openSelector();
133         }
134     }
135 
136     /**
137      * {@inheritDoc}
138      */
139     @Override
140     protected void destroy() throws Exception {
141         if (selector != null) {
142             selector.close();
143         }
144     }
145 
146     /**
147      * {@inheritDoc}
148      */
149     public TransportMetadata getTransportMetadata() {
150         return NioSocketSession.METADATA;
151     }
152 
153     /**
154      * {@inheritDoc}
155      */
156     @Override
157     public InetSocketAddress getLocalAddress() {
158         return (InetSocketAddress) super.getLocalAddress();
159     }
160 
161     /**
162      * {@inheritDoc}
163      */
164     @Override
165     public InetSocketAddress getDefaultLocalAddress() {
166         return (InetSocketAddress) super.getDefaultLocalAddress();
167     }
168 
169     /**
170      * {@inheritDoc}
171      */
172     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
173         setDefaultLocalAddress((SocketAddress) localAddress);
174     }
175 
176     /**
177      * {@inheritDoc}
178      */
179     @Override
180     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
181 
182         SelectionKey key = null;
183 
184         if (handle != null) {
185             key = handle.keyFor(selector);
186         }
187 
188         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
189             return null;
190         }
191 
192         // accept the connection from the client
193         try {
194             SocketChannel ch = handle.accept();
195     
196             if (ch == null) {
197                 return null;
198             }
199 
200             return new NioSocketSession(this, processor, ch);
201         } catch (Throwable t) {
202             if(t.getMessage().equals("Too many open files")) {
203                 LOGGER.error("Error Calling Accept on Socket - Sleeping Acceptor Thread. Check the ulimit parameter", t);
204                 try {
205                     // Sleep 50 ms, so that the select does not spin like crazy doing nothing but eating CPU
206                     // This is typically what will happen if we don't have any more File handle on the server
207                     // Check the ulimit parameter
208                     // NOTE : this is a workaround, there is no way we can handle this exception in any smarter way...
209                     Thread.sleep(50L);
210                 } catch (InterruptedException ie) {
211                     // Nothing to do
212                 }
213             } else {
214                 throw t;
215             }
216 
217             // No session when we have met an exception
218             return null;
219         }
220     }
221 
222     /**
223      * {@inheritDoc}
224      */
225     @Override
226     protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
227         // Creates the listening ServerSocket
228 
229         ServerSocketChannel channel = null;
230 
231         if (selectorProvider != null) {
232             channel = selectorProvider.openServerSocketChannel();
233         } else {
234             channel = ServerSocketChannel.open();
235         }
236 
237         boolean success = false;
238 
239         try {
240             // This is a non blocking socket channel
241             channel.configureBlocking(false);
242 
243             // Configure the server socket,
244             ServerSocket socket = channel.socket();
245 
246             // Set the reuseAddress flag accordingly with the setting
247             socket.setReuseAddress(isReuseAddress());
248 
249             // and bind.
250             try {
251                 socket.bind(localAddress, getBacklog());
252             } catch (IOException ioe) {
253                 // Add some info regarding the address we try to bind to the
254                 // message
255                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
256                         + ioe.getMessage();
257                 Exception e = new IOException(newMessage);
258                 e.initCause(ioe.getCause());
259 
260                 // And close the channel
261                 channel.close();
262 
263                 throw e;
264             }
265 
266             // Register the channel within the selector for ACCEPT event
267             channel.register(selector, SelectionKey.OP_ACCEPT);
268             success = true;
269         } finally {
270             if (!success) {
271                 close(channel);
272             }
273         }
274         return channel;
275     }
276 
277     /**
278      * {@inheritDoc}
279      */
280     @Override
281     protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
282         return handle.socket().getLocalSocketAddress();
283     }
284 
285     /**
286      * Check if we have at least one key whose corresponding channels is
287      * ready for I/O operations.
288      *
289      * This method performs a blocking selection operation.
290      * It returns only after at least one channel is selected,
291      * this selector's wakeup method is invoked, or the current thread
292      * is interrupted, whichever comes first.
293      * 
294      * @return The number of keys having their ready-operation set updated
295      * @throws IOException If an I/O error occurs
296      */
297     @Override
298     protected int select() throws Exception {
299         return selector.select();
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected Iterator<ServerSocketChannel> selectedHandles() {
307         return new ServerSocketChannelIterator(selector.selectedKeys());
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     protected void close(ServerSocketChannel handle) throws Exception {
315         SelectionKey key = handle.keyFor(selector);
316 
317         if (key != null) {
318             key.cancel();
319         }
320 
321         handle.close();
322     }
323 
324     /**
325      * {@inheritDoc}
326      */
327     @Override
328     protected void wakeup() {
329         selector.wakeup();
330     }
331 
332     /**
333      * Defines an iterator for the selected-key Set returned by the
334      * selector.selectedKeys(). It replaces the SelectionKey operator.
335      */
336     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
337         /** The selected-key iterator */
338         private final Iterator<SelectionKey> iterator;
339 
340         /**
341          * Build a SocketChannel iterator which will return a SocketChannel instead of
342          * a SelectionKey.
343          * 
344          * @param selectedKeys The selector selected-key set
345          */
346         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
347             iterator = selectedKeys.iterator();
348         }
349 
350         /**
351          * Tells if there are more SockectChannel left in the iterator
352          * @return <tt>true</tt> if there is at least one more
353          * SockectChannel object to read
354          */
355         public boolean hasNext() {
356             return iterator.hasNext();
357         }
358 
359         /**
360          * Get the next SocketChannel in the operator we have built from
361          * the selected-key et for this selector.
362          * 
363          * @return The next SocketChannel in the iterator
364          */
365         public ServerSocketChannel next() {
366             SelectionKey key = iterator.next();
367 
368             if (key.isValid() && key.isAcceptable()) {
369                 return (ServerSocketChannel) key.channel();
370             }
371 
372             return null;
373         }
374 
375         /**
376          * Remove the current SocketChannel from the iterator
377          */
378         public void remove() {
379             iterator.remove();
380         }
381     }
382 }