1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedSelectorException;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.mina.core.RuntimeIoException;
40 import org.apache.mina.core.filterchain.IoFilter;
41 import org.apache.mina.core.service.AbstractIoAcceptor;
42 import org.apache.mina.core.service.IoAcceptor;
43 import org.apache.mina.core.service.IoHandler;
44 import org.apache.mina.core.service.IoProcessor;
45 import org.apache.mina.core.service.SimpleIoProcessorPool;
46 import org.apache.mina.core.session.AbstractIoSession;
47 import org.apache.mina.core.session.IoSession;
48 import org.apache.mina.core.session.IoSessionConfig;
49 import org.apache.mina.transport.socket.SocketSessionConfig;
50 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
51 import org.apache.mina.util.ExceptionMonitor;
52
/**
 * A base class for implementing transport using a polling strategy. The
 * underlying sockets are checked in an active loop, which is woken up when a
 * socket needs to be processed. This class handles the logic behind binding,
 * accepting and disposing the server sockets. An {@link Executor} is used
 * for running client accepting and an {@link AbstractPollingIoProcessor} is
 * used for processing client I/O operations like reading, writing and
 * closing.
 *
 * All the low-level methods for binding, accepting and closing need to be provided
 * by the subclassing implementation.
 *
 * @see NioSocketAcceptor for an example of implementation
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
69 public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
/** A lock used to prevent the selector from being woken up before it has been created */
71 private final Semaphore lock = new Semaphore(1);
72
73 private final IoProcessor<S> processor;
74
75 private final boolean createdProcessor;
76
77 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
78
79 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
80
81 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
82
83 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
84
85 /** A flag set when the acceptor has been created and initialized */
86 private volatile boolean selectable;
87
/** The thread responsible for accepting incoming requests */
89 private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
90
91 protected boolean reuseAddress = false;
92
/**
 * Defines the number of sockets that can wait to be accepted. Defaults
 * to 50 (as in the ServerSocket default).
 */
