1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.polling.AbstractPollingIoConnector;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.IoService;
35 import org.apache.mina.core.service.SimpleIoProcessorPool;
36 import org.apache.mina.core.service.TransportMetadata;
37 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38 import org.apache.mina.transport.socket.SocketConnector;
39 import org.apache.mina.transport.socket.SocketSessionConfig;
40
41
42
43
44
45
46 public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
47 SocketConnector {
48
49 private volatile Selector selector;
50
51
52
53
54 public NioSocketConnector() {
55 super(new DefaultSocketSessionConfig(), NioProcessor.class);
56 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
57 }
58
59
60
61
62
63
64
65 public NioSocketConnector(int processorCount) {
66 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
67 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68 }
69
70
71
72
73
74
75
76 public NioSocketConnector(IoProcessor<NioSession> processor) {
77 super(new DefaultSocketSessionConfig(), processor);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81
82
83
84
85
86
87
88 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
89 super(new DefaultSocketSessionConfig(), executor, processor);
90 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
105 super(new DefaultSocketSessionConfig(), processorClass, processorCount);
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
121 super(new DefaultSocketSessionConfig(), processorClass);
122 }
123
124
125
126
127 @Override
128 protected void init() throws Exception {
129 this.selector = Selector.open();
130 }
131
132
133
134
135 @Override
136 protected void destroy() throws Exception {
137 if (selector != null) {
138 selector.close();
139 }
140 }
141
142
143
144
145 public TransportMetadata getTransportMetadata() {
146 return NioSocketSession.METADATA;
147 }
148
149
150
151
152 public SocketSessionConfig getSessionConfig() {
153 return (SocketSessionConfig) sessionConfig;
154 }
155
156
157
158
159 @Override
160 public InetSocketAddress getDefaultRemoteAddress() {
161 return (InetSocketAddress) super.getDefaultRemoteAddress();
162 }
163
164
165
166
167 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
168 super.setDefaultRemoteAddress(defaultRemoteAddress);
169 }
170
171
172
173
174 @Override
175 protected Iterator<SocketChannel> allHandles() {
176 return new SocketChannelIterator(selector.keys());
177 }
178
179
180
181
182 @Override
183 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
184 return handle.connect(remoteAddress);
185 }
186
187
188
189
190 @Override
191 protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
192 SelectionKey key = handle.keyFor(selector);
193
194 if ((key == null) || (!key.isValid())) {
195 return null;
196 }
197
198 return (ConnectionRequest) key.attachment();
199 }
200
201
202
203
204 @Override
205 protected void close(SocketChannel handle) throws Exception {
206 SelectionKey key = handle.keyFor(selector);
207
208 if (key != null) {
209 key.cancel();
210 }
211
212 handle.close();
213 }
214
215
216
217
218 @Override
219 protected boolean finishConnect(SocketChannel handle) throws Exception {
220 if (handle.finishConnect()) {
221 SelectionKey key = handle.keyFor(selector);
222
223 if (key != null) {
224 key.cancel();
225 }
226
227 return true;
228 }
229
230 return false;
231 }
232
233
234
235
236 @Override
237 protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
238 SocketChannel ch = SocketChannel.open();
239
240 int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
241 if (receiveBufferSize > 65535) {
242 ch.socket().setReceiveBufferSize(receiveBufferSize);
243 }
244
245 if (localAddress != null) {
246 ch.socket().bind(localAddress);
247 }
248 ch.configureBlocking(false);
249 return ch;
250 }
251
252
253
254
255 @Override
256 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
257 return new NioSocketSession(this, processor, handle);
258 }
259
260
261
262
263 @Override
264 protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
265 handle.register(selector, SelectionKey.OP_CONNECT, request);
266 }
267
268
269
270
271 @Override
272 protected int select(int timeout) throws Exception {
273 return selector.select(timeout);
274 }
275
276
277
278
279 @Override
280 protected Iterator<SocketChannel> selectedHandles() {
281 return new SocketChannelIterator(selector.selectedKeys());
282 }
283
284
285
286
287 @Override
288 protected void wakeup() {
289 selector.wakeup();
290 }
291
292 private static class SocketChannelIterator implements Iterator<SocketChannel> {
293
294 private final Iterator<SelectionKey> i;
295
296 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
297 this.i = selectedKeys.iterator();
298 }
299
300
301
302
303 public boolean hasNext() {
304 return i.hasNext();
305 }
306
307
308
309
310 public SocketChannel next() {
311 SelectionKey key = i.next();
312 return (SocketChannel) key.channel();
313 }
314
315
316
317
318 public void remove() {
319 i.remove();
320 }
321 }
322 }