View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.nio.channels.spi.SelectorProvider;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Semaphore;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.mina.core.RuntimeIoException;
41  import org.apache.mina.core.filterchain.IoFilter;
42  import org.apache.mina.core.service.AbstractIoAcceptor;
43  import org.apache.mina.core.service.AbstractIoService;
44  import org.apache.mina.core.service.IoAcceptor;
45  import org.apache.mina.core.service.IoHandler;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.SimpleIoProcessorPool;
48  import org.apache.mina.core.session.AbstractIoSession;
49  import org.apache.mina.core.session.IoSession;
50  import org.apache.mina.core.session.IoSessionConfig;
51  import org.apache.mina.transport.socket.SocketSessionConfig;
52  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
53  import org.apache.mina.util.ExceptionMonitor;
54  
55  /**
56   * A base class for implementing transport using a polling strategy. The
57   * underlying sockets will be checked in an active loop and woke up when an
58   * socket needed to be processed. This class handle the logic behind binding,
59   * accepting and disposing the server sockets. An {@link Executor} will be used
60   * for running client accepting and an {@link AbstractPollingIoProcessor} will
61   * be used for processing client I/O operations like reading, writing and
62   * closing.
63   * 
64   * All the low level methods for binding, accepting, closing need to be provided
65   * by the subclassing implementation.
66   * 
67   * @see NioSocketAcceptor for a example of implementation
68   * 
69   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
70   */
71  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
72      /** A lock used to protect the selector to be waked up before it's created */
73      private final Semaphore lock = new Semaphore(1);
74  
75      private final IoProcessor<S> processor;
76  
77      private final boolean createdProcessor;
78  
79      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
80  
81      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
82  
83      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
84  
85      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
86  
87      /** A flag set when the acceptor has been created and initialized */
88      private volatile boolean selectable;
89  
90      /** The thread responsible of accepting incoming requests */
91      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>();
92  
93      protected boolean reuseAddress = false;
94  
95      /**
96       * Define the number of socket that can wait to be accepted. Default
97       * to 50 (as in the SocketServer default).
98       */
99      protected int backlog = 50;
100 
101     /**
102      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
103      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
104      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
105      * pool size will be used.
106      * 
107      * @see SimpleIoProcessorPool
108      * 
109      * @param sessionConfig
110      *            the default configuration for the managed {@link IoSession}
111      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
112      *            type.
113      */
114     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
115         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
116     }
117 
118     /**
119      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
120      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
121      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
122      * systems.
123      * 
124      * @see SimpleIoProcessorPool
125      * 
126      * @param sessionConfig
127      *            the default configuration for the managed {@link IoSession}
128      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
129      *            type.
130      * @param processorCount the amount of processor to instantiate for the pool
131      */
132     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
133             int processorCount) {
134         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
135     }
136 
137     /**
138      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
139      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
140      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
141      * systems.
142      *
143      * @see SimpleIoProcessorPool
144      *
145      * @param sessionConfig
146      *            the default configuration for the managed {@link IoSession}
147      * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
148      *            type.
149      * @param processorCount the amount of processor to instantiate for the pool
150      * @param selectorProvider The SelectorProvider to use
151      */
152     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
153             int processorCount, SelectorProvider selectorProvider ) {
154         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
155     }
156 
157     /**
158      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
159      * session configuration, a default {@link Executor} will be created using
160      * {@link Executors#newCachedThreadPool()}.
161      * 
162      * @see AbstractIoService
163      * 
164      * @param sessionConfig
165      *            the default configuration for the managed {@link IoSession}
166      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
167      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168      */
169     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
170         this(sessionConfig, null, processor, false, null);
171     }
172 
173     /**
174      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
175      * default session configuration and an {@link Executor} for handling I/O
176      * events. If a null {@link Executor} is provided, a default one will be
177      * created using {@link Executors#newCachedThreadPool()}.
178      * 
179      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
180      * 
181      * @param sessionConfig
182      *            the default configuration for the managed {@link IoSession}
183      * @param executor
184      *            the {@link Executor} used for handling asynchronous execution
185      *            of I/O events. Can be <code>null</code>.
186      * @param processor
187      *            the {@link IoProcessor} for processing the {@link IoSession}
188      *            of this transport, triggering events to the bound
189      *            {@link IoHandler} and processing the chains of
190      *            {@link IoFilter}
191      */
192     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
193         this(sessionConfig, executor, processor, false, null);
194     }
195 
196     /**
197      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
198      * default session configuration and an {@link Executor} for handling I/O
199      * events. If a null {@link Executor} is provided, a default one will be
200      * created using {@link Executors#newCachedThreadPool()}.
201      * 
202      * @see AbstractIoService(IoSessionConfig, Executor)
203      * 
204      * @param sessionConfig
205      *            the default configuration for the managed {@link IoSession}
206      * @param executor
207      *            the {@link Executor} used for handling asynchronous execution
208      *            of I/O events. Can be <code>null</code>.
209      * @param processor
210      *            the {@link IoProcessor} for processing the {@link IoSession}
211      *            of this transport, triggering events to the bound
212      *            {@link IoHandler} and processing the chains of
213      *            {@link IoFilter}
214      * @param createdProcessor
215      *            tagging the processor as automatically created, so it will be
216      *            automatically disposed
217      */
218     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
219             boolean createdProcessor, SelectorProvider selectorProvider) {
220         super(sessionConfig, executor);
221 
222         if (processor == null) {
223             throw new IllegalArgumentException("processor");
224         }
225 
226         this.processor = processor;
227         this.createdProcessor = createdProcessor;
228 
229         try {
230             // Initialize the selector
231             init(selectorProvider);
232 
233             // The selector is now ready, we can switch the
234             // flag to true so that incoming connection can be accepted
235             selectable = true;
236         } catch (RuntimeException e) {
237             throw e;
238         } catch (Exception e) {
239             throw new RuntimeIoException("Failed to initialize.", e);
240         } finally {
241             if (!selectable) {
242                 try {
243                     destroy();
244                 } catch (Exception e) {
245                     ExceptionMonitor.getInstance().exceptionCaught(e);
246                 }
247             }
248         }
249     }
250 
251     /**
252      * Initialize the polling system, will be called at construction time.
253      * @throws Exception any exception thrown by the underlying system calls
254      */
255     protected abstract void init() throws Exception;
256 
257     /**
258      * Initialize the polling system, will be called at construction time.
259      * 
260      * @param selectorProvider The Selector Provider that will be used by this polling acceptor
261      * @throws Exception any exception thrown by the underlying system calls
262      */
263     protected abstract void init(SelectorProvider selectorProvider) throws Exception;
264 
265     /**
266      * Destroy the polling system, will be called when this {@link IoAcceptor}
267      * implementation will be disposed.
268      * @throws Exception any exception thrown by the underlying systems calls
269      */
270     protected abstract void destroy() throws Exception;
271 
272     /**
273      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
274      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
275      * @return The number of sockets having got incoming client
276      * @throws Exception any exception thrown by the underlying systems calls
277      */
278     protected abstract int select() throws Exception;
279 
280     /**
281      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
282      */
283     protected abstract void wakeup();
284 
285     /**
286      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
287      *  during the last {@link #select()} call.
288      * @return the list of server handles ready
289      */
290     protected abstract Iterator<H> selectedHandles();
291 
292     /**
293      * Open a server socket for a given local address.
294      * @param localAddress the associated local address
295      * @return the opened server socket
296      * @throws Exception any exception thrown by the underlying systems calls
297      */
298     protected abstract H open(SocketAddress localAddress) throws Exception;
299 
300     /**
301      * Get the local address associated with a given server socket
302      * @param handle the server socket
303      * @return the local {@link SocketAddress} associated with this handle
304      * @throws Exception any exception thrown by the underlying systems calls
305      */
306     protected abstract SocketAddress localAddress(H handle) throws Exception;
307 
308     /**
309      * Accept a client connection for a server socket and return a new {@link IoSession}
310      * associated with the given {@link IoProcessor}
311      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
312      * @param handle the server handle
313      * @return the created {@link IoSession}
314      * @throws Exception any exception thrown by the underlying systems calls
315      */
316     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
317 
318     /**
319      * Close a server socket.
320      * @param handle the server socket
321      * @throws Exception any exception thrown by the underlying systems calls
322      */
323     protected abstract void close(H handle) throws Exception;
324 
325     /**
326      * {@inheritDoc}
327      */
328     @Override
329     protected void dispose0() throws Exception {
330         unbind();
331 
332         startupAcceptor();
333         wakeup();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     @Override
340     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
341         // Create a bind request as a Future operation. When the selector
342         // have handled the registration, it will signal this future.
343         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
344 
345         // adds the Registration request to the queue for the Workers
346         // to handle
347         registerQueue.add(request);
348 
349         // creates the Acceptor instance and has the local
350         // executor kick it off.
351         startupAcceptor();
352 
353         // As we just started the acceptor, we have to unblock the select()
354         // in order to process the bind request we just have added to the
355         // registerQueue.
356         try {
357             lock.acquire();
358 
359             wakeup();
360         } finally {
361             lock.release();
362         }
363 
364         // Now, we wait until this request is completed.
365         request.awaitUninterruptibly();
366 
367         if (request.getException() != null) {
368             throw request.getException();
369         }
370 
371         // Update the local addresses.
372         // setLocalAddresses() shouldn't be called from the worker thread
373         // because of deadlock.
374         Set<SocketAddress> newLocalAddresses = new HashSet<>();
375 
376         for (H handle : boundHandles.values()) {
377             newLocalAddresses.add(localAddress(handle));
378         }
379 
380         return newLocalAddresses;
381     }
382 
383     /**
384      * This method is called by the doBind() and doUnbind()
385      * methods.  If the acceptor is null, the acceptor object will
386      * be created and kicked off by the executor.  If the acceptor
387      * object is null, probably already created and this class
388      * is now working, then nothing will happen and the method
389      * will just return.
390      */
391     private void startupAcceptor() throws InterruptedException {
392         // If the acceptor is not ready, clear the queues
393         // TODO : they should already be clean : do we have to do that ?
394         if (!selectable) {
395             registerQueue.clear();
396             cancelQueue.clear();
397         }
398 
399         // start the acceptor if not already started
400         Acceptor acceptor = acceptorRef.get();
401 
402         if (acceptor == null) {
403             lock.acquire();
404             acceptor = new Acceptor();
405 
406             if (acceptorRef.compareAndSet(null, acceptor)) {
407                 executeWorker(acceptor);
408             } else {
409                 lock.release();
410             }
411         }
412     }
413 
414     /**
415      * {@inheritDoc}
416      */
417     @Override
418     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
419         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
420 
421         cancelQueue.add(future);
422         startupAcceptor();
423         wakeup();
424 
425         future.awaitUninterruptibly();
426         if (future.getException() != null) {
427             throw future.getException();
428         }
429     }
430 
431     /**
432      * This class is called by the startupAcceptor() method and is
433      * placed into a NamePreservingRunnable class.
434      * It's a thread accepting incoming connections from clients.
435      * The loop is stopped when all the bound handlers are unbound.
436      */
437     private class Acceptor implements Runnable {
438         public void run() {
439             assert (acceptorRef.get() == this);
440 
441             int nHandles = 0;
442 
443             // Release the lock
444             lock.release();
445 
446             while (selectable) {
447                 try {
448                     // Process the bound sockets to this acceptor.
449                     // this actually sets the selector to OP_ACCEPT,
450                     // and binds to the port on which this class will
451                     // listen on. We do that before the select because 
452                     // the registerQueue containing the new handler is
453                     // already updated at this point.
454                     nHandles += registerHandles();
455 
456                     // Detect if we have some keys ready to be processed
457                     // The select() will be woke up if some new connection
458                     // have occurred, or if the selector has been explicitly
459                     // woke up
460                     int selected = select();
461 
462                     // Now, if the number of registred handles is 0, we can
463                     // quit the loop: we don't have any socket listening
464                     // for incoming connection.
465                     if (nHandles == 0) {
466                         acceptorRef.set(null);
467 
468                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
469                             assert (acceptorRef.get() != this);
470                             break;
471                         }
472 
473                         if (!acceptorRef.compareAndSet(null, this)) {
474                             assert (acceptorRef.get() != this);
475                             break;
476                         }
477 
478                         assert (acceptorRef.get() == this);
479                     }
480 
481                     if (selected > 0) {
482                         // We have some connection request, let's process
483                         // them here.
484                         processHandles(selectedHandles());
485                     }
486 
487                     // check to see if any cancellation request has been made.
488                     nHandles -= unregisterHandles();
489                 } catch (ClosedSelectorException cse) {
490                     // If the selector has been closed, we can exit the loop
491                     ExceptionMonitor.getInstance().exceptionCaught(cse);
492                     break;
493                 } catch (Exception e) {
494                     ExceptionMonitor.getInstance().exceptionCaught(e);
495 
496                     try {
497                         Thread.sleep(1000);
498                     } catch (InterruptedException e1) {
499                         ExceptionMonitor.getInstance().exceptionCaught(e1);
500                     }
501                 }
502             }
503 
504             // Cleanup all the processors, and shutdown the acceptor.
505             if (selectable && isDisposing()) {
506                 selectable = false;
507                 try {
508                     if (createdProcessor) {
509                         processor.dispose();
510                     }
511                 } finally {
512                     try {
513                         synchronized (disposalLock) {
514                             if (isDisposing()) {
515                                 destroy();
516                             }
517                         }
518                     } catch (Exception e) {
519                         ExceptionMonitor.getInstance().exceptionCaught(e);
520                     } finally {
521                         disposalFuture.setDone();
522                     }
523                 }
524             }
525         }
526 
527         /**
528          * This method will process new sessions for the Worker class.  All
529          * keys that have had their status updates as per the Selector.selectedKeys()
530          * method will be processed here.  Only keys that are ready to accept
531          * connections are handled here.
532          * <p/>
533          * Session objects are created by making new instances of SocketSessionImpl
534          * and passing the session object to the SocketIoProcessor class.
535          */
536         @SuppressWarnings("unchecked")
537         private void processHandles(Iterator<H> handles) throws Exception {
538             while (handles.hasNext()) {
539                 H handle = handles.next();
540                 handles.remove();
541 
542                 // Associates a new created connection to a processor,
543                 // and get back a session
544                 S session = accept(processor, handle);
545 
546                 if (session == null) {
547                     continue;
548                 }
549 
550                 initSession(session, null, null);
551 
552                 // add the session to the SocketIoProcessor
553                 session.getProcessor().add(session);
554             }
555         }
556     }
557 
558     /**
559      * Sets up the socket communications.  Sets items such as:
560      * <p/>
561      * Blocking
562      * Reuse address
563      * Receive buffer size
564      * Bind to listen port
565      * Registers OP_ACCEPT for selector
566      */
567     private int registerHandles() {
568         for (;;) {
569             // The register queue contains the list of services to manage
570             // in this acceptor.
571             AcceptorOperationFuture future = registerQueue.poll();
572 
573             if (future == null) {
574                 return 0;
575             }
576 
577             // We create a temporary map to store the bound handles,
578             // as we may have to remove them all if there is an exception
579             // during the sockets opening.
580             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
581             List<SocketAddress> localAddresses = future.getLocalAddresses();
582 
583             try {
584                 // Process all the addresses
585                 for (SocketAddress a : localAddresses) {
586                     H handle = open(a);
587                     newHandles.put(localAddress(handle), handle);
588                 }
589 
590                 // Everything went ok, we can now update the map storing
591                 // all the bound sockets.
592                 boundHandles.putAll(newHandles);
593 
594                 // and notify.
595                 future.setDone();
596                 
597                 return newHandles.size();
598             } catch (Exception e) {
599                 // We store the exception in the future
600                 future.setException(e);
601             } finally {
602                 // Roll back if failed to bind all addresses.
603                 if (future.getException() != null) {
604                     for (H handle : newHandles.values()) {
605                         try {
606                             close(handle);
607                         } catch (Exception e) {
608                             ExceptionMonitor.getInstance().exceptionCaught(e);
609                         }
610                     }
611 
612                     // Wake up the selector to be sure we will process the newly bound handle
613                     // and not block forever in the select()
614                     wakeup();
615                 }
616             }
617         }
618     }
619 
620     /**
621      * This method just checks to see if anything has been placed into the
622      * cancellation queue.  The only thing that should be in the cancelQueue
623      * is CancellationRequest objects and the only place this happens is in
624      * the doUnbind() method.
625      */
626     private int unregisterHandles() {
627         int cancelledHandles = 0;
628         for (;;) {
629             AcceptorOperationFuture future = cancelQueue.poll();
630             if (future == null) {
631                 break;
632             }
633 
634             // close the channels
635             for (SocketAddress a : future.getLocalAddresses()) {
636                 H handle = boundHandles.remove(a);
637 
638                 if (handle == null) {
639                     continue;
640                 }
641 
642                 try {
643                     close(handle);
644                     wakeup(); // wake up again to trigger thread death
645                 } catch (Exception e) {
646                     ExceptionMonitor.getInstance().exceptionCaught(e);
647                 } finally {
648                     cancelledHandles++;
649                 }
650             }
651 
652             future.setDone();
653         }
654 
655         return cancelledHandles;
656     }
657 
658     /**
659      * {@inheritDoc}
660      */
661     @Override
662     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
663         throw new UnsupportedOperationException();
664     }
665 
666     /**
667      * @return the backLog
668      */
669     public int getBacklog() {
670         return backlog;
671     }
672 
673     /**
674      * Sets the Backlog parameter
675      * 
676      * @param backlog
677      *            the backlog variable
678      */
679     public void setBacklog(int backlog) {
680         synchronized (bindLock) {
681             if (isActive()) {
682                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
683             }
684 
685             this.backlog = backlog;
686         }
687     }
688 
689     /**
690      * @return the flag that sets the reuseAddress information
691      */
692     public boolean isReuseAddress() {
693         return reuseAddress;
694     }
695 
696     /**
697      * Set the Reuse Address flag
698      * 
699      * @param reuseAddress
700      *            The flag to set
701      */
702     public void setReuseAddress(boolean reuseAddress) {
703         synchronized (bindLock) {
704             if (isActive()) {
705                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
706             }
707 
708             this.reuseAddress = reuseAddress;
709         }
710     }
711 
712     /**
713      * {@inheritDoc}
714      */
715     @Override
716     public SocketSessionConfig getSessionConfig() {
717         return (SocketSessionConfig)sessionConfig;
718     }
719 }