97 protected int backlog = 50;
98
/**
 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
 * session configuration and a class of {@link IoProcessor}, which will be instantiated in a
 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
 * pool size will be used.
 * <p>
 * The pool is created by this constructor and flagged as such, so the acceptor
 * disposes it itself when it is disposed. A <code>null</code> {@link Executor} is
 * passed on to the main constructor.
 *
 * @see SimpleIoProcessorPool
 *
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
 *            type.
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
    // 'true' marks the pool as created here, so that the acceptor disposes it.
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
}
115
/**
 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
 * session configuration and a class of {@link IoProcessor}, which will be instantiated in a
 * {@link SimpleIoProcessorPool} using multiple threads for better scaling in multiprocessor
 * systems.
 * <p>
 * The pool is created by this constructor and flagged as such, so the acceptor
 * disposes it itself when it is disposed. A <code>null</code> {@link Executor} is
 * passed on to the main constructor.
 *
 * @see SimpleIoProcessorPool
 *
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
 *            type.
 * @param processorCount the number of processors to instantiate for the pool
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
        int processorCount) {
    // 'true' marks the pool as created here, so that the acceptor disposes it.
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
}
134
/**
 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
 * session configuration and the {@link IoProcessor} to use. No {@link Executor} is
 * supplied: <code>null</code> is passed to the superclass, which is expected to create
 * a default one using {@link Executors#newCachedThreadPool()} (behaviour of the
 * superclass, not visible in this class).
 * <p>
 * The given processor is NOT flagged as created by this acceptor, so the acceptor
 * will not dispose it; the caller keeps ownership.
 *
 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
 *
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
 *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}.
 *            Must not be <code>null</code>.
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
    // 'false': the processor belongs to the caller and is not disposed by the acceptor.
    this(sessionConfig, null, processor, false);
}
150
/**
 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
 * session configuration and an {@link Executor} for handling I/O events. If a
 * null {@link Executor} is provided, a default one is expected to be created by the
 * superclass using {@link Executors#newCachedThreadPool()} (behaviour of the
 * superclass, not visible in this class).
 * <p>
 * The given processor is NOT flagged as created by this acceptor, so the acceptor
 * will not dispose it; the caller keeps ownership.
 *
 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
 *
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param executor
 *            the {@link Executor} used for handling asynchronous execution of I/O
 *            events. Can be <code>null</code>.
 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
 *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}.
 *            Must not be <code>null</code>.
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
    // 'false': the processor belongs to the caller and is not disposed by the acceptor.
    this(sessionConfig, executor, processor, false);
}
170
/**
 * Main constructor, to which all the other constructors delegate. It stores the
 * processor, then initializes the polling system through {@link #init()} and marks
 * the acceptor as selectable once this has succeeded.
 * <p>
 * If the initialization fails, {@link #destroy()} is called to clean up before the
 * failure is propagated: a {@link RuntimeException} is rethrown as is, any other
 * exception is wrapped into a {@link RuntimeIoException}.
 *
 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
 *
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param executor
 *            the {@link Executor} used for handling asynchronous execution of I/O
 *            events. Can be <code>null</code>.
 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of
 *            this transport, triggering events to the bound {@link IoHandler} and processing
 *            the chains of {@link IoFilter}
 * @param createdProcessor <code>true</code> if the processor has been created on behalf
 *            of this acceptor, in which case it is disposed along with the acceptor
 * @throws IllegalArgumentException if the processor is <code>null</code>
 * @throws RuntimeIoException if {@link #init()} fails with a checked exception
 */
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
        boolean createdProcessor) {
    super(sessionConfig, executor);

    if (null == processor) {
        throw new IllegalArgumentException("processor");
    }

    this.createdProcessor = createdProcessor;
    this.processor = processor;

    try {
        // Set up the polling system first...
        init();

        // ... and only then declare the acceptor ready to accept
        // incoming connections.
        selectable = true;
    } catch (RuntimeException unchecked) {
        throw unchecked;
    } catch (Exception checked) {
        throw new RuntimeIoException("Failed to initialize.", checked);
    } finally {
        // The flag is still false if init() has failed: release whatever
        // has been allocated so far. A failure during this cleanup is
        // only reported, so that it does not hide the original one.
        if (!selectable) {
            try {
                destroy();
            } catch (Exception cleanupFailure) {
                ExceptionMonitor.getInstance().exceptionCaught(cleanupFailure);
            }
        }
    }
}
222
/**
 * Initialize the polling system. It is called once, from the constructor of this
 * class, before the acceptor is flagged as selectable. As it runs while the
 * superclass is being constructed, the fields declared by the implementing
 * subclass have not been initialized yet when it is invoked.
 * <p>
 * If this method throws, the constructor calls {@link #destroy()} before
 * propagating the failure.
 *
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract void init() throws Exception;
228
/**
 * Destroy the polling system. It is called by the acceptor thread when this
 * {@link IoAcceptor} implementation is being disposed, and by the constructor
 * when {@link #init()} has failed. In the latter case the polling system may
 * be only partially initialized.
 *
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract void destroy() throws Exception;
235
/**
 * Check for acceptable connections, returning when at least one server socket is
 * ready for accepting or when {@link #wakeup()} has been called.
 * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}.
 * <p>
 * It is called by the acceptor thread on each iteration of its loop. The ready
 * handles are processed only if the returned value is strictly positive. A
 * {@link ClosedSelectorException} thrown from here terminates the acceptor loop.
 *
 * @return The number of server sockets having incoming client connections
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract int select() throws Exception;
243
/**
 * Interrupt the {@link #select()} method. Used when the poll set needs to be modified:
 * this class calls it after having queued a bind or unbind request, when disposing,
 * and from the acceptor thread itself after a failed bind or a closed handle.
 */
