View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.Collection;
28  import java.util.Iterator;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.polling.AbstractPollingIoConnector;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoProcessor;
34  import org.apache.mina.core.service.IoService;
35  import org.apache.mina.core.service.SimpleIoProcessorPool;
36  import org.apache.mina.core.service.TransportMetadata;
37  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38  import org.apache.mina.transport.socket.SocketConnector;
39  import org.apache.mina.transport.socket.SocketSessionConfig;
40  
41  /**
42   * {@link IoConnector} for socket transport (TCP/IP).
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
47          SocketConnector {
48  
49      private volatile Selector selector;
50  
51      /**
52       * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
53       */
54      public NioSocketConnector() {
55          super(new DefaultSocketSessionConfig(), NioProcessor.class);
56          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
57      }
58  
59      /**
60       * Constructor for {@link NioSocketConnector} with default configuration, and 
61       * given number of {@link NioProcessor} for multithreading I/O operations
62       * @param processorCount the number of processor to create and place in a
63       * {@link SimpleIoProcessorPool} 
64       */
65      public NioSocketConnector(int processorCount) {
66          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
67          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68      }
69  
70      /**
71       *  Constructor for {@link NioSocketConnector} with default configuration but a
72       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
73       *  {@link IoService} of the same type.
74       * @param processor the processor to use for managing I/O events
75       */
76      public NioSocketConnector(IoProcessor<NioSession> processor) {
77          super(new DefaultSocketSessionConfig(), processor);
78          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79      }
80  
81      /**
82       *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling 
83       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
84       *  the same processor and executor over multiple {@link IoService} of the same type.
85       * @param executor the executor for connection
86       * @param processor the processor for I/O operations
87       */
88      public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
89          super(new DefaultSocketSessionConfig(), executor, processor);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92  
93      /**
94       * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in 
95       * thread pool executor to manage the given number of processor instances. The processor class must have 
96       * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
97       * no-arg constructor.
98       * 
99       * @param processorClass the processor class.
100      * @param processorCount the number of processors to instantiate.
101      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
102      * @since 2.0.0-M4
103      */
104     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
105         super(new DefaultSocketSessionConfig(), processorClass, processorCount);
106     }
107 
108     /**
109      * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in 
110      * thread pool executor to manage the default number of processor instances. The processor class must have 
111      * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
112      * no-arg constructor. The default number of instances is equal to the number of processor cores 
113      * in the system, plus one.
114      * 
115      * @param processorClass the processor class.
116      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
117      * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
118      * @since 2.0.0-M4
119      */
120     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
121         super(new DefaultSocketSessionConfig(), processorClass);
122     }
123 
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     protected void init() throws Exception {
129         this.selector = Selector.open();
130     }
131 
132     /**
133      * {@inheritDoc}
134      */
135     @Override
136     protected void destroy() throws Exception {
137         if (selector != null) {
138             selector.close();
139         }
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     public TransportMetadata getTransportMetadata() {
146         return NioSocketSession.METADATA;
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     public SocketSessionConfig getSessionConfig() {
153         return (SocketSessionConfig) sessionConfig;
154     }
155 
156     /**
157      * {@inheritDoc}
158      */
159     @Override
160     public InetSocketAddress getDefaultRemoteAddress() {
161         return (InetSocketAddress) super.getDefaultRemoteAddress();
162     }
163 
164     /**
165      * {@inheritDoc}
166      */
167     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
168         super.setDefaultRemoteAddress(defaultRemoteAddress);
169     }
170 
171     /**
172      * {@inheritDoc}
173      */
174     @Override
175     protected Iterator<SocketChannel> allHandles() {
176         return new SocketChannelIterator(selector.keys());
177     }
178 
179     /**
180      * {@inheritDoc}
181      */
182     @Override
183     protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
184         return handle.connect(remoteAddress);
185     }
186 
187     /**
188      * {@inheritDoc}
189      */
190     @Override
191     protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
192         SelectionKey key = handle.keyFor(selector);
193 
194         if ((key == null) || (!key.isValid())) {
195             return null;
196         }
197 
198         return (ConnectionRequest) key.attachment();
199     }
200 
201     /**
202      * {@inheritDoc}
203      */
204     @Override
205     protected void close(SocketChannel handle) throws Exception {
206         SelectionKey key = handle.keyFor(selector);
207 
208         if (key != null) {
209             key.cancel();
210         }
211 
212         handle.close();
213     }
214 
215     /**
216      * {@inheritDoc}
217      */
218     @Override
219     protected boolean finishConnect(SocketChannel handle) throws Exception {
220         if (handle.finishConnect()) {
221             SelectionKey key = handle.keyFor(selector);
222 
223             if (key != null) {
224                 key.cancel();
225             }
226 
227             return true;
228         }
229 
230         return false;
231     }
232 
233     /**
234      * {@inheritDoc}
235      */
236     @Override
237     protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
238         SocketChannel ch = SocketChannel.open();
239 
240         int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
241         if (receiveBufferSize > 65535) {
242             ch.socket().setReceiveBufferSize(receiveBufferSize);
243         }
244 
245         if (localAddress != null) {
246             ch.socket().bind(localAddress);
247         }
248         ch.configureBlocking(false);
249         return ch;
250     }
251 
252     /**
253      * {@inheritDoc}
254      */
255     @Override
256     protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
257         return new NioSocketSession(this, processor, handle);
258     }
259 
260     /**
261      * {@inheritDoc}
262      */
263     @Override
264     protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
265         handle.register(selector, SelectionKey.OP_CONNECT, request);
266     }
267 
268     /**
269      * {@inheritDoc}
270      */
271     @Override
272     protected int select(int timeout) throws Exception {
273         return selector.select(timeout);
274     }
275 
276     /**
277      * {@inheritDoc}
278      */
279     @Override
280     protected Iterator<SocketChannel> selectedHandles() {
281         return new SocketChannelIterator(selector.selectedKeys());
282     }
283 
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     protected void wakeup() {
289         selector.wakeup();
290     }
291 
292     private static class SocketChannelIterator implements Iterator<SocketChannel> {
293 
294         private final Iterator<SelectionKey> i;
295 
296         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
297             this.i = selectedKeys.iterator();
298         }
299 
300         /**
301          * {@inheritDoc}
302          */
303         public boolean hasNext() {
304             return i.hasNext();
305         }
306 
307         /**
308          * {@inheritDoc}
309          */
310         public SocketChannel next() {
311             SelectionKey key = i.next();
312             return (SocketChannel) key.channel();
313         }
314 
315         /**
316          * {@inheritDoc}
317          */
318         public void remove() {
319             i.remove();
320         }
321     }
322 }