1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.core.polling;
21
22 import java.net.ConnectException;
23 import java.net.SocketAddress;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.Iterator;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.apache.mina.core.RuntimeIoException;
33 import org.apache.mina.core.filterchain.IoFilter;
34 import org.apache.mina.core.future.ConnectFuture;
35 import org.apache.mina.core.future.DefaultConnectFuture;
36 import org.apache.mina.core.service.AbstractIoConnector;
37 import org.apache.mina.core.service.IoConnector;
38 import org.apache.mina.core.service.IoHandler;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.service.SimpleIoProcessorPool;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.IoSession;
43 import org.apache.mina.core.session.IoSessionConfig;
44 import org.apache.mina.core.session.IoSessionInitializer;
45 import org.apache.mina.transport.socket.nio.NioSocketConnector;
46 import org.apache.mina.util.ExceptionMonitor;
47
48 /**
49 * A base class for implementing client transport using a polling strategy. The
50 * underlying sockets will be checked in an active loop and woke up when an
51 * socket needed to be processed. This class handle the logic behind binding,
52 * connecting and disposing the client sockets. A {@link Executor} will be used
53 * for running client connection, and an {@link AbstractPollingIoProcessor} will
54 * be used for processing connected client I/O operations like reading, writing
55 * and closing.
56 *
57 * All the low level methods for binding, connecting, closing need to be
58 * provided by the subclassing implementation.
59 *
60 * @see NioSocketConnector for a example of implementation
61 *
62 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
63 */
64 public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
65
66 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
67
68 private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
69
70 private final IoProcessor<T> processor;
71
72 private final boolean createdProcessor;
73
74 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
75
76 private volatile boolean selectable;
77
78 /** The connector thread */
79 private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
80
81 /**
82 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
83 * session configuration, a class of {@link IoProcessor} which will be instantiated in a
84 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
85 * pool size will be used.
86 *
87 * @see SimpleIoProcessorPool
88 *
89 * @param sessionConfig
90 * the default configuration for the managed {@link IoSession}
91 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
92 * type.
93 */
94 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
95 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
96 }
97
98 /**
99 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
100 * session configuration, a class of {@link IoProcessor} which will be instantiated in a
101 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
102 * systems.
103 *
104 * @see SimpleIoProcessorPool
105 *
106 * @param sessionConfig
107 * the default configuration for the managed {@link IoSession}
108 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
109 * type.
110 * @param processorCount the amount of processor to instantiate for the pool
111 */
112 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
113 int processorCount) {
114 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
115 }
116
117 /**
118 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
119 * session configuration, a default {@link Executor} will be created using
120 * {@link Executors#newCachedThreadPool()}.
121 *
122 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
123 *
124 * @param sessionConfig
125 * the default configuration for the managed {@link IoSession}
126 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
127 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
128 */
129 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
130 this(sessionConfig, null, processor, false);
131 }
132
133 /**
134 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
135 * session configuration and an {@link Executor} for handling I/O events. If
136 * null {@link Executor} is provided, a default one will be created using
137 * {@link Executors#newCachedThreadPool()}.
138 *
139 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
140 *
141 * @param sessionConfig
142 * the default configuration for the managed {@link IoSession}
143 * @param executor
144 * the {@link Executor} used for handling asynchronous execution of I/O
145 * events. Can be <code>null</code>.
146 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
147 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
148 */
149 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
150 this(sessionConfig, executor, processor, false);
151 }
152
153 /**
154 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
155 * session configuration and an {@link Executor} for handling I/O events. If
156 * null {@link Executor} is provided, a default one will be created using
157 * {@link Executors#newCachedThreadPool()}.
158 *
159 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
160 *
161 * @param sessionConfig
162 * the default configuration for the managed {@link IoSession}
163 * @param executor
164 * the {@link Executor} used for handling asynchronous execution of I/O
165 * events. Can be <code>null</code>.
166 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
167 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168 * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed
169 */
170 private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
171 boolean createdProcessor) {
172 super(sessionConfig, executor);
173
174 if (processor == null) {
175 throw new IllegalArgumentException("processor");
176 }
177
178 this.processor = processor;
179 this.createdProcessor = createdProcessor;
180
181 try {
182 init();
183 selectable = true;
184 } catch (RuntimeException e) {
185 throw e;
186 } catch (Exception e) {
187 throw new RuntimeIoException("Failed to initialize.", e);
188 } finally {
189 if (!selectable) {
190 try {
191 destroy();
192 } catch (Exception e) {
193 ExceptionMonitor.getInstance().exceptionCaught(e);
194 }
195 }
196 }
197 }
198
199 /**
200 * Initialize the polling system, will be called at construction time.
201 * @throws Exception any exception thrown by the underlying system calls
202 */
203 protected abstract void init() throws Exception;
204
205 /**
206 * Destroy the polling system, will be called when this {@link IoConnector}
207 * implementation will be disposed.
208 * @throws Exception any exception thrown by the underlying systems calls
209 */
210 protected abstract void destroy() throws Exception;
211
212 /**
213 * Create a new client socket handle from a local {@link SocketAddress}
214 * @param localAddress the socket address for binding the new client socket
215 * @return a new client socket handle
216 * @throws Exception any exception thrown by the underlying systems calls
217 */
218 protected abstract H newHandle(SocketAddress localAddress) throws Exception;
219
220 /**
221 * Connect a newly created client socket handle to a remote {@link SocketAddress}.
222 * This operation is non-blocking, so at end of the call the socket can be still in connection
223 * process.
224 * @param handle the client socket handle
225 * @param remoteAddress the remote address where to connect
226 * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket
227 * is in non-blocking mode and the connection operation is in progress
228 * @throws Exception
229 */
230 protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
231
232 /**
233 * Finish the connection process of a client socket after it was marked as ready to process
234 * by the {@link #select(int)} call. The socket will be connected or reported as connection
235 * failed.
236 * @param handle the client socket handle to finsh to connect
237 * @return true if the socket is connected
238 * @throws Exception any exception thrown by the underlying systems calls
239 */
240 protected abstract boolean finishConnect(H handle) throws Exception;
241
242 /**
243 * Create a new {@link IoSession} from a connected socket client handle.
244 * Will assign the created {@link IoSession} to the given {@link IoProcessor} for
245 * managing future I/O events.
246 * @param processor the processor in charge of this session
247 * @param handle the newly connected client socket handle
248 * @return a new {@link IoSession}
249 * @throws Exception any exception thrown by the underlying systems calls
250 */
251 protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
252
253 /**
254 * Close a client socket.
255 * @param handle the client socket
256 * @throws Exception any exception thrown by the underlying systems calls
257 */
258 protected abstract void close(H handle) throws Exception;
259
260 /**
261 * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
262 */
263 protected abstract void wakeup();
264
265 /**
266 * Check for connected sockets, interrupt when at least a connection is processed (connected or
267 * failed to connect). All the client socket descriptors processed need to be returned by
268 * {@link #selectedHandles()}
269 * @return The number of socket having received some data
270 * @throws Exception any exception thrown by the underlying systems calls
271 */
272 protected abstract int select(int timeout) throws Exception;
273
274 /**
275 * {@link Iterator} for the set of client sockets found connected or
276 * failed to connect during the last {@link #select()} call.
277 * @return the list of client socket handles to process
278 */
279 protected abstract Iterator<H> selectedHandles();
280
281 /**
282 * {@link Iterator} for all the client sockets polled for connection.
283 * @return the list of client sockets currently polled for connection
284 */
285 protected abstract Iterator<H> allHandles();
286
287 /**
288 * Register a new client socket for connection, add it to connection polling
289 * @param handle client socket handle
290 * @param request the associated {@link ConnectionRequest}
291 * @throws Exception any exception thrown by the underlying systems calls
292 */
293 protected abstract void register(H handle, ConnectionRequest request) throws Exception;
294
295 /**
296 * get the {@link ConnectionRequest} for a given client socket handle
297 * @param handle the socket client handle
298 * @return the connection request if the socket is connecting otherwise <code>null</code>
299 */
300 protected abstract ConnectionRequest getConnectionRequest(H handle);
301
302 /**
303 * {@inheritDoc}
304 */
305 @Override
306 protected final void dispose0() throws Exception {
307 startupWorker();
308 wakeup();
309 }
310
311 /**
312 * {@inheritDoc}
313 */
314 @Override
315 @SuppressWarnings("unchecked")
316 protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
317 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
318 H handle = null;
319 boolean success = false;
320 try {
321 handle = newHandle(localAddress);
322 if (connect(handle, remoteAddress)) {
323 ConnectFuture future = new DefaultConnectFuture();
324 T session = newSession(processor, handle);
325 initSession(session, future, sessionInitializer);
326 // Forward the remaining process to the IoProcessor.
327 session.getProcessor().add(session);
328 success = true;
329 return future;
330 }
331
332 success = true;
333 } catch (Exception e) {
334 return DefaultConnectFuture.newFailedFuture(e);
335 } finally {
336 if (!success && handle != null) {
337 try {
338 close(handle);
339 } catch (Exception e) {
340 ExceptionMonitor.getInstance().exceptionCaught(e);
341 }
342 }
343 }
344
345 ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
346 connectQueue.add(request);
347 startupWorker();
348 wakeup();
349
350 return request;
351 }
352
353 private void startupWorker() {
354 if (!selectable) {
355 connectQueue.clear();
356 cancelQueue.clear();
357 }
358
359 Connector connector = connectorRef.get();
360
361 if (connector == null) {
362 connector = new Connector();
363
364 if (connectorRef.compareAndSet(null, connector)) {
365 executeWorker(connector);
366 }
367 }
368 }
369
370 private int registerNew() {
371 int nHandles = 0;
372 for (;;) {
373 ConnectionRequest req = connectQueue.poll();
374 if (req == null) {
375 break;
376 }
377
378 H handle = req.handle;
379 try {
380 register(handle, req);
381 nHandles++;
382 } catch (Exception e) {
383 req.setException(e);
384 try {
385 close(handle);
386 } catch (Exception e2) {
387 ExceptionMonitor.getInstance().exceptionCaught(e2);
388 }
389 }
390 }
391 return nHandles;
392 }
393
394 private int cancelKeys() {
395 int nHandles = 0;
396
397 for (;;) {
398 ConnectionRequest req = cancelQueue.poll();
399
400 if (req == null) {
401 break;
402 }
403
404 H handle = req.handle;
405
406 try {
407 close(handle);
408 } catch (Exception e) {
409 ExceptionMonitor.getInstance().exceptionCaught(e);
410 } finally {
411 nHandles++;
412 }
413 }
414
415 if (nHandles > 0) {
416 wakeup();
417 }
418
419 return nHandles;
420 }
421
422 /**
423 * Process the incoming connections, creating a new session for each
424 * valid connection.
425 */
426 private int processConnections(Iterator<H> handlers) {
427 int nHandles = 0;
428
429 // Loop on each connection request
430 while (handlers.hasNext()) {
431 H handle = handlers.next();
432 handlers.remove();
433
434 ConnectionRequest connectionRequest = getConnectionRequest(handle);
435
436 if (connectionRequest == null) {
437 continue;
438 }
439
440 boolean success = false;
441 try {
442 if (finishConnect(handle)) {
443 T session = newSession(processor, handle);
444 initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
445 // Forward the remaining process to the IoProcessor.
446 session.getProcessor().add(session);
447 nHandles++;
448 }
449 success = true;
450 } catch (Throwable e) {
451 connectionRequest.setException(e);
452 } finally {
453 if (!success) {
454 // The connection failed, we have to cancel it.
455 cancelQueue.offer(connectionRequest);
456 }
457 }
458 }
459 return nHandles;
460 }
461
462 private void processTimedOutSessions(Iterator<H> handles) {
463 long currentTime = System.currentTimeMillis();
464
465 while (handles.hasNext()) {
466 H handle = handles.next();
467 ConnectionRequest connectionRequest = getConnectionRequest(handle);
468
469 if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
470 connectionRequest.setException(new ConnectException("Connection timed out."));
471 cancelQueue.offer(connectionRequest);
472 }
473 }
474 }
475
476 private class Connector implements Runnable {
477
478 public void run() {
479 assert (connectorRef.get() == this);
480
481 int nHandles = 0;
482
483 while (selectable) {
484 try {
485 // the timeout for select shall be smaller of the connect
486 // timeout or 1 second...
487 int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
488 int selected = select(timeout);
489
490 nHandles += registerNew();
491
492 // get a chance to get out of the connector loop, if we don't have any more handles
493 if (nHandles == 0) {
494 connectorRef.set(null);
495
496 if (connectQueue.isEmpty()) {
497 assert (connectorRef.get() != this);
498 break;
499 }
500
501 if (!connectorRef.compareAndSet(null, this)) {
502 assert (connectorRef.get() != this);
503 break;
504 }
505
506 assert (connectorRef.get() == this);
507 }
508
509 if (selected > 0) {
510 nHandles -= processConnections(selectedHandles());
511 }
512
513 processTimedOutSessions(allHandles());
514
515 nHandles -= cancelKeys();
516 } catch (ClosedSelectorException cse) {
517 // If the selector has been closed, we can exit the loop
518 break;
519 } catch (Throwable e) {
520 ExceptionMonitor.getInstance().exceptionCaught(e);
521
522 try {
523 Thread.sleep(1000);
524 } catch (InterruptedException e1) {
525 ExceptionMonitor.getInstance().exceptionCaught(e1);
526 }
527 }
528 }
529
530 if (selectable && isDisposing()) {
531 selectable = false;
532 try {
533 if (createdProcessor) {
534 processor.dispose();
535 }
536 } finally {
537 try {
538 synchronized (disposalLock) {
539 if (isDisposing()) {
540 destroy();
541 }
542 }
543 } catch (Exception e) {
544 ExceptionMonitor.getInstance().exceptionCaught(e);
545 } finally {
546 disposalFuture.setDone();
547 }
548 }
549 }
550 }
551 }
552
553 public final class ConnectionRequest extends DefaultConnectFuture {
554 private final H handle;
555
556 private final long deadline;
557
558 private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
559
560 public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
561 this.handle = handle;
562 long timeout = getConnectTimeoutMillis();
563 if (timeout <= 0L) {
564 this.deadline = Long.MAX_VALUE;
565 } else {
566 this.deadline = System.currentTimeMillis() + timeout;
567 }
568 this.sessionInitializer = callback;
569 }
570
571 public H getHandle() {
572 return handle;
573 }
574
575 public long getDeadline() {
576 return deadline;
577 }
578
579 public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
580 return sessionInitializer;
581 }
582
583 @Override
584 public void cancel() {
585 if (!isDone()) {
586 super.cancel();
587 cancelQueue.add(this);
588 startupWorker();
589 wakeup();
590 }
591 }
592 }
593 }