protected abstract void wakeup();
248
/**
 * {@link Iterator} for the set of server sockets found with acceptable incoming connections
 * during the last {@link #select()} call.
 * <p>
 * The returned iterator must support {@link Iterator#remove()}: each handle is
 * removed from it by the acceptor thread right after having been fetched.
 *
 * @return an iterator over the server handles which are ready
 */
protected abstract Iterator<H> selectedHandles();
255
/**
 * Open a server socket for a given local address. It is called by the acceptor
 * thread, once for each address of a bind request. The returned handle is later
 * released through {@link #close(Object)}.
 *
 * @param localAddress the associated local address
 * @return the opened server socket
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract H open(SocketAddress localAddress) throws Exception;
263
/**
 * Get the local address associated with a given server socket. The returned
 * address is used by this class as the key under which the handle is stored, and
 * is also the address reported as bound once the bind operation has completed.
 *
 * @param handle the server socket
 * @return the local {@link SocketAddress} associated with this handle
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract SocketAddress localAddress(H handle) throws Exception;
271
/**
 * Accept a client connection for a server socket and return a new {@link IoSession}
 * associated with the given {@link IoProcessor}.
 * <p>
 * The implementation may return <code>null</code>, in which case the handle is
 * simply skipped by the acceptor thread.
 *
 * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
 * @param handle the server handle
 * @return the created {@link IoSession}, or <code>null</code> if no session has been created
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
281
/**
 * Close a server socket. It is called by the acceptor thread when an address is
 * unbound, and to roll back the sockets already opened when a bind request fails.
 *
 * @param handle the server socket
 * @throws Exception any exception thrown by the underlying systems calls
 */
protected abstract void close(H handle) throws Exception;
288
/**
 * {@inheritDoc}
 * <p>
 * Calls <code>unbind()</code> first, then makes sure an acceptor thread is running
 * and wakes it up. The actual cleanup (disposal of the processor, if created by
 * this acceptor, and call to {@link #destroy()}) is done by the acceptor thread
 * when it leaves its loop.
 */
