View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.ServerSocket;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.ServerSocketChannel;
28  import java.nio.channels.SocketChannel;
29  import java.util.Collection;
30  import java.util.Iterator;
31  import java.util.concurrent.Executor;
32  
33  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
34  import org.apache.mina.core.service.IoAcceptor;
35  import org.apache.mina.core.service.IoProcessor;
36  import org.apache.mina.core.service.SimpleIoProcessorPool;
37  import org.apache.mina.core.service.TransportMetadata;
38  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39  import org.apache.mina.transport.socket.SocketAcceptor;
40  import org.apache.mina.transport.socket.SocketSessionConfig;
41  
42  /**
43   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
44   * handles incoming TCP/IP based socket connections.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   */
48  public final class NioSocketAcceptor
49          extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
50          implements SocketAcceptor {
51  
52      /** 
53       * Define the number of socket that can wait to be accepted. Default
54       * to 50 (as in the SocketServer default).
55       */
56      private int backlog = 50;
57  
58      private boolean reuseAddress = false;
59  
60      private volatile Selector selector;
61  
62      /**
63       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
64       */
65      public NioSocketAcceptor() {
66          super(new DefaultSocketSessionConfig(), NioProcessor.class);
67          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68      }
69  
70      /**
71       * Constructor for {@link NioSocketAcceptor} using default parameters, and 
72       * given number of {@link NioProcessor} for multithreading I/O operations.
73       * 
74       * @param processorCount the number of processor to create and place in a
75       * {@link SimpleIoProcessorPool} 
76       */
77      public NioSocketAcceptor(int processorCount) {
78          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
79          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80      }
81  
82      /**
83      *  Constructor for {@link NioSocketAcceptor} with default configuration but a
84       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
85       *  {@link IoService} of the same type.
86       * @param processor the processor to use for managing I/O events
87       */
88      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
89          super(new DefaultSocketSessionConfig(), processor);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92  
93      /**
94       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling 
95       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for 
96       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
97       * @param executor the executor for connection
98       * @param processor the processor for I/O operations
99       */
100     public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
101         super(new DefaultSocketSessionConfig(), executor, processor);
102         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
103     }
104 
105     /**
106      * {@inheritDoc}
107      */
108     @Override
109     protected void init() throws Exception {
110         selector = Selector.open();
111     }
112     
113     /**
114      * {@inheritDoc}
115      */
116     @Override
117     protected void destroy() throws Exception {
118         if (selector != null) {
119             selector.close();
120         }
121     }
122 
123     /**
124      * {@inheritDoc}
125      */
126     public TransportMetadata getTransportMetadata() {
127         return NioSocketSession.METADATA;
128     }
129 
130     /**
131      * {@inheritDoc}
132      */
133     @Override
134     public SocketSessionConfig getSessionConfig() {
135         return (SocketSessionConfig) super.getSessionConfig();
136     }
137 
138     /**
139      * {@inheritDoc}
140      */
141     @Override
142     public InetSocketAddress getLocalAddress() {
143         return (InetSocketAddress) super.getLocalAddress();
144     }
145 
146     /**
147      * {@inheritDoc}
148      */
149     @Override
150     public InetSocketAddress getDefaultLocalAddress() {
151         return (InetSocketAddress) super.getDefaultLocalAddress();
152     }
153 
154     /**
155      * {@inheritDoc}
156      */
157     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
158         setDefaultLocalAddress((SocketAddress) localAddress);
159     }
160 
161     /**
162      * {@inheritDoc}
163      */
164     public boolean isReuseAddress() {
165         return reuseAddress;
166     }
167 
168     /**
169      * {@inheritDoc}
170      */
171     public void setReuseAddress(boolean reuseAddress) {
172         synchronized (bindLock) {
173             if (isActive()) {
174                 throw new IllegalStateException(
175                         "reuseAddress can't be set while the acceptor is bound.");
176             }
177 
178             this.reuseAddress = reuseAddress;
179         }
180     }
181 
182     /**
183      * {@inheritDoc}
184      */
185     public int getBacklog() {
186         return backlog;
187     }
188 
189     /**
190      * {@inheritDoc}
191      */
192     public void setBacklog(int backlog) {
193         synchronized (bindLock) {
194             if (isActive()) {
195                 throw new IllegalStateException(
196                         "backlog can't be set while the acceptor is bound.");
197             }
198 
199             this.backlog = backlog;
200         }
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     @Override
207     protected NioSession accept(IoProcessor<NioSession> processor,
208             ServerSocketChannel handle) throws Exception {
209 
210         SelectionKey key = handle.keyFor(selector);
211         
212         if ((key == null) || (!key.isValid()) || (!key.isAcceptable()) ) {
213             return null;
214         }
215 
216         // accept the connection from the client
217         SocketChannel ch = handle.accept();
218         
219         if (ch == null) {
220             return null;
221         }
222 
223         return new NioSocketSession(this, processor, ch);
224     }
225 
226     /**
227      * {@inheritDoc}
228      */
229     @Override
230     protected ServerSocketChannel open(SocketAddress localAddress)
231             throws Exception {
232         // Creates the listening ServerSocket
233         ServerSocketChannel channel = ServerSocketChannel.open();
234         
235         boolean success = false;
236         
237         try {
238             // This is a non blocking socket channel
239             channel.configureBlocking(false);
240         
241             // Configure the server socket,
242             ServerSocket socket = channel.socket();
243             
244             // Set the reuseAddress flag accordingly with the setting
245             socket.setReuseAddress(isReuseAddress());
246             
247             // XXX: Do we need to provide this property? (I think we need to remove it.)
248             socket.setReceiveBufferSize(getSessionConfig().getReceiveBufferSize());
249             
250             // and bind.
251             socket.bind(localAddress, getBacklog());
252             
253             // Register the channel within the selector for ACCEPT event
254             channel.register(selector, SelectionKey.OP_ACCEPT);
255             success = true;
256         } finally {
257             if (!success) {
258                 close(channel);
259             }
260         }
261         return channel;
262     }
263 
264     /**
265      * {@inheritDoc}
266      */
267     @Override
268     protected SocketAddress localAddress(ServerSocketChannel handle)
269             throws Exception {
270         return handle.socket().getLocalSocketAddress();
271     }
272 
273     /**
274       * Check if we have at least one key whose corresponding channels is 
275       * ready for I/O operations.
276       *
277       * This method performs a blocking selection operation. 
278       * It returns only after at least one channel is selected, 
279       * this selector's wakeup method is invoked, or the current thread 
280       * is interrupted, whichever comes first.
281       * 
282       * @return The number of keys having their ready-operation set updated
283       * @throws IOException If an I/O error occurs
284       * @throws ClosedSelectorException If this selector is closed 
285       */
286     @Override
287     protected int select() throws Exception {
288         return selector.select();
289     }
290 
291     /**
292      * {@inheritDoc}
293      */
294     @Override
295     protected Iterator<ServerSocketChannel> selectedHandles() {
296         return new ServerSocketChannelIterator(selector.selectedKeys());
297     }
298 
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     protected void close(ServerSocketChannel handle) throws Exception {
304         SelectionKey key = handle.keyFor(selector);
305         
306         if (key != null) {
307             key.cancel();
308         }
309         
310         handle.close();
311     }
312 
313     /**
314      * {@inheritDoc}
315      */
316     @Override
317     protected void wakeup() {
318         selector.wakeup();
319     }
320 
321     /**
322      * Defines an iterator for the selected-key Set returned by the 
323      * selector.selectedKeys(). It replaces the SelectionKey operator.
324      */
325     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
326         /** The selected-key iterator */
327         private final Iterator<SelectionKey> iterator;
328 
329         /**
330          * Build a SocketChannel iterator which will return a SocketChannel instead of
331          * a SelectionKey.
332          * 
333          * @param selectedKeys The selector selected-key set 
334          */
335         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
336             iterator = selectedKeys.iterator();
337         }
338 
339         /**
340          * Tells if there are more SockectChannel left in the iterator
341          * @return <code>true</code> if there is at least one more 
342          * SockectChannel object to read
343          */
344         public boolean hasNext() {
345             return iterator.hasNext();
346         }
347 
348         /**
349          * Get the next SocketChannel in the operator we have built from
350          * the selected-key et for this selector.
351          * 
352          * @return The next SocketChannel in the iterator
353          */
354         public ServerSocketChannel next() {
355             SelectionKey key = iterator.next();
356             
357             if ( key.isValid() && key.isAcceptable() ) {
358                 return (ServerSocketChannel) key.channel();
359             }
360 
361             return null;
362         }
363 
364         /**
365          * Remove the current SocketChannel from the iterator 
366          */
367         public void remove() {
368             iterator.remove();
369         }
370     }
371 }