@Override
protected void dispose0() throws Exception {
    // NOTE(review): unbind() is inherited; presumably it unbinds every bound address.
    unbind();

    // An acceptor thread is needed to do the final cleanup, so start one
    // if none is running, and interrupt its select().
    startupAcceptor();
    wakeup();
}
299
300 /**
301 * {@inheritDoc}
302 */
303 @Override
304 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
305 // Create a bind request as a Future operation. When the selector
306 // have handled the registration, it will signal this future.
307 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
308
309 // adds the Registration request to the queue for the Workers
310 // to handle
311 registerQueue.add(request);
312
313 // creates the Acceptor instance and has the local
314 // executor kick it off.
315 startupAcceptor();
316
317 // As we just started the acceptor, we have to unblock the select()
318 // in order to process the bind request we just have added to the
319 // registerQueue.
320 try {
321 lock.acquire();
322
323 // Wait a bit to give a chance to the Acceptor thread to do the select()
324 Thread.sleep(10);
325 wakeup();
326 } finally {
327 lock.release();
328 }
329
330 // Now, we wait until this request is completed.
331 request.awaitUninterruptibly();
332
333 if (request.getException() != null) {
334 throw request.getException();
335 }
336
337 // Update the local addresses.
338 // setLocalAddresses() shouldn't be called from the worker thread
339 // because of deadlock.
340 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
341
342 for (H handle : boundHandles.values()) {
343 newLocalAddresses.add(localAddress(handle));
344 }
345
346 return newLocalAddresses;
347 }
348
349 /**
350 * This method is called by the doBind() and doUnbind()
351 * methods. If the acceptor is null, the acceptor object will
352 * be created and kicked off by the executor. If the acceptor
353 * object is null, probably already created and this class
354 * is now working, then nothing will happen and the method
355 * will just return.
356 */
357 private void startupAcceptor() throws InterruptedException {
358 // If the acceptor is not ready, clear the queues
359 // TODO : they should already be clean : do we have to do that ?
360 if (!selectable) {
361 registerQueue.clear();
362 cancelQueue.clear();
363 }
364
365 // start the acceptor if not already started
366 Acceptor acceptor = acceptorRef.get();
367
368 if (acceptor == null) {
369 lock.acquire();
370 acceptor = new Acceptor();
371
372 if (acceptorRef.compareAndSet(null, acceptor)) {
373 executeWorker(acceptor);
374 } else {
375 lock.release();
376 }
377 }
378 }
379
380 /**
381 * {@inheritDoc}
382 */
383 @Override
384 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
385 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
386
387 cancelQueue.add(future);
388 startupAcceptor();
389 wakeup();
390
391 future.awaitUninterruptibly();
392 if (future.getException() != null) {
393 throw future.getException();
394 }
395 }
396
397 /**
398 * This class is called by the startupAcceptor() method and is
399 * placed into a NamePreservingRunnable class.
400 * It's a thread accepting incoming connections from clients.
401 * The loop is stopped when all the bound handlers are unbound.
402 */
403 private class Acceptor implements Runnable {
404 public void run() {
405 assert (acceptorRef.get() == this);
406
407 int nHandles = 0;
408
409 // Release the lock
410 lock.release();
411
412 while (selectable) {
413 try {
414 // Detect if we have some keys ready to be processed
415 // The select() will be woke up if some new connection
416 // have occurred, or if the selector has been explicitly
417 // woke up
418 int selected = select();
419
420 // this actually sets the selector to OP_ACCEPT,
421 // and binds to the port on which this class will
422 // listen on
423 nHandles += registerHandles();
424
425 // Now, if the number of registred handles is 0, we can
426 // quit the loop: we don't have any socket listening
427 // for incoming connection.
428 if (nHandles == 0) {
429 acceptorRef.set(null);
430
431 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
432 assert (acceptorRef.get() != this);
433 break;
434 }
435
436 if (!acceptorRef.compareAndSet(null, this)) {
437 assert (acceptorRef.get() != this);
438 break;
439 }
440
441 assert (acceptorRef.get() == this);
442 }
443
444 if (selected > 0) {
445 // We have some connection request, let's process
446 // them here.
447 processHandles(selectedHandles());
448 }
449
450 // check to see if any cancellation request has been made.
451 nHandles -= unregisterHandles();
452 } catch (ClosedSelectorException cse) {
453 // If the selector has been closed, we can exit the loop
454 break;
455 } catch (Throwable e) {
456 ExceptionMonitor.getInstance().exceptionCaught(e);
457
458 try {
459 Thread.sleep(1000);
460 } catch (InterruptedException e1) {
461 ExceptionMonitor.getInstance().exceptionCaught(e1);
462 }
463 }
464 }
465
466 // Cleanup all the processors, and shutdown the acceptor.
467 if (selectable && isDisposing()) {
468 selectable = false;
469 try {
470 if (createdProcessor) {
471 processor.dispose();
472 }
473 } finally {
474 try {
475 synchronized (disposalLock) {
476 if (isDisposing()) {
477 destroy();
478 }
479 }
480 } catch (Exception e) {
481 ExceptionMonitor.getInstance().exceptionCaught(e);
482 } finally {
483 disposalFuture.setDone();
484 }
485 }
486 }
487 }
488
489 /**
490 * This method will process new sessions for the Worker class. All
491 * keys that have had their status updates as per the Selector.selectedKeys()
492 * method will be processed here. Only keys that are ready to accept
493 * connections are handled here.
494 * <p/>
495 * Session objects are created by making new instances of SocketSessionImpl
496 * and passing the session object to the SocketIoProcessor class.
497 */
498 @SuppressWarnings("unchecked")
499 private void processHandles(Iterator<H> handles) throws Exception {
500 while (handles.hasNext()) {
501 H handle = handles.next();
502 handles.remove();
503
504 // Associates a new created connection to a processor,
505 // and get back a session
506 S session = accept(processor, handle);
507
508 if (session == null) {
509 continue;
510 }
511
512 initSession(session, null, null);
513
514 // add the session to the SocketIoProcessor
515 session.getProcessor().add(session);
516 }
517 }
518 }
519
520 /**
521 * Sets up the socket communications. Sets items such as:
522 * <p/>
523 * Blocking
524 * Reuse address
525 * Receive buffer size
526 * Bind to listen port
527 * Registers OP_ACCEPT for selector
528 */
529 private int registerHandles() {
530 for (;;) {
531 // The register queue contains the list of services to manage
532 // in this acceptor.
533 AcceptorOperationFuture future = registerQueue.poll();
534
535 if (future == null) {
536 return 0;
537 }
538
539 // We create a temporary map to store the bound handles,
540 // as we may have to remove them all if there is an exception
541 // during the sockets opening.
542 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
543 List<SocketAddress> localAddresses = future.getLocalAddresses();
544
545 try {
546 // Process all the addresses
547 for (SocketAddress a : localAddresses) {
548 H handle = open(a);
549 newHandles.put(localAddress(handle), handle);
550 }
551
552 // Everything went ok, we can now update the map storing
553 // all the bound sockets.
554 boundHandles.putAll(newHandles);
555
556 // and notify.
557 future.setDone();
558 return newHandles.size();
559 } catch (Exception e) {
560 // We store the exception in the future
561 future.setException(e);
562 } finally {
563 // Roll back if failed to bind all addresses.
564 if (future.getException() != null) {
565 for (H handle : newHandles.values()) {
566 try {
567 close(handle);
568 } catch (Exception e) {
569 ExceptionMonitor.getInstance().exceptionCaught(e);
570 }
571 }
572
573 // TODO : add some comment : what is the wakeup() waking up ?
574 wakeup();
575 }
576 }
577 }
578 }
579
580 /**
581 * This method just checks to see if anything has been placed into the
582 * cancellation queue. The only thing that should be in the cancelQueue
583 * is CancellationRequest objects and the only place this happens is in
584 * the doUnbind() method.
585 */
586 private int unregisterHandles() {
587 int cancelledHandles = 0;
588 for (;;) {
589 AcceptorOperationFuture future = cancelQueue.poll();
590 if (future == null) {
591 break;
592 }
593
594 // close the channels
595 for (SocketAddress a : future.getLocalAddresses()) {
596 H handle = boundHandles.remove(a);
597
598 if (handle == null) {
599 continue;
600 }
601
602 try {
603 close(handle);
604 wakeup(); // wake up again to trigger thread death
605 } catch (Throwable e) {
606 ExceptionMonitor.getInstance().exceptionCaught(e);
607 } finally {
608 cancelledHandles++;
609 }
610 }
611
612 future.setDone();
613 }
614
615 return cancelledHandles;
616 }
617
/**
 * {@inheritDoc}
 * <p>
 * This operation is not supported by this acceptor: the method always throws.
 *
 * @throws UnsupportedOperationException always
 */
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
    throw new UnsupportedOperationException();
}
624
625 /**
626 * {@inheritDoc}
627 */
628 public int getBacklog() {
629 return backlog;
630 }
631
632 /**
633 * {@inheritDoc}
634 */
635 public void setBacklog(int backlog) {
636 synchronized (bindLock) {
637 if (isActive()) {
638 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
639 }
640
641 this.backlog = backlog;
642 }
643 }
644
645 /**
646 * {@inheritDoc}
647 */
648 public boolean isReuseAddress() {
649 return reuseAddress;
650 }
651
652 /**
653 * {@inheritDoc}
654 */
655 public void setReuseAddress(boolean reuseAddress) {
656 synchronized (bindLock) {
657 if (isActive()) {
658 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
659 }
660
661 this.reuseAddress = reuseAddress;
662 }
663 }
664
665 /**
666 * {@inheritDoc}
667 */
668 public SocketSessionConfig getSessionConfig() {
669 return (SocketSessionConfig)sessionConfig;
670 }
